فهرست منبع

Adds signal task_revoked

Ask Solem 12 سال پیش
والد
کامیت
b7fe953077
5فایلهای تغییر یافته به همراه32 افزوده شده و 5 حذف شده
  1. 1 0
      celery/signals.py
  2. 1 1
      celery/tests/worker/test_request.py
  3. 7 4
      celery/worker/job.py
  4. 18 0
      docs/userguide/signals.rst
  5. 5 0
      docs/whatsnew-2.6.rst

+ 1 - 0
celery/signals.py

@@ -23,6 +23,7 @@ task_postrun = Signal(providing_args=[
 task_success = Signal(providing_args=['result'])
 task_failure = Signal(providing_args=[
     'task_id', 'exception', 'args', 'kwargs', 'traceback', 'einfo'])
+task_revoked = Signal(providing_args=['terminated', 'signal'])
 celeryd_init = Signal(providing_args=['instance'])
 worker_init = Signal(providing_args=[])
 worker_process_init = Signal(providing_args=[])

+ 1 - 1
celery/tests/worker/test_request.py

@@ -300,7 +300,7 @@ class test_TaskRequest(Case):
         tw.time_start = None
         tw.terminate(pool, signal='KILL')
         self.assertFalse(pool.terminate_job.call_count)
-        self.assertTupleEqual(tw._terminate_on_ack, (True, pool, 'KILL'))
+        self.assertTupleEqual(tw._terminate_on_ack, (pool, 'KILL'))
         tw.terminate(pool, signal='KILL')
 
     def test_revoked_expires_expired(self):

+ 7 - 4
celery/worker/job.py

@@ -20,6 +20,7 @@ from kombu.utils import kwdict, reprcall
 from kombu.utils.encoding import safe_repr, safe_str
 
 from celery import exceptions
+from celery import signals
 from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
 from celery.task.trace import (
@@ -44,6 +45,7 @@ _does_info = logger.isEnabledFor(logging.INFO)
 tz_to_local = timezone.to_local
 tz_or_local = timezone.tz_or_local
 tz_utc = timezone.utc
+send_revoked = signals.task_revoked.send
 
 task_accepted = state.task_accepted
 task_ready = state.task_ready
@@ -229,9 +231,10 @@ class Request(object):
 
     def terminate(self, pool, signal=None):
         if self.time_start:
-            return pool.terminate_job(self.worker_pid, signal)
+            pool.terminate_job(self.worker_pid, signal)
+            send_revoked(self.task, terminated=True, signal=signal)
         else:
-            self._terminate_on_ack = (True, pool, signal)
+            self._terminate_on_ack = pool, signal
 
     def revoked(self):
         """If revoked, skip task and mark state."""
@@ -244,6 +247,7 @@ class Request(object):
             self.send_event('task-revoked', uuid=self.id)
             self.acknowledge()
             self._already_revoked = True
+            send_revoked(self.task, terminated=False)
             return True
         return False
 
@@ -262,8 +266,7 @@ class Request(object):
         if _does_debug:
             debug('Task accepted: %s[%s] pid:%r', self.name, self.id, pid)
         if self._terminate_on_ack is not None:
-            _, pool, signal = self._terminate_on_ack
-            self.terminate(pool, signal)
+            self.terminate(*self._terminate_on_ack)
 
     def on_timeout(self, soft, timeout):
         """Handler called if the task times out."""

+ 18 - 0
docs/userguide/signals.rst

@@ -177,6 +177,24 @@ Provides arguments:
 * einfo
     The :class:`celery.datastructures.ExceptionInfo` instance.
 
+.. signal:: task_revoked
+
+task_revoked
+~~~~~~~~~~~~
+
+Dispatched when a task is revoked/terminated by the worker.
+
+Sender is the task class revoked/terminated.
+
+Provides arguments:
+
+* terminated
+    :const:`True` if the task was terminated.
+
+* signal
+    Signal number used to terminate the task. If this is :const:`None` and
+    terminated is :const:`True` then :sig:`TERM` should be assumed.
+
 Worker Signals
 --------------
 

+ 5 - 0
docs/whatsnew-2.6.rst

@@ -570,8 +570,13 @@ In Other News
         $ celery inspect report
 
 - Module ``celery.log`` moved to :mod:`celery.app.log`.
+
 - Module ``celery.task.control`` moved to :mod:`celery.app.control`.
 
+- New signal: :signal:`task-revoked`
+
+    Sent in the main process when the task is revoked or terminated.
+
 - ``AsyncResult.task_id`` renamed to ``AsyncResult.id``
 
 - ``TasksetResult.taskset_id`` renamed to ``.id``