Browse Source

Fixes bug with revoke list only terminating one task. Closes #1592

Ask Solem 11 years ago
parent
commit
ce9436eedd
2 changed files with 12 additions and 15 deletions
  1. 1 1
      celery/datastructures.py
  2. 11 14
      celery/worker/control.py

+ 1 - 1
celery/datastructures.py

@@ -619,7 +619,7 @@ class LimitedSet(object):
             i += 1
 
     def update(self, other, heappush=heappush):
-        if isinstance(other, self.__class__):
+        if isinstance(other, LimitedSet):
             self._data.update(other._data)
             self._heap.extend(other._heap)
             heapify(self._heap)

+ 11 - 14
celery/worker/control.py

@@ -71,29 +71,26 @@ def query_task(state, ids, **kwargs):
 def revoke(state, task_id, terminate=False, signal=None, **kwargs):
     """Revoke task by task id."""
     # supports list argument since 3.1
-    task_ids, task_id = maybe_list(task_id) or [], None
-    to_terminate = set()
+    task_ids, task_id = set(maybe_list(task_id) or []), None
+    size = len(task_ids)
     terminated = set()
-    for task_id in task_ids:
-        revoked.add(task_id)
-        if terminate:
-            to_terminate.add(task_id)
 
-    if to_terminate:
+    revoked.update(task_ids)
+    if terminate:
         signum = _signals.signum(signal or 'TERM')
-        _to_terminate = set()
         # reserved_requests changes size during iteration
         # so need to consume the items first, then terminate after.
         requests = set(_find_requests_by_id(
-            to_terminate,
+            task_ids,
             worker_state.reserved_requests,
         ))
         for request in requests:
-            logger.info('Terminating %s (%s)', task_id, signum)
-            request.terminate(state.consumer.pool, signal=signum)
-            terminated.add(request.id)
-            if len(terminated) >= len(_to_terminate):
-                break
+            if request.id not in terminated:
+                terminated.add(request.id)
+                logger.info('Terminating %s (%s)', task_id, signum)
+                request.terminate(state.consumer.pool, signal=signum)
+                if len(terminated) >= size:
+                    break
 
         if not terminated:
             return {'ok': 'terminate: tasks unknown'}