Browse Source

Terminate task now gives RevokedTaskError instead of WorkerLostError.

This requires billiard master branch.

In addition

- AsyncResult.revoke() now accepts terminate and signal arguments
- task-revoked event now sets 'terminated', 'signum' and 'expired' fields.
- TaskRevokedError argument is now the reason: 'revoked', 'expired' or
  'terminated'.
Ask Solem 13 years ago
parent
commit
a571aac0d9

+ 2 - 2
celery/backends/base.py

@@ -118,8 +118,8 @@ class BaseBackend(object):
         return self.store_result(task_id, exc, status=states.RETRY,
         return self.store_result(task_id, exc, status=states.RETRY,
                                  traceback=traceback)
                                  traceback=traceback)
 
 
-    def mark_as_revoked(self, task_id):
-        return self.store_result(task_id, TaskRevokedError(),
+    def mark_as_revoked(self, task_id, reason=''):
+        return self.store_result(task_id, TaskRevokedError(reason),
                                  status=states.REVOKED, traceback=None)
                                  status=states.REVOKED, traceback=None)
 
 
     def prepare_exception(self, exc):
     def prepare_exception(self, exc):

+ 0 - 2
celery/concurrency/processes/__init__.py

@@ -12,8 +12,6 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
 import os
 import os
-import platform
-import signal as _signal
 
 
 from celery import platforms
 from celery import platforms
 from celery import signals
 from celery import signals

+ 8 - 2
celery/result.py

@@ -75,14 +75,20 @@ class AsyncResult(ResultBase):
         """Forget about (and possibly remove the result of) this task."""
         """Forget about (and possibly remove the result of) this task."""
         self.backend.forget(self.id)
         self.backend.forget(self.id)
 
 
-    def revoke(self, connection=None):
+    def revoke(self, connection=None, terminate=False, signal=None):
         """Send revoke signal to all workers.
         """Send revoke signal to all workers.
 
 
         Any worker receiving the task, or having reserved the
         Any worker receiving the task, or having reserved the
         task, *must* ignore it.
         task, *must* ignore it.
 
 
+        :keyword terminate: Also terminate the process currently working
+            on the task (if any).
+        :keyword signal: Name of signal to send to process if terminate.
+            Default is TERM.
+
         """
         """
-        self.app.control.revoke(self.id, connection=connection)
+        self.app.control.revoke(self.id, connection=connection,
+                                terminate=terminate, signal=signal)
 
 
     def get(self, timeout=None, propagate=True, interval=0.5):
     def get(self, timeout=None, propagate=True, interval=0.5):
         """Wait until task is ready, and return its result.
         """Wait until task is ready, and return its result.

+ 13 - 9
celery/worker/job.py

@@ -240,11 +240,20 @@ class Request(object):
         if self.time_start:
         if self.time_start:
             signal = _signals.signum(signal or 'TERM')
             signal = _signals.signum(signal or 'TERM')
             pool.terminate_job(self.worker_pid, signal)
             pool.terminate_job(self.worker_pid, signal)
-            send_revoked(self.task, signum=signal,
-                         terminated=True, expired=False)
+            self._announce_revoked('terminated', True, signal, False)
         else:
         else:
             self._terminate_on_ack = pool, signal
             self._terminate_on_ack = pool, signal
 
 
+    def _announce_revoked(self, reason, terminated, signum, expired):
+        self.send_event('task-revoked', uuid=self.id,
+                        terminated=terminated, signum=signum, expired=expired)
+        if self.store_errors:
+            self.task.backend.mark_as_revoked(self.id, reason)
+        self.acknowledge()
+        self._already_revoked = True
+        send_revoked(self.task, terminated=terminated,
+                     signum=signum, expired=expired)
+
     def revoked(self):
     def revoked(self):
         """If revoked, skip task and mark state."""
         """If revoked, skip task and mark state."""
         expired = False
         expired = False
@@ -254,13 +263,8 @@ class Request(object):
             expired = self.maybe_expire()
             expired = self.maybe_expire()
         if self.id in revoked_tasks:
         if self.id in revoked_tasks:
             warn('Skipping revoked task: %s[%s]', self.name, self.id)
             warn('Skipping revoked task: %s[%s]', self.name, self.id)
-            self.send_event('task-revoked', uuid=self.id)
-            if self.store_errors:
-                self.task.backend.mark_as_revoked(self.id)
-            self.acknowledge()
-            self._already_revoked = True
-            send_revoked(self.task, terminated=False,
-                         signum=None, expired=expired)
+            self._announce_revoked('expired' if expired else 'revoked',
+                False, None, expired)
             return True
             return True
         return False
         return False
 
 

+ 6 - 1
docs/userguide/monitoring.rst

@@ -642,11 +642,16 @@ Task Events
 
 
     Sent if the execution of the task failed.
     Sent if the execution of the task failed.
 
 
-* ``task-revoked(uuid)``
+* ``task-revoked(uuid, terminated, signum, expired)``
 
 
     Sent if the task has been revoked (Note that this is likely
     Sent if the task has been revoked (Note that this is likely
     to be sent by more than one worker).
     to be sent by more than one worker).
 
 
+    - ``terminated`` is set to true if the task process was terminated,
+      and the ``signum`` field set to the signal used.
+
+    - ``expired`` is set to true if the task expired.
+
 * ``task-retried(uuid, exception, traceback, hostname, timestamp)``
 * ``task-retried(uuid, exception, traceback, hostname, timestamp)``
 
 
     Sent if the task failed, but will be retried in the future.
     Sent if the task failed, but will be retried in the future.