Ver código fonte

Acknowledgement now happens in the pool callback (it can't be done in the job target,
as the acknowledge callback isn't pickleable — we can't send the AMQP connection, etc.).

Ask Solem 15 anos atrás
pai
commit
14b8302256
2 arquivos alterados com 19 adições e 13 exclusões
  1. 9 3
      celery/pool.py
  2. 10 10
      celery/worker/job.py

+ 9 - 3
celery/pool.py

@@ -50,7 +50,7 @@ class TaskPool(object):
         self._pool = None
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, meta=None):
+            errbacks=None, on_ack=None, meta=None):
         """Equivalent of the :func:``apply`` built-in function.
 
         All ``callbacks`` and ``errbacks`` should complete immediately since
@@ -64,7 +64,8 @@ class TaskPool(object):
         meta = meta or {}
         tid = gen_unique_id()
 
-        on_return = curry(self.on_return, tid, callbacks, errbacks, meta)
+        on_return = curry(self.on_return, tid, callbacks, errbacks,
+                          on_ack, meta)
 
         result = self._pool.apply_async(target, args, kwargs,
                                         callback=on_return)
@@ -73,8 +74,13 @@ class TaskPool(object):
 
         return result
 
-    def on_return(self, tid, callbacks, errbacks, meta, ret_value):
+    def on_return(self, tid, callbacks, errbacks, on_ack, meta, ret_value):
         """What to do when the process returns."""
+
+        # Acknowledge the task as being processed.
+        if on_ack:
+            on_ack()
+
         try:
             del(self._processes[tid])
         except KeyError:

+ 10 - 10
celery/worker/job.py

@@ -32,7 +32,7 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
-def jail(task_id, task_name, func, args, kwargs, on_acknowledge=None):
+def jail(task_id, task_name, func, args, kwargs):
     """Wraps the task in a jail, which catches all exceptions, and
     saves the status and result of the task execution to the task
     meta backend.
@@ -80,10 +80,6 @@ def jail(task_id, task_name, func, args, kwargs, on_acknowledge=None):
     # Backend process cleanup
     default_backend.process_cleanup()
 
-    # Handle task acknowledgement.
-    if on_acknowledge:
-        on_acknowledge()
-
     try:
         result = func(*args, **kwargs)
     except (SystemExit, KeyboardInterrupt):
@@ -149,14 +145,14 @@ class TaskWrapper(object):
     fail_email_body = TASK_FAIL_EMAIL_BODY
 
     def __init__(self, task_name, task_id, task_func, args, kwargs,
-            on_acknowledge=None, **opts):
+            on_ack=None, **opts):
         self.task_name = task_name
         self.task_id = task_id
         self.task_func = task_func
         self.args = args
         self.kwargs = kwargs
         self.logger = kwargs.get("logger")
-        self.on_acknowledge = on_acknowledge
+        self.on_ack = on_ack
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
                 "fail_email_body"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
@@ -193,7 +189,7 @@ class TaskWrapper(object):
             raise NotRegistered(task_name)
         task_func = tasks[task_name]
         return cls(task_name, task_id, task_func, args, kwargs,
-                    on_acknowledge=message.ack, logger=logger)
+                    on_ack=message.ack, logger=logger)
 
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
@@ -218,8 +214,11 @@ class TaskWrapper(object):
 
         """
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
+        # acknowledge task as being processed.
+        if self.on_ack:
+            self.on_ack()
         return jail(self.task_id, self.task_name, self.task_func,
-                    self.args, task_func_kwargs, self.on_acknowledge)
+                    self.args, task_func_kwargs)
 
     def on_success(self, ret_value, meta):
         """The handler used if the task was successfully processed (
@@ -271,7 +270,8 @@ class TaskWrapper(object):
         """
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
         jail_args = [self.task_id, self.task_name, self.task_func,
-                     self.args, task_func_kwargs, self.on_acknowledge]
+                     self.args, task_func_kwargs]
         return pool.apply_async(jail, args=jail_args,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
+                on_ack=self.on_ack,
                 meta={"task_id": self.task_id, "task_name": self.task_name})