|
@@ -28,9 +28,11 @@ In the consumer:
|
|
|
.. code-block:: python
|
|
|
|
|
|
from celery.contrib.cancelable import CancelableTask
|
|
|
+
|
|
|
def MyLongRunningTask(CancelableTask):
|
|
|
+
|
|
|
def run(self, **kwargs):
|
|
|
- logger = self.get_logger()
|
|
|
+ logger = self.get_logger(**kwargs)
|
|
|
results = []
|
|
|
for x in xrange(100):
|
|
|
# Check after every 5 loops..
|
|
@@ -38,11 +40,11 @@ In the consumer:
|
|
|
if self.is_cancelled(**kwargs):
|
|
|
# Respect the cancelled status and terminate
|
|
|
# gracefully
|
|
|
- logger.warning('Task cancelled.')
|
|
|
+ logger.warning("Task cancelled.")
|
|
|
return None
|
|
|
y = do_something_expensive(x)
|
|
|
results.append(y)
|
|
|
- logger.info('Task finished.')
|
|
|
+ logger.info("Task finished.")
|
|
|
return results
|
|
|
|
|
|
|
|
@@ -51,7 +53,9 @@ In the producer:
|
|
|
.. code-block:: python
|
|
|
|
|
|
from myproject.tasks import MyLongRunningTask
|
|
|
+
|
|
|
def myview(request):
|
|
|
+
|
|
|
async_result = MyLongRunningTask.delay()
|
|
|
# async_result is of type CancelableAsyncResult
|
|
|
|
|
@@ -61,16 +65,15 @@ In the producer:
|
|
|
|
|
|
...
|
|
|
|
|
|
-After the `async_result.cancel()` call, the task execution is not
|
|
|
+After the ``async_result.cancel()`` call, the task execution is not
|
|
|
aborted immediately. In fact, it is not guaranteed to abort at all. Keep
|
|
|
-checking the `async_result` status, or call `async_result.wait()` to
|
|
|
+checking the ``async_result`` status, or call ``async_result.wait()`` to
|
|
|
have it block until the task is finished.
|
|
|
|
|
|
"""
|
|
|
-
|
|
|
from celery.task.base import Task
|
|
|
from celery.result import AsyncResult
|
|
|
-from multiprocessing import Process, Event
|
|
|
+
|
|
|
|
|
|
""" Task States
|
|
|
|
|
@@ -82,11 +85,12 @@ from multiprocessing import Process, Event
|
|
|
"""
|
|
|
CANCELLED = "CANCELLED"
|
|
|
|
|
|
+
|
|
|
class CancelableAsyncResult(AsyncResult):
|
|
|
"""Represents a cancelable result.
|
|
|
|
|
|
- Specifically, this gives the AsyncResult a cancel() method, which sets
|
|
|
- the state of the underlying Task to "CANCELLED".
|
|
|
+ Specifically, this gives the ``AsyncResult`` a :meth:`cancel()` method,
|
|
|
+ which sets the state of the underlying Task to ``"CANCELLED"``.
|
|
|
|
|
|
"""
|
|
|
|
|
@@ -105,8 +109,10 @@ class CancelableAsyncResult(AsyncResult):
|
|
|
all).
|
|
|
|
|
|
"""
|
|
|
- # TODO: store_result requires all four arguments to be set, but only status should be updated here
|
|
|
- return self.backend.store_result(self.task_id, result=None, status=CANCELLED, traceback=None)
|
|
|
+ # TODO: store_result requires all four arguments to be set,
|
|
|
+ # but only status should be updated here
|
|
|
+ return self.backend.store_result(self.task_id, result=None,
|
|
|
+ status=CANCELLED, traceback=None)
|
|
|
|
|
|
|
|
|
class CancelableTask(Task):
|
|
@@ -137,8 +143,7 @@ class CancelableTask(Task):
|
|
|
often (for performance).
|
|
|
|
|
|
"""
|
|
|
- result = self.AsyncResult(kwargs['task_id'])
|
|
|
+ result = self.AsyncResult(kwargs["task_id"])
|
|
|
if not isinstance(result, CancelableAsyncResult):
|
|
|
return False
|
|
|
return result.is_cancelled()
|
|
|
-
|