Selaa lähdekoodia

[optimization] Revoke now accepts list argument (and used by ResultSet.revoke)

Ask Solem 11 vuotta sitten
vanhempi
commit
b9ca48d97a
3 muutettua tiedostoa jossa 73 lisäystä ja 16 poistoa
  1. 3 5
      celery/result.py
  2. 40 11
      celery/worker/control.py
  3. 30 0
      docs/userguide/workers.rst

+ 3 - 5
celery/result.py

@@ -413,11 +413,9 @@ class ResultSet(ResultBase):
             Default is TERM.
 
         """
-        with self.app.connection_or_acquire(connection) as conn:
-            for result in self.results:
-                result.revoke(
-                    connection=conn, terminate=terminate, signal=signal,
-                )
+        self.app.control.revoke([r.id for r in self.results],
+                                connection=connection,
+                                terminate=terminate, signal=signal)
 
     def __iter__(self):
         return iter(self.results)

+ 40 - 11
celery/worker/control.py

@@ -15,6 +15,7 @@ from kombu.utils.encoding import safe_repr
 from celery.five import UserDict, items, StringIO
 from celery.platforms import signals as _signals
 from celery.utils import timeutils
+from celery.utils.functional import maybe_list
 from celery.utils.log import get_logger
 from celery.utils import jsonify
 
@@ -34,23 +35,51 @@ class Panel(UserDict):
         return method
 
 
+def _find_requests_by_id(ids, requests):
+    found, total = 0, len(ids)
+    for request in worker_state.reserved_requests:
+        if request.id in ids:
+            yield request
+            found += 1
+            if found >= total:
+                break
+
+
 @Panel.register
 def revoke(state, task_id, terminate=False, signal=None, **kwargs):
     """Revoke task by task id."""
-    revoked.add(task_id)
-    if terminate:
+    # supports list argument since 3.1
+    task_ids, task_id = maybe_list(task_id) or [], None
+    to_terminate = set()
+    terminated = set()
+    for task_id in task_ids:
+        revoked.add(task_id)
+        if terminate:
+            to_terminate.add(task_id)
+
+    if to_terminate:
         signum = _signals.signum(signal or 'TERM')
-        for request in worker_state.reserved_requests:
-            if request.id == task_id:
-                logger.info('Terminating %s (%s)', task_id, signum)
-                request.terminate(state.consumer.pool, signal=signum)
+        _to_terminate = set()
+        # reserved_requests changes size during iteration
+        # so need to consume the items first, then terminate after.
+        requests = set(_find_requests_by_id(
+            to_terminate,
+            worker_state.reserved_requests,
+        ))
+        for request in requests:
+            logger.info('Terminating %s (%s)', task_id, signum)
+            request.terminate(state.consumer.pool, signal=signum)
+            terminated.add(request.id)
+            if len(terminated) >= len(_to_terminate):
                 break
-        else:
-            return {'ok': 'terminate: task {0} not found'.format(task_id)}
-        return {'ok': 'terminating {0} ({1})'.format(task_id, signal)}
 
-    logger.info('Revoking task %s', task_id)
-    return {'ok': 'revoking task {0}'.format(task_id)}
+        if not terminated:
+            return {'ok': 'terminate: tasks unknown'}
+        return {'ok': 'terminate: {0}'.format(', '.join(terminated))}
+
+    idstr = ', '.join(task_ids)
+    logger.info('Tasks flagged as revoked: %s', idstr)
+    return {'ok': 'tasks {0} flagged as revoked'.format(idstr)}
 
 
 @Panel.register

+ 30 - 0
docs/userguide/workers.rst

@@ -249,6 +249,10 @@ Terminating a task also revokes it.
 
 ::
 
+    >>> result.revoke()
+
+    >>> AsyncResult(id).revoke()
+
     >>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')
 
     >>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
@@ -257,6 +261,32 @@ Terminating a task also revokes it.
     >>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
     ...                    terminate=True, signal='SIGKILL')
 
+
+
+
+Revoking multiple tasks
+-----------------------
+
+.. versionadded:: 3.1
+
+
+The revoke method also accepts a list argument, where it will revoke
+several tasks at once.
+
+**Example**
+
+::
+
+    >>> app.control.revoke([
+    ...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
+    ...    'f565793e-b041-4b2b-9ca4-dca22762a55d',
+    ...    'd9d35e03-2997-42d0-a13e-64a66b88a618',
+    ])
+
+
+The ``GroupResult.revoke`` method takes advantage of this since
+version 3.1.
+
 .. _worker-persistent-revokes:
 
 Persistent revokes