123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- """
- =========================
- Cancelable tasks overview
- =========================
- For long-running :class:`Task`'s, it can be desirable to support
- aborting during execution. Of course, these tasks should be built to
- support abortion specifically.
- The :class:`CancelableTask` serves as a base class for all :class:`Task`
- objects that should support cancellation by producers.
- * Producers may invoke the :meth:`cancel` method on
- :class:`CancelableAsyncResult` instances, to request abortion.
- * Consumers (workers) should periodically check (and honor!) the
- :meth:`is_cancelled` method at controlled points in their task's
- :meth:`run` method. The more often, the better.
- The necessary intermediate communication is dealt with by the
- :class:`CancelableTask` implementation.
- Usage example
- -------------
- In the consumer:
- .. code-block:: python
- from celery.contrib.cancelable import CancelableTask
- def MyLongRunningTask(CancelableTask):
- def run(self, **kwargs):
- logger = self.get_logger(**kwargs)
- results = []
- for x in xrange(100):
- # Check after every 5 loops..
- if x % 5 == 0: # alternatively, check when some timer is due
- if self.is_cancelled(**kwargs):
- # Respect the cancelled status and terminate
- # gracefully
- logger.warning("Task cancelled.")
- return None
- y = do_something_expensive(x)
- results.append(y)
- logger.info("Task finished.")
- return results
- In the producer:
- .. code-block:: python
- from myproject.tasks import MyLongRunningTask
- def myview(request):
- async_result = MyLongRunningTask.delay()
- # async_result is of type CancelableAsyncResult
- # After 10 seconds, cancel the task
- time.sleep(10)
- async_result.cancel()
- ...
- After the ``async_result.cancel()`` call, the task execution is not
- aborted immediately. In fact, it is not guaranteed to abort at all. Keep
- checking the ``async_result`` status, or call ``async_result.wait()`` to
- have it block until the task is finished.
- """
- from celery.task.base import Task
- from celery.result import AsyncResult
- """ Task States
- .. data:: CANCELLED
- Task is cancelled (typically by the producer) and should be
- cancelled as soon as possible.
- """
- CANCELLED = "CANCELLED"
- class CancelableAsyncResult(AsyncResult):
- """Represents a cancelable result.
- Specifically, this gives the ``AsyncResult`` a :meth:`cancel()` method,
- which sets the state of the underlying Task to ``"CANCELLED"``.
- """
- def is_cancelled(self):
- """Returns :const:`True` if the task is (being) cancelled."""
- return self.backend.get_status(self.task_id) == CANCELLED
- def cancel(self):
- """Set the state of the task to :const:`CANCELLED`.
- Cancelable tasks monitor their state at regular intervals and
- terminate execution if so.
- Be aware that invoking this method does not guarantee when the
- task will be cancelled (or even if the task will be cancelled at
- all).
- """
- # TODO: store_result requires all four arguments to be set,
- # but only status should be updated here
- return self.backend.store_result(self.task_id, result=None,
- status=CANCELLED, traceback=None)
- class CancelableTask(Task):
- """A celery task that serves as a base class for all :class:`Task`'s
- that support aborting during execution.
- All subclasses of :class:`CancelableTask` must call the
- :meth:`is_cancelled` method periodically and act accordingly when
- the call evaluates to :const:`True`.
- """
- @classmethod
- def AsyncResult(cls, task_id):
- """Returns the accompanying CancelableAsyncResult instance."""
- return CancelableAsyncResult(task_id, backend=cls.backend)
- def is_cancelled(self, **kwargs):
- """Checks against the backend whether this
- :class:`CancelableAsyncResult` is :const:`CANCELLED`.
- Always returns :const:`False` in case the `task_id` parameter
- refers to a regular (non-cancelable) :class:`Task`.
- Be aware that invoking this method will cause a hit in the
- backend (for example a database query), so find a good balance
- between calling it regularly (for responsiveness), but not too
- often (for performance).
- """
- result = self.AsyncResult(kwargs["task_id"])
- if not isinstance(result, CancelableAsyncResult):
- return False
- return result.is_cancelled()
|