Browse Source

Fixes terminate of revoked task and now updates state+event for terminated tasks

Closes #1007
Ask Solem 12 years ago
parent
commit
21295ee732
4 changed files with 26 additions and 13 deletions
  1. 11 3
      Changelog
  2. 1 1
      celery/exceptions.py
  3. 6 3
      celery/worker/control.py
  4. 8 6
      celery/worker/job.py

+ 11 - 3
Changelog

@@ -53,12 +53,20 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 
 - Now depends on billiard 2.7.3.16
 
-- :program:`celery worker` and :program:`celery beat` commands now respects
-  the :option:`--no-color` option (Issue #999).
-
 - Fixes request stack protection when app is initialized more than
   once (Issue #1003).
 
+- Terminating a task now works if the task has been sent to the
+  pool but not yet acknowledged by a pool process (Issue #1007).
+
+    Fix contributed by Alexey Zatelepin
+
+- Terminating a task now properly updates the state of the task to revoked,
+  and sends a ``task-revoked`` event.
+
+- :program:`celery worker` and :program:`celery beat` commands now respects
+  the :option:`--no-color` option (Issue #999).
+
 - Fixed typos in eventlet examples (Issue #1000)
 
     Fix contributed by Bryan Bishop.

+ 1 - 1
celery/exceptions.py

@@ -9,7 +9,7 @@
 from __future__ import absolute_import
 
 from billiard.exceptions import (  # noqa
-    SoftTimeLimitExceeded, TimeLimitExceeded, WorkerLostError,
+    SoftTimeLimitExceeded, TimeLimitExceeded, WorkerLostError, Terminated,
 )
 
 UNREGISTERED_FMT = """\

+ 6 - 3
celery/worker/control.py

@@ -40,13 +40,16 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
     action = 'revoked'
     if terminate:
         signum = _signals.signum(signal or 'TERM')
-        for request in state.active_requests:
+        for request in state.reserved_requests:
             if request.id == task_id:
-                action = 'terminated (%s)' % (signum, )
+                logger.info('Terminating %s (%s)', task_id, signum)
                 request.terminate(panel.consumer.pool, signal=signum)
                 break
+        else:
+            return {'ok': 'terminate: task %s not found' % (task_id, )}
+        return {'ok': 'terminating %s (%s)' % (task_id, signal)}
 
-    logger.info('Task %s %s.', task_id, action)
+    logger.info('Revoking task %s', task_id)
     return {'ok': 'task %s %s' % (task_id, action)}
 
 

+ 8 - 6
celery/worker/job.py

@@ -352,19 +352,21 @@ class Request(object):
         task_ready(self)
 
         if not exc_info.internal:
+            exc = exc_info.exception
 
-            if isinstance(exc_info.exception, exceptions.RetryTaskError):
+            if isinstance(exc, exceptions.RetryTaskError):
                 return self.on_retry(exc_info)
 
-            # This is a special case as the process would not have had
+            # These are special cases where the process would not have had
             # time to write the result.
-            if isinstance(exc_info.exception, exceptions.WorkerLostError) and \
-                    self.store_errors:
-                self.task.backend.mark_as_failure(self.id, exc_info.exception)
+            if self.store_errors:
+                if isinstance(exc, exceptions.WorkerLostError):
+                    self.task.backend.mark_as_failure(self.id, exc_info.exception)
+                elif isinstance(exc, exceptions.Terminated):
+                    self._announce_revoked('terminated', True, str(exc), False)
             # (acks_late) acknowledge after result stored.
             if self.task.acks_late:
                 self.acknowledge()
-
         self._log_error(exc_info)
 
     def _log_error(self, einfo):