فهرست منبع

Revoking a task must also release the semaphore. Closes #877. Big thanks to @hynek

Ask Solem 12 سال پیش
والد
کامیت
bf09961d12
3فایلهای تغییر یافته به همراه9 افزوده شده و 5 حذف شده
  1. 4 1
      celery/worker/__init__.py
  2. 5 1
      celery/worker/job.py
  3. 0 3
      celery/worker/mediator.py

+ 4 - 1
celery/worker/__init__.py

@@ -29,7 +29,7 @@ from celery import concurrency as _concurrency
 from celery import platforms
 from celery.app import app_or_default, set_default_app
 from celery.app.abstract import configurated, from_config
-from celery.exceptions import SystemTerminate
+from celery.exceptions import SystemTerminate, TaskRevokedError
 from celery.task import trace
 from celery.utils.functional import noop
 from celery.utils.imports import qualname, reload_from_cwd
@@ -369,6 +369,9 @@ class WorkController(configurated):
         """Process task by sending it to the pool of workers."""
         try:
             req.execute_using_pool(self.pool)
+        except TaskRevokedError:
+            if self.semaphore:  # (Issue #877)
+                self.semaphore.release()
         except Exception, exc:
             logger.critical('Internal error: %r\n%s',
                             exc, traceback.format_exc(), exc_info=True)

+ 5 - 1
celery/worker/job.py

@@ -23,6 +23,7 @@ from celery import exceptions
 from celery import signals
 from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
+from celery.exceptions import TaskRevokedError
 from celery.platforms import signals as _signals
 from celery.task.trace import (
     trace_task,
@@ -170,10 +171,13 @@ class Request(object):
 
         :param pool: A :class:`celery.concurrency.base.TaskPool` instance.
 
+        :raises celery.exceptions.TaskRevokedError: if the task was revoked
+            and ignored.
+
         """
         task = self.task
         if self.revoked():
-            return
+            raise TaskRevokedError(self.id)
 
         hostname = self.hostname
         kwargs = self.kwargs

+ 0 - 3
celery/worker/mediator.py

@@ -66,9 +66,6 @@ class Mediator(bgThread):
         except Empty:
             return
 
-        if task.revoked():
-            return
-
         if self._does_debug:
             logger.debug('Mediator: Running callback for task: %s[%s]',
                          task.name, task.id)