Browse Source

Make sure acknowledgement happens at start of task execution. (After the wait
for pool was removed in 4.0.2, the message was acknowledged before it was
actually executed (that is, while still waiting for an available slot in the
pool)

Ask Solem 15 năm trước cách đây
mục cha
commit
759716aae4
2 tập tin đã thay đổi với 9 bổ sung13 xóa
  1. 2 7
      celery/pool.py
  2. 7 6
      celery/worker/job.py

+ 2 - 7
celery/pool.py

@@ -4,11 +4,8 @@ Process Pools.
 
 
 """
 """
 import multiprocessing
 import multiprocessing
-import itertools
-import threading
 
 
-from multiprocessing.pool import Pool, worker
-from multiprocessing.pool import RUN as POOL_STATE_RUN
+from multiprocessing.pool import Pool
 from celery.datastructures import ExceptionInfo
 from celery.datastructures import ExceptionInfo
 from celery.utils import gen_unique_id
 from celery.utils import gen_unique_id
 from functools import partial as curry
 from functools import partial as curry
@@ -53,7 +50,7 @@ class TaskPool(object):
         self._pool = None
         self._pool = None
 
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, on_acknowledge=None, meta=None):
+            errbacks=None, meta=None):
         """Equivalent of the :func:``apply`` built-in function.
         """Equivalent of the :func:``apply`` built-in function.
 
 
         All ``callbacks`` and ``errbacks`` should complete immediately since
         All ``callbacks`` and ``errbacks`` should complete immediately since
@@ -71,8 +68,6 @@ class TaskPool(object):
 
 
         result = self._pool.apply_async(target, args, kwargs,
         result = self._pool.apply_async(target, args, kwargs,
                                         callback=on_return)
                                         callback=on_return)
-        if on_acknowledge:
-            on_acknowledge()
 
 
         self._processes[tid] = [result, callbacks, errbacks, meta]
         self._processes[tid] = [result, callbacks, errbacks, meta]
 
 

+ 7 - 6
celery/worker/job.py

@@ -32,7 +32,7 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
 
 
-def jail(task_id, task_name, func, args, kwargs):
+def jail(task_id, task_name, func, args, kwargs, on_acknowledge=None):
     """Wraps the task in a jail, which catches all exceptions, and
     """Wraps the task in a jail, which catches all exceptions, and
     saves the status and result of the task execution to the task
     saves the status and result of the task execution to the task
     meta backend.
     meta backend.
@@ -80,6 +80,10 @@ def jail(task_id, task_name, func, args, kwargs):
     # Backend process cleanup
     # Backend process cleanup
     default_backend.process_cleanup()
     default_backend.process_cleanup()
 
 
+    # Handle task acknowledgement.
+    if on_acknowledge:
+        on_acknowledge()
+
     try:
     try:
         result = func(*args, **kwargs)
         result = func(*args, **kwargs)
     except (SystemExit, KeyboardInterrupt):
     except (SystemExit, KeyboardInterrupt):
@@ -214,10 +218,8 @@ class TaskWrapper(object):
 
 
         """
         """
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
-        if self.on_acknowledge:
-            self.on_acknowledge()
         return jail(self.task_id, self.task_name, self.task_func,
         return jail(self.task_id, self.task_name, self.task_func,
-                    self.args, task_func_kwargs)
+                    self.args, task_func_kwargs, self.on_acknowledge)
 
 
     def on_success(self, ret_value, meta):
     def on_success(self, ret_value, meta):
         """The handler used if the task was successfully processed (
         """The handler used if the task was successfully processed (
@@ -269,8 +271,7 @@ class TaskWrapper(object):
         """
         """
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
         jail_args = [self.task_id, self.task_name, self.task_func,
         jail_args = [self.task_id, self.task_name, self.task_func,
-                     self.args, task_func_kwargs]
+                     self.args, task_func_kwargs, self.on_acknowledge]
         return pool.apply_async(jail, args=jail_args,
         return pool.apply_async(jail, args=jail_args,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
                 callbacks=[self.on_success], errbacks=[self.on_failure],
-                on_acknowledge=self.on_acknowledge,
                 meta={"task_id": self.task_id, "task_name": self.task_name})
                 meta={"task_id": self.task_id, "task_name": self.task_name})