Browse Source

Use billiard>=0.3.0's support for accept callbacks for acks.

Ask Solem 15 years ago
parent
commit
434dc8bd32
3 changed files with 9 additions and 5 deletions
  1. 7 3
      celery/worker/job.py
  2. 1 1
      contrib/requirements/default.txt
  3. 1 1
      setup.py

+ 7 - 3
celery/worker/job.py

@@ -313,15 +313,19 @@ class TaskWrapper(object):
         # Make sure task has not already been executed.
         self._set_executed_bit()
 
-        self.send_event("task-accepted", uuid=self.task_id)
-
         args = self._get_tracer_args(loglevel, logfile)
         self.time_start = time.time()
         result = pool.apply_async(execute_and_trace, args=args,
+                    accept_callback=self.on_accepted,
                     callbacks=[self.on_success], errbacks=[self.on_failure])
-        self.on_ack()
         return result
 
+    def on_accepted(self):
+        self.on_ack()
+        self.send_event("task-accepted", uuid=self.task_id)
+        self.logger.debug("Task accepted: %s[%s]" % (
+            self.task_name, self.task_id))
+
     def on_success(self, ret_value):
         """The handler used if the task was successfully processed (
         without raising an exception)."""

+ 1 - 1
contrib/requirements/default.txt

@@ -3,4 +3,4 @@ python-dateutil
 anyjson
 carrot>=0.10.3
 django-picklefield
-billiard>=0.2.1
+billiard>=0.3.0

+ 1 - 1
setup.py

@@ -65,7 +65,7 @@ install_requires.extend([
     "anyjson",
     "carrot>=0.10.3",
     "django-picklefield",
-    "billiard>=0.2.1"])
+    "billiard>=0.3.0"])
 
 py_version = sys.version_info
 if sys.version_info <= (2, 5):