Browse Source

Revoked tasks now marked with state REVOKED, and result.get() will now raise TaskRevokedError.

Closes #103
Ask Solem 15 years ago
parent
commit
aa389721b1

+ 1 - 1
celery/backends/amqp.py

@@ -95,7 +95,7 @@ class AMQPBackend(BaseDictBackend):
 
         if meta["status"] == states.SUCCESS:
             return self.get_result(task_id)
-        elif meta["status"] == states.FAILURE:
+        elif meta["status"] in states.PROPAGATE_STATES:
             raise self.get_result(task_id)
 
     def _get_task_meta_for(self, task_id, timeout=None):

+ 6 - 2
celery/backends/base.py

@@ -7,7 +7,7 @@ from billiard.serialization import get_pickleable_exception
 
 from celery import conf
 from celery import states
-from celery.exceptions import TimeoutError
+from celery.exceptions import TimeoutError, TaskRevokedError
 from celery.datastructures import LocalCache
 
 
@@ -53,6 +53,10 @@ class BaseBackend(object):
         return self.store_result(task_id, exc, status=states.RETRY,
                                  traceback=traceback)
 
+    def mark_as_revoked(self, task_id):
+        return self.store_result(task_id, TaskRevokedError(),
+                                 status=states.REVOKED, traceback=None)
+
     def prepare_exception(self, exc):
         """Prepare exception for serialization."""
         return get_pickleable_exception(exc)
@@ -88,7 +92,7 @@ class BaseBackend(object):
             status = self.get_status(task_id)
             if status == states.SUCCESS:
                 return self.get_result(task_id)
-            elif status == states.FAILURE:
+            elif status in states.PROPAGATE_STATES:
                 raise self.get_result(task_id)
             # avoid hammering the CPU checking status.
             time.sleep(sleep_inbetween)

+ 4 - 0
celery/conf.py

@@ -29,6 +29,9 @@ _DEFAULTS = {
     "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
     "CELERY_TASK_SERIALIZER": "pickle",
     "CELERY_DISABLE_RATE_LIMITS": False,
+    "CELERYD_TASK_TIME_LIMIT": None,
+    "CELERYD_TASK_SOFT_TIME_LIMIT": None,
+    "CELERYD_MAX_TASKS_PER_CHILD": None,
     "CELERY_DEFAULT_ROUTING_KEY": "celery",
     "CELERY_DEFAULT_QUEUE": "celery",
     "CELERY_DEFAULT_EXCHANGE": "celery",
@@ -122,6 +125,7 @@ DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
 DISABLE_RATE_LIMITS = _get("CELERY_DISABLE_RATE_LIMITS")
 CELERYD_TASK_TIME_LIMIT = _get("CELERYD_TASK_TIME_LIMIT")
 CELERYD_TASK_SOFT_TIME_LIMIT = _get("CELERYD_TASK_SOFT_TIME_LIMIT")
+CELERYD_MAX_TASKS_PER_CHILD = _get("CELERYD_MAX_TASKS_PER_CHILD")
 STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
 CELERY_SEND_TASK_ERROR_EMAILS = _get("CELERY_SEND_TASK_ERROR_EMAILS",
                                      not settings.DEBUG,

+ 9 - 0
celery/exceptions.py

@@ -13,10 +13,12 @@ Task of kind %s is not registered, please make sure it's imported.
 class SoftTimeLimitExceeded(_SoftTimeLimitExceeded):
     """The soft time limit has been exceeded. This exception is raised
     to give the task a chance to clean up."""
+    pass
 
 
 class ImproperlyConfigured(Exception):
     """Celery is somehow improperly configured."""
+    pass
 
 
 class NotRegistered(KeyError):
@@ -34,10 +36,12 @@ class AlreadyRegistered(Exception):
 
 class TimeoutError(Exception):
     """The operation timed out."""
+    pass
 
 
 class MaxRetriesExceededError(Exception):
     """The tasks max restart limit has been exceeded."""
+    pass
 
 
 class RetryTaskError(Exception):
@@ -46,3 +50,8 @@ class RetryTaskError(Exception):
     def __init__(self, message, exc, *args, **kwargs):
         self.exc = exc
         Exception.__init__(self, message, exc, *args, **kwargs)
+
+
+class TaskRevokedError(Exception):
+    """The task has been revoked, so no result available."""
+    pass

+ 3 - 3
celery/result.py

@@ -283,7 +283,7 @@ class TaskSetResult(object):
                     except ValueError:
                         pass
                     yield result.result
-                elif result.status == states.FAILURE:
+                elif result.status in states.PROPAGATE_STATES:
                     raise result.result
 
     def join(self, timeout=None):
@@ -315,7 +315,7 @@ class TaskSetResult(object):
             for position, pending_result in enumerate(self.subtasks):
                 if pending_result.status == states.SUCCESS:
                     results[position] = pending_result.result
-                elif pending_result.status == states.FAILURE:
+                elif pending_result.status in states.PROPAGATE_STATES:
                     raise pending_result.result
             if results.full():
                 # Make list copy, so the returned type is not a position
@@ -370,7 +370,7 @@ class EagerResult(BaseAsyncResult):
         """Wait until the task has been executed and return its result."""
         if self.status == states.SUCCESS:
             return self.result
-        elif self.status == states.FAILURE:
+        elif self.status in states.PROPAGATE_STATES:
             raise self.result
 
     def revoke(self):

+ 9 - 3
celery/states.py

@@ -20,11 +20,16 @@
 
     Task is being retried.
 
+.. data:: REVOKED
+
+    Task has been revoked.
+
 """
 PENDING = "PENDING"
 STARTED = "STARTED"
 SUCCESS = "SUCCESS"
 FAILURE = "FAILURE"
+REVOKED = "REVOKED"
 RETRY = "RETRY"
 
 
@@ -46,8 +51,9 @@ RETRY = "RETRY"
     Set of all possible states.
 
 """
-READY_STATES = frozenset([SUCCESS, FAILURE])
+READY_STATES = frozenset([SUCCESS, FAILURE, REVOKED])
 UNREADY_STATES = frozenset([PENDING, STARTED, RETRY])
-EXCEPTION_STATES = frozenset([RETRY, FAILURE])
+EXCEPTION_STATES = frozenset([RETRY, FAILURE, REVOKED])
+PROPAGATE_STATES = frozenset([FAILURE, REVOKED])
 
-ALL_STATES = frozenset([PENDING, STARTED, SUCCESS, FAILURE, RETRY])
+ALL_STATES = frozenset([PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED])

+ 9 - 1
celery/worker/control/builtins.py

@@ -4,14 +4,22 @@ from celery import conf
 from celery.registry import tasks
 from celery.worker.revoke import revoked
 from celery.worker.control.registry import Panel
+from celery.backends import default_backend
 
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
 
 
 @Panel.register
-def revoke(panel, task_id, **kwargs):
+def revoke(panel, task_id, task_name=None, **kwargs):
     """Revoke task by task id."""
     revoked.add(task_id)
+    backend = default_backend
+    if task_name: # Use custom task backend (if any)
+        try:
+            backend = tasks[task_name].backend
+        except KeyError:
+            pass
+    backend.mark_as_revoked(task_id)
     panel.logger.warn("Task %s revoked" % (task_id, ))
     return True