Преглед на файлове

Wait with message acknowledgement until the task has actually been executed.
Closes #14.

Ask Solem преди 16 години
родител
ревизия
ef3f82bcf3
променени са 2 файла, в които са добавени 16 реда и са изтрити 6 реда
  1. 6 4
      celery/pool.py
  2. 10 2
      celery/worker.py

+ 6 - 4
celery/pool.py

@@ -86,7 +86,7 @@ class TaskPool(object):
         return self._pool._state == POOL_STATE_RUN
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, meta=None):
+            errbacks=None, on_acknowledge=None, meta=None):
         """Equivalent of the :func:``apply`` built-in function.
 
         All ``callbacks`` and ``errbacks`` should complete immediately since
@@ -107,9 +107,13 @@ class TaskPool(object):
 
         on_return = lambda r: self.on_return(r, tid, callbacks, errbacks, meta)
 
+
+        if self.full():
+            self.wait_for_result()
         result = self._pool.apply_async(target, args, kwargs,
                                            callback=on_return)
-
+        if on_acknowledge:
+            on_acknowledge()
         self.add(result, callbacks, errbacks, tid, meta)
 
         return result
@@ -147,8 +151,6 @@ class TaskPool(object):
 
         self._processes[tid] = [result, callbacks, errbacks, meta]
 
-        if self.full():
-            self.wait_for_result()
 
     def full(self):
         """Is the pool full?

+ 10 - 2
celery/worker.py

@@ -145,13 +145,15 @@ class TaskWrapper(object):
     """
     fail_email_body = TASK_FAIL_EMAIL_BODY
 
-    def __init__(self, task_name, task_id, task_func, args, kwargs, **opts):
+    def __init__(self, task_name, task_id, task_func, args, kwargs,
+            on_acknowledge=None, **opts):
         self.task_name = task_name
         self.task_id = task_id
         self.task_func = task_func
         self.args = args
         self.kwargs = kwargs
         self.logger = kwargs.get("logger")
+        self.on_acknowledge = on_acknowledge
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
                 "fail_email_body"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
@@ -175,6 +177,8 @@ class TaskWrapper(object):
         :returns: :class:`TaskWrapper` instance.
 
         """
+        # TODO This doesn't really make sense anymore, since the message
+        # is now part of the TaskWrapper instance itself. Needs refactoring.
         task_name = message_data["task"]
         task_id = message_data["id"]
         args = message_data["args"]
@@ -187,7 +191,8 @@ class TaskWrapper(object):
         if task_name not in tasks:
             raise UnknownTask(task_name)
         task_func = tasks[task_name]
-        return cls(task_name, task_id, task_func, args, kwargs, logger=logger)
+        return cls(task_name, task_id, task_func, args, kwargs,
+                    on_acknowledge=message.ack, logger=logger)
 
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
@@ -212,6 +217,8 @@ class TaskWrapper(object):
 
         """
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
+        if self.on_acknowledge:
+            self.on_acknowledge()
         return jail(self.task_id, [
                         self.task_func, self.args, task_func_kwargs])
 
@@ -261,6 +268,7 @@ class TaskWrapper(object):
                      self.args, task_func_kwargs]
         return pool.apply_async(jail, args=jail_args,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
+                on_acknowledge=self.on_acknowledge,
                 meta={"task_id": self.task_id, "task_name": self.task_name})