Просмотр исходного кода

Aggregate needs a process method processing all the data gathered at once.

Ask Solem 15 лет назад
Родитель
Сommit
7ff85f12e2
1 измененных файлов с 27 добавлено и 13 удалено
  1. 27 13
      celery/contrib/coroutine.py

+ 27 - 13
celery/contrib/coroutine.py

@@ -1,5 +1,7 @@
+import time
+from collections import deque
+
 from celery.task.base import Task
-from celery.task.builtins import PingTask
 
 
 class CoroutineTask(Task):
@@ -29,25 +31,37 @@ class Aggregate(CoroutineTask):
     abstract = True
     proxied = None
     minlen = 100
+    time_max = 60
+    _time_since = None
 
     def body(self):
-        waiting = []
+        waiting = deque()
 
+        timesince = time.time()
         while True:
             argtuple = (yield)
             waiting.append(argtuple)
-            if len(waiting) >= self.minlen:
-                res = []
-                for task_args, task_kwargs in waiting:
-                    try:
-                        res.append(self.proxied(*args, **kwargs))
-                    except Exception, exc:
-                        pass # TODO handle errors here, please
-                yield res
+            if self._expired() or len(waiting) >= self.minlen:
+                yield self.process(waiting)
+                waiting.clear()
             else:
                 yield None
 
+    def process(self, jobs):
+        """Jobs is a deque with the arguments gathered so far.
+
+        Arguments is a args, kwargs tuple.
+
+        """
+        raise NotImplementedError(
+                "Subclasses of Aggregate needs to implement process()")
+
+    def _expired(self):
+        if not self._time_since:
+            self._time_since = time.time()
+            return False
 
-class AggregatePing(Aggregate):
-    proxied = PingTask
-    n = 100
+        if time.time() + self.time_max > self._time_since:
+            self._time_since = time.time()
+            return True
+        return False