Browse Source

Move revoke() + discard_all() to celery.task.control

Ask Solem 15 years ago
parent
commit
3f4e261d68
2 changed files with 50 additions and 51 deletions
  1. 8 51
      celery/task/__init__.py
  2. 42 0
      celery/task/control.py

+ 8 - 51
celery/task/__init__.py

@@ -5,57 +5,16 @@ Working with tasks and task sets.
 """
 from billiard.serialization import pickle
 
-from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.execute import apply_async
 from celery.registry import tasks
 from celery.backends import default_backend
-from celery.messaging import TaskConsumer, BroadcastPublisher, with_connection
 from celery.task.base import Task, TaskSet, PeriodicTask
 from celery.task.base import ExecuteRemoteTask, AsynchronousMapTask
 from celery.task.rest import RESTProxyTask
+from celery.task.control import revoke, discard_all
 from celery.task.builtins import DeleteExpiredTaskMetaTask, PingTask
 
 
-def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
-    """Discard all waiting tasks.
-
-    This will ignore all tasks waiting for execution, and they will
-    be deleted from the messaging server.
-
-    :returns: the number of tasks discarded.
-
-    :rtype: int
-
-    """
-
-    def _discard(connection):
-        consumer = TaskConsumer(connection=connection)
-        try:
-            return consumer.discard_all()
-        finally:
-            consumer.close()
-
-    return with_connection(_discard, connect_timeout=connect_timeout)
-
-
-def revoke(task_id, connection=None, connect_timeout=None):
-    """Revoke a task by id.
-
-    Revoked tasks will not be executed after all.
-
-    """
-
-    def _revoke(connection):
-        broadcast = BroadcastPublisher(connection)
-        try:
-            broadcast.revoke(task_id)
-        finally:
-            broadcast.close()
-
-    return with_connection(_revoke, connection=connection,
-                           connect_timeout=connect_timeout)
-
-
 def is_successful(task_id):
     """Returns ``True`` if task with ``task_id`` has been executed.
 
@@ -65,7 +24,7 @@ def is_successful(task_id):
     return default_backend.is_successful(task_id)
 
 
-def dmap(func, args, timeout=None):
+def dmap(fun, args, timeout=None):
     """Distribute processing of the arguments and collect the results.
 
     Example
@@ -76,10 +35,10 @@ def dmap(func, args, timeout=None):
         [4, 8, 16]
 
     """
-    return TaskSet.map(func, args, timeout=timeout)
+    return TaskSet.map(fun, args, timeout=timeout)
 
 
-def dmap_async(func, args, timeout=None):
+def dmap_async(fun, args, timeout=None):
     """Distribute processing of the arguments and collect the results
     asynchronously.
 
@@ -98,16 +57,14 @@ def dmap_async(func, args, timeout=None):
         [4, 8, 16]
 
     """
-    return TaskSet.map_async(func, args, timeout=timeout)
+    return TaskSet.map_async(fun, args, timeout=timeout)
 
 
-def execute_remote(func, *args, **kwargs):
+def execute_remote(fun, *args, **kwargs):
     """Execute arbitrary function/object remotely.
 
-    :param func: A callable function or object.
-
+    :param fun: A callable function or object.
     :param \*args: Positional arguments to apply to the function.
-
     :param \*\*kwargs: Keyword arguments to apply to the function.
 
     The object must be picklable, so you can't use lambdas or functions
@@ -116,7 +73,7 @@ def execute_remote(func, *args, **kwargs):
     :returns: class:`celery.result.AsyncResult`.
 
     """
-    return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)
+    return ExecuteRemoteTask.delay(pickle.dumps(fun), args, kwargs)
 
 
 def ping():

+ 42 - 0
celery/task/control.py

@@ -0,0 +1,42 @@
+from celery import conf
+from celery.messaging import TaskConsumer, BroadcastPublisher, with_connection
+
+
+def discard_all(connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
+    """Discard all waiting tasks.
+
+    This will ignore all tasks waiting for execution, and they will
+    be deleted from the messaging server.
+
+    :returns: the number of tasks discarded.
+
+    """
+
+    def _discard(connection):
+        consumer = TaskConsumer(connection=connection)
+        try:
+            return consumer.discard_all()
+        finally:
+            consumer.close()
+
+    return with_connection(_discard, connect_timeout=connect_timeout)
+
+
+def revoke(task_id, connection=None,
+        connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
+    """Revoke a task by id.
+
+    If a task is revoked, the workers will ignore the task and not execute
+    it after all.
+
+    """
+
+    def _revoke(connection):
+        broadcast = BroadcastPublisher(connection)
+        try:
+            broadcast.revoke(task_id)
+        finally:
+            broadcast.close()
+
+    return with_connection(_revoke, connection=connection,
+                           connect_timeout=connect_timeout)