|
@@ -1,24 +1,24 @@
|
|
|
"""
|
|
|
=========================
|
|
|
-Cancelable tasks overview
|
|
|
+Abortable tasks overview
|
|
|
=========================
|
|
|
|
|
|
For long-running :class:`Task`'s, it can be desirable to support
|
|
|
aborting during execution. Of course, these tasks should be built to
|
|
|
support abortion specifically.
|
|
|
|
|
|
-The :class:`CancelableTask` serves as a base class for all :class:`Task`
|
|
|
-objects that should support cancellation by producers.
|
|
|
+The :class:`AbortableTask` serves as a base class for all :class:`Task`
|
|
|
+objects that should support abortion by producers.
|
|
|
|
|
|
-* Producers may invoke the :meth:`cancel` method on
|
|
|
- :class:`CancelableAsyncResult` instances, to request abortion.
|
|
|
+* Producers may invoke the :meth:`abort` method on
|
|
|
+ :class:`AbortableAsyncResult` instances, to request abortion.
|
|
|
|
|
|
* Consumers (workers) should periodically check (and honor!) the
|
|
|
- :meth:`is_cancelled` method at controlled points in their task's
|
|
|
+ :meth:`is_aborted` method at controlled points in their task's
|
|
|
:meth:`run` method. The more often, the better.
|
|
|
|
|
|
The necessary intermediate communication is dealt with by the
|
|
|
-:class:`CancelableTask` implementation.
|
|
|
+:class:`AbortableTask` implementation.
|
|
|
|
|
|
Usage example
|
|
|
-------------
|
|
@@ -27,9 +27,9 @@ In the consumer:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
- from celery.contrib.cancelable import CancelableTask
|
|
|
+ from celery.contrib.abortable import AbortableTask
|
|
|
|
|
|
- def MyLongRunningTask(CancelableTask):
|
|
|
+ def MyLongRunningTask(AbortableTask):
|
|
|
|
|
|
def run(self, **kwargs):
|
|
|
logger = self.get_logger(**kwargs)
|
|
@@ -37,10 +37,10 @@ In the consumer:
|
|
|
for x in xrange(100):
|
|
|
# Check after every 5 loops..
|
|
|
if x % 5 == 0: # alternatively, check when some timer is due
|
|
|
- if self.is_cancelled(**kwargs):
|
|
|
- # Respect the cancelled status and terminate
|
|
|
+ if self.is_aborted(**kwargs):
|
|
|
+ # Respect the aborted status and terminate
|
|
|
# gracefully
|
|
|
- logger.warning("Task cancelled.")
|
|
|
+ logger.warning("Task aborted.")
|
|
|
return None
|
|
|
y = do_something_expensive(x)
|
|
|
results.append(y)
|
|
@@ -57,15 +57,15 @@ In the producer:
|
|
|
def myview(request):
|
|
|
|
|
|
async_result = MyLongRunningTask.delay()
|
|
|
- # async_result is of type CancelableAsyncResult
|
|
|
+ # async_result is of type AbortableAsyncResult
|
|
|
|
|
|
- # After 10 seconds, cancel the task
|
|
|
+ # After 10 seconds, abort the task
|
|
|
time.sleep(10)
|
|
|
- async_result.cancel()
|
|
|
+ async_result.abort()
|
|
|
|
|
|
...
|
|
|
|
|
|
-After the ``async_result.cancel()`` call, the task execution is not
|
|
|
+After the ``async_result.abort()`` call, the task execution is not
|
|
|
aborted immediately. In fact, it is not guaranteed to abort at all. Keep
|
|
|
checking the ``async_result`` status, or call ``async_result.wait()`` to
|
|
|
have it block until the task is finished.
|
|
@@ -77,65 +77,65 @@ from celery.result import AsyncResult
|
|
|
|
|
|
""" Task States
|
|
|
|
|
|
-.. data:: CANCELLED
|
|
|
+.. data:: ABORTED
|
|
|
|
|
|
- Task is cancelled (typically by the producer) and should be
|
|
|
- cancelled as soon as possible.
|
|
|
+ Task is aborted (typically by the producer) and should be
|
|
|
+ aborted as soon as possible.
|
|
|
|
|
|
"""
|
|
|
-CANCELLED = "CANCELLED"
|
|
|
+ABORTED = "ABORTED"
|
|
|
|
|
|
|
|
|
-class CancelableAsyncResult(AsyncResult):
|
|
|
- """Represents a cancelable result.
|
|
|
+class AbortableAsyncResult(AsyncResult):
|
|
|
+ """Represents a abortable result.
|
|
|
|
|
|
- Specifically, this gives the ``AsyncResult`` a :meth:`cancel()` method,
|
|
|
- which sets the state of the underlying Task to ``"CANCELLED"``.
|
|
|
+ Specifically, this gives the ``AsyncResult`` a :meth:`abort()` method,
|
|
|
+ which sets the state of the underlying Task to ``"ABORTED"``.
|
|
|
|
|
|
"""
|
|
|
|
|
|
- def is_cancelled(self):
|
|
|
- """Returns :const:`True` if the task is (being) cancelled."""
|
|
|
- return self.backend.get_status(self.task_id) == CANCELLED
|
|
|
+ def is_aborted(self):
|
|
|
+ """Returns :const:`True` if the task is (being) aborted."""
|
|
|
+ return self.backend.get_status(self.task_id) == ABORTED
|
|
|
|
|
|
- def cancel(self):
|
|
|
- """Set the state of the task to :const:`CANCELLED`.
|
|
|
+ def abort(self):
|
|
|
+ """Set the state of the task to :const:`ABORTED`.
|
|
|
|
|
|
- Cancelable tasks monitor their state at regular intervals and
|
|
|
+ Abortable tasks monitor their state at regular intervals and
|
|
|
terminate execution if so.
|
|
|
|
|
|
Be aware that invoking this method does not guarantee when the
|
|
|
- task will be cancelled (or even if the task will be cancelled at
|
|
|
+ task will be aborted (or even if the task will be aborted at
|
|
|
all).
|
|
|
|
|
|
"""
|
|
|
# TODO: store_result requires all four arguments to be set,
|
|
|
# but only status should be updated here
|
|
|
return self.backend.store_result(self.task_id, result=None,
|
|
|
- status=CANCELLED, traceback=None)
|
|
|
+ status=ABORTED, traceback=None)
|
|
|
|
|
|
|
|
|
-class CancelableTask(Task):
|
|
|
+class AbortableTask(Task):
|
|
|
"""A celery task that serves as a base class for all :class:`Task`'s
|
|
|
that support aborting during execution.
|
|
|
|
|
|
- All subclasses of :class:`CancelableTask` must call the
|
|
|
- :meth:`is_cancelled` method periodically and act accordingly when
|
|
|
+ All subclasses of :class:`AbortableTask` must call the
|
|
|
+ :meth:`is_aborted` method periodically and act accordingly when
|
|
|
the call evaluates to :const:`True`.
|
|
|
|
|
|
"""
|
|
|
|
|
|
@classmethod
|
|
|
def AsyncResult(cls, task_id):
|
|
|
- """Returns the accompanying CancelableAsyncResult instance."""
|
|
|
- return CancelableAsyncResult(task_id, backend=cls.backend)
|
|
|
+ """Returns the accompanying AbortableAsyncResult instance."""
|
|
|
+ return AbortableAsyncResult(task_id, backend=cls.backend)
|
|
|
|
|
|
- def is_cancelled(self, **kwargs):
|
|
|
+ def is_aborted(self, **kwargs):
|
|
|
"""Checks against the backend whether this
|
|
|
- :class:`CancelableAsyncResult` is :const:`CANCELLED`.
|
|
|
+ :class:`AbortableAsyncResult` is :const:`ABORTED`.
|
|
|
|
|
|
Always returns :const:`False` in case the `task_id` parameter
|
|
|
- refers to a regular (non-cancelable) :class:`Task`.
|
|
|
+ refers to a regular (non-abortable) :class:`Task`.
|
|
|
|
|
|
Be aware that invoking this method will cause a hit in the
|
|
|
backend (for example a database query), so find a good balance
|
|
@@ -144,6 +144,6 @@ class CancelableTask(Task):
|
|
|
|
|
|
"""
|
|
|
result = self.AsyncResult(kwargs["task_id"])
|
|
|
- if not isinstance(result, CancelableAsyncResult):
|
|
|
+ if not isinstance(result, AbortableAsyncResult):
|
|
|
return False
|
|
|
- return result.is_cancelled()
|
|
|
+ return result.is_aborted()
|