Browse Source

Task.acks_late / CELERY_ACKS_LATE: Modify default ack behavior.

Late ack means the task messages will be acknowledged **after** the task
has been executed, not *just before*, which is the default behavior.

Note that this means the tasks may be executed twice if the worker
crashes in the middle of execution, which may be acceptable for some
applications.
Ask Solem 15 years ago
parent
commit
03bd2ba802
3 changed files with 38 additions and 5 deletions
  1. 2 0
      celery/conf.py
  2. 16 2
      celery/task/base.py
  3. 20 3
      celery/worker/job.py

+ 2 - 0
celery/conf.py

@@ -36,6 +36,7 @@ _DEFAULTS = {
     "CELERY_BROKER_CONNECTION_TIMEOUT": 4,
     "CELERY_BROKER_CONNECTION_RETRY": True,
     "CELERY_BROKER_CONNECTION_MAX_RETRIES": 100,
+    "CELERY_ACKS_LATE": False,
     "CELERYD_POOL": "celery.worker.pool.TaskPool",
     "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
     "CELERYD_ETA_SCHEDULER": "celery.worker.controllers.ScheduleController",
@@ -93,6 +94,7 @@ TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
 TASK_RESULT_EXPIRES = _get("CELERY_TASK_RESULT_EXPIRES")
 IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
 TRACK_STARTED = _get("CELERY_TRACK_STARTED")
+ACKS_LATE = _get("CELERY_ACKS_LATE")
 # Make sure TASK_RESULT_EXPIRES is a timedelta.
 if isinstance(TASK_RESULT_EXPIRES, int):
     TASK_RESULT_EXPIRES = timedelta(seconds=TASK_RESULT_EXPIRES)

+ 16 - 2
celery/task/base.py

@@ -64,6 +64,9 @@ class Task(object):
     The :meth:`run` method can take use of the default keyword arguments,
     as listed in the :meth:`run` documentation.
 
+    The resulting class is callable, which if called will apply the
+    :meth:`run` method.
+
     .. attribute:: name
         Name of the task.
 
@@ -166,8 +169,18 @@ class Task(object):
         The global default can be overridden by the ``CELERY_TRACK_STARTED``
         setting.
 
-    The resulting class is callable, which if called will apply the
-    :meth:`run` method.
+    .. attribute:: acks_late
+
+        If set to ``True`` messages for this task will be acknowledged
+        **after** the task has been executed, not *just before*, which is
+        the default behavior.
+
+        Note that this means the task may be executed twice if the worker
+        crashes in the middle of execution, which may be acceptable for some
+        applications.
+
+        The global default can be overriden by the ``CELERY_ACKS_LATE``
+        setting.
 
     """
     __metaclass__ = TaskType
@@ -192,6 +205,7 @@ class Task(object):
     exchange_type = conf.DEFAULT_EXCHANGE_TYPE
     delivery_mode = conf.DEFAULT_DELIVERY_MODE
     track_started = conf.TRACK_STARTED
+    acks_late = conf.ACKS_LATE
 
     MaxRetriesExceededError = MaxRetriesExceededError
 

+ 20 - 3
celery/worker/job.py

@@ -168,8 +168,12 @@ class TaskWrapper(object):
 
     .. attribute executed
 
-    Set if the task has been executed. A task should only be executed
-    once.
+        Set to ``True`` if the task has been executed.
+        A task should only be executed once.
+
+    .. attribute acknowledged
+
+        Set to ``True`` if the task has been acknowledged.
 
     """
     success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
@@ -181,6 +185,7 @@ class TaskWrapper(object):
     """
     fail_email_body = TASK_FAIL_EMAIL_BODY
     executed = False
+    acknowledged = False
     time_start = None
 
     def __init__(self, task_name, task_id, args, kwargs,
@@ -321,15 +326,24 @@ class TaskWrapper(object):
         return result
 
     def on_accepted(self):
-        self.on_ack()
+        if not self.task.acks_late:
+            self.acknowledge()
         self.send_event("task-accepted", uuid=self.task_id)
         self.logger.debug("Task accepted: %s[%s]" % (
             self.task_name, self.task_id))
 
+    def acknowledge(self):
+        if not self.acknowledged:
+            self.on_ack()
+            self.acknowledged = True
+
     def on_success(self, ret_value):
         """The handler used if the task was successfully processed (
         without raising an exception)."""
 
+        if self.task.acks_late:
+            self.acknowledge()
+
         runtime = time.time() - self.time_start
         self.send_event("task-succeeded", uuid=self.task_id,
                         result=ret_value, runtime=runtime)
@@ -343,6 +357,9 @@ class TaskWrapper(object):
     def on_failure(self, exc_info):
         """The handler used if the task raised an exception."""
 
+        if self.task.acks_late:
+            self.acknowledge()
+
         self.send_event("task-failed", uuid=self.task_id,
                                        exception=exc_info.exception,
                                        traceback=exc_info.traceback)