Procházet zdrojové kódy

Added a CancelableTask class.

Cancelable tasks support execution of tasks that support being aborted.
Producers can set the cancelled flag via the CancelableAsyncResult and
cancelable tasks should honor the flag and act upon it.

Documentation is also added.
Vincent Driessen před 15 roky
rodič
revize
4aebbd7bd8

+ 140 - 0
celery/contrib/cancelable.py

@@ -0,0 +1,140 @@
+"""
+=========================
+Cancelable tasks overview
+=========================
+
+For long-running :class:`Task`'s, it can be desirable to support
+aborting during execution. Of course, these tasks should be built to
+support abortion specifically.
+
+The :class:`CancelableTask` serves as a base class for all :class:`Task`
+objects that should support cancellation by producers.
+
+* Producers may invoke the :meth:`cancel` method on
+  :class:`CancelableAsyncResult` instances, to request abortion.
+
+* Consumers (workers) should periodically check (and honor!) the
+  :meth:`is_cancelled` method at controlled points in their task's
+  :meth:`run` method. The more often, the better.
+
+The necessary intermediate communication is dealt with by the
+:class:`CancelableTask` implementation.
+
+Usage example
+-------------
+
+In the consumer:
+
+.. code-block:: python
+
+   from celery.contrib.cancelable import CancelableTask
+   def MyLongRunningTask(CancelableTask):
+       def run(self, **kwargs):
+           logger = self.get_logger()
+           results = []
+           for x in xrange(100):
+               # Check after every 5 loops..
+               if x % 5 == 0:  # alternatively, check when some timer is due
+                   if self.is_cancelled(**kwargs):
+                       # Respect the cancelled status and terminate
+                       # gracefully
+                       logger.warning('Task cancelled.')
+                       return None
+               y = do_something_expensive(x)
+               results.append(y)
+           logger.info('Task finished.')
+           return results
+
+
+In the producer:
+
+.. code-block:: python
+
+   from myproject.tasks import MyLongRunningTask
+   def myview(request):
+       async_result = MyLongRunningTask.delay()
+       # async_result is of type CancelableAsyncResult
+
+       # After 10 seconds, cancel the task
+       time.sleep(10)
+       async_result.cancel()
+
+       ...
+
+After the `async_result.cancel()` call, the task execution is not
+aborted immediately. In fact, it is not guaranteed to abort at all. Keep
+checking the `async_result` status, or call `async_result.wait()` to
+have it block until the task is finished.
+
+"""
+
+from celery.task.base import Task
+from celery.result import AsyncResult
+from multiprocessing import Process, Event
+
+""" Task States
+
+.. data:: CANCELLED
+
+    Task is cancelled (typically by the producer) and should be
+    cancelled as soon as possible.
+
+"""
+CANCELLED = "CANCELLED"
+
+class CancelableAsyncResult(AsyncResult):
+    """Represents a cancelable result.
+
+    Specifically, this gives the AsyncResult a cancel() method, which sets
+    the state of the underlying Task to "CANCELLED".
+
+    """
+
+    def is_cancelled(self):
+        """Returns ``True'' if the task is (being) cancelled."""
+        return self.backend.get_status(self.task_id) == CANCELLED
+
+    def cancel(self):
+        """Set the state of the task to cancelled.
+
+        TODO: Be more descriptive. What does this mean for the worker?
+        TODO: What does the method return?
+
+        """
+        # TODO: store_result requires all four arguments to be set, but only status should be updated here
+        return self.backend.store_result(self.task_id, result=None, status=CANCELLED, traceback=None)
+
+
+class CancelableTask(Task):
+    """A celery task that serves as a base class for all :class:`Task`'s
+    that support aborting during execution.
+
+    All subclasses of :class:`CancelableTask` must call the
+    :meth:`is_cancelled` method periodically and act accordingly when
+    the call evaluates to ``True''.
+
+    """
+
+    @classmethod
+    def AsyncResult(cls, task_id):
+        """Returns the accompanying CancelableAsyncResult instance."""
+        return CancelableAsyncResult(task_id, backend=cls.backend)
+
+    def is_cancelled(self, **kwargs):
+        """Checks against the backend whether this
+        :class:`CancelableAsyncResult` is :const:`CANCELLED`.
+
+        Always returns ``False'' in case the `task_id` parameter refers
+        to a regular (non-cancelable) :class:`Task`.
+
+        Be aware that invoking this method will cause a hit in the
+        backend (for example a database query), so find a good balance
+        between calling it regularly (for responsiveness), but not too
+        often (for performance).
+
+        """
+        result = self.AsyncResult(kwargs['task_id'])
+        if not isinstance(result, CancelableAsyncResult):
+            return False
+        return result.is_cancelled()
+

+ 8 - 0
docs/reference/celery.contrib.cancelable.rst

@@ -0,0 +1,8 @@
+=======================================================
+ Contrib: Cancelable tasks - celery.contrib.cancelable
+=======================================================
+
+ .. currentmodule:: celery.contrib.cancelable
+
+ .. automodule:: celery.contrib.cancelable
+     :members:

+ 1 - 0
docs/reference/index.rst

@@ -28,6 +28,7 @@
     celery.states
     celery.messaging
     celery.contrib.test_runner
+    celery.contrib.cancelable
     celery.views
     celery.events
     celery.bin.celeryd