Просмотр исходного кода

Added 'propagate=True' argument TaskSetResult.join: Disable to not re-raise exceptions occuring in subtasks

Ask Solem 14 лет назад
Родитель
Сommit
9dadb1c659
1 измененных файлов с 26 добавлено и 16 удалено
  1. 26 16
      celery/result.py

+ 26 - 16
celery/result.py

@@ -121,6 +121,11 @@ class BaseAsyncResult(object):
 
     @property
     def status(self):
+        """Deprecated alias of :attr:`state`."""
+        return self.state
+
+    @property
+    def state(self):
         """The current status of the task.
 
         Can be one of the following:
@@ -301,22 +306,21 @@ class TaskSetResult(object):
                 elif result.status in states.PROPAGATE_STATES:
                     raise result.result
 
-    def join(self, timeout=None):
-        """Gather the results for all of the tasks in the taskset,
-        and return a list with them ordered by the order of which they
-        were called.
+    def join(self, timeout=None, propagate=True):
+        """Gather the results of all tasks in the taskset,
+        and returns a list ordered by the order of the set.
+
+        :keyword timeout: The number of seconds to wait for results
+            before the operation times out.
 
-        :keyword timeout: The time in seconds, how long
-            it will wait for results, before the operation times out.
+        :keyword propagate: If any of the subtasks raises an exception, the
+            exception will be reraised.
 
         :raises celery.exceptions.TimeoutError: if ``timeout`` is not
             :const:`None` and the operation takes longer than ``timeout``
             seconds.
 
-        If any of the tasks raises an exception, the exception
-        will be reraised by :meth:`join`.
-
-        :returns: list of return values for all tasks in the taskset.
+        :returns: list of return values for all subtasks in order.
 
         """
 
@@ -329,17 +333,18 @@ class TaskSetResult(object):
 
         while True:
             for position, pending_result in enumerate(self.subtasks):
-                if pending_result.status == states.SUCCESS:
+                state = pending_result.state
+                if state in states.READY_STATES:
+                    if propagate and state in states.PROPAGATE_STATES:
+                        raise pending_result.result
                     results[position] = pending_result.result
-                elif pending_result.status in states.PROPAGATE_STATES:
-                    raise pending_result.result
             if results.full():
                 # Make list copy, so the returned type is not a position
                 # queue.
                 return list(results)
             else:
-                if timeout is not None and \
-                        time.time() >= time_start + timeout:
+                if (timeout is not None and
+                        time.time() >= time_start + timeout):
                     on_timeout()
 
     def save(self, backend=None):
@@ -403,9 +408,14 @@ class EagerResult(BaseAsyncResult):
 
     @property
     def status(self):
-        """The tasks status"""
+        """The tasks status (alias to :attr:`state`)."""
         return self._status
 
+    @property
+    def state(self):
+        """The tasks state."""
+        return self._state
+
     @property
     def traceback(self):
         """The traceback if the task failed."""