Преглед на файлове

Adds callback argument to ResultSet.get/join_native

Ask Solem преди 11 години
родител
ревизия
0ebd2865ca
променени са 1 файла, в които са добавени 26 реда и са изтрити 12 реда
  1. 26 12
      celery/result.py

+ 26 - 12
celery/result.py

@@ -467,7 +467,7 @@ class ResultSet(ResultBase):
             if timeout and elapsed >= timeout:
                 raise TimeoutError('The operation timed out')
 
-    def get(self, timeout=None, propagate=True, interval=0.5):
+    def get(self, timeout=None, propagate=True, interval=0.5, callback=None):
         """See :meth:`join`
 
         This is here for API compatibility with :class:`AsyncResult`,
@@ -476,9 +476,10 @@ class ResultSet(ResultBase):
 
         """
         return (self.join_native if self.supports_native_join else self.join)(
-            timeout=timeout, propagate=propagate, interval=interval)
+            timeout=timeout, propagate=propagate,
+            interval=interval, callback=callback)
 
-    def join(self, timeout=None, propagate=True, interval=0.5):
+    def join(self, timeout=None, propagate=True, interval=0.5, callback=None):
         """Gathers the results of all tasks as a list in order.
 
         .. note::
@@ -505,6 +506,11 @@ class ResultSet(ResultBase):
                            does not have any effect when using the amqp
                            result store backend, as it does not use polling.
 
+        :keyword callback: Optional callback to be called for every result
+                           received.  Must have signature ``(task_id, value)``
+                           No results will be returned by this function if
+                           a callback is specified.
+
         :raises celery.exceptions.TimeoutError: if `timeout` is not
             :const:`None` and the operation takes longer than `timeout`
             seconds.
@@ -520,9 +526,13 @@ class ResultSet(ResultBase):
                 remaining = timeout - (monotonic() - time_start)
                 if remaining <= 0.0:
                     raise TimeoutError('join operation timed out')
-            results.append(result.get(timeout=remaining,
-                                      propagate=propagate,
-                                      interval=interval))
+            value = result.get(timeout=remaining,
+                               propagate=propagate,
+                               interval=interval)
+            if callback:
+                callback(result.id, value)
+            else:
+                results.append(value)
         return results
 
     def iter_native(self, timeout=None, interval=0.5):
@@ -543,7 +553,8 @@ class ResultSet(ResultBase):
         ids = [result.id for result in self.results]
         return backend.get_many(ids, timeout=timeout, interval=interval)
 
-    def join_native(self, timeout=None, propagate=True, interval=0.5):
+    def join_native(self, timeout=None, propagate=True,
+                    interval=0.5, callback=None):
         """Backend optimized version of :meth:`join`.
 
         .. versionadded:: 2.2
@@ -556,12 +567,15 @@ class ResultSet(ResultBase):
 
         """
         results = self.results
-        acc = [None for _ in range(len(self))]
-        for task_id, meta in self.iter_native(timeout=timeout,
-                                              interval=interval):
+        acc = None if callback else [None for _ in range(len(self))]
+        for task_id, meta in self.iter_native(timeout, interval):
+            value = meta['result']
             if propagate and meta['status'] in states.PROPAGATE_STATES:
-                raise meta['result']
-            acc[results.index(task_id)] = meta['result']
+                raise value
+            if callback:
+                callback(task_id, value)
+            else:
+                acc[results.index(task_id)] = value
         return acc
 
     def _failed_join_report(self):