|
@@ -28,49 +28,52 @@ In the consumer:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
- from celery.contrib.abortable import AbortableTask
|
|
|
- from celery.utils.log import get_task_logger
|
|
|
-
|
|
|
- logger = get_logger(__name__)
|
|
|
-
|
|
|
- class MyLongRunningTask(AbortableTask):
|
|
|
-
|
|
|
- def run(self, **kwargs):
|
|
|
- results = []
|
|
|
- for x in range(100):
|
|
|
- # Check after every 5 loops..
|
|
|
- if x % 5 == 0: # alternatively, check when some timer is due
|
|
|
- if self.is_aborted(**kwargs):
|
|
|
- # Respect the aborted status and terminate
|
|
|
- # gracefully
|
|
|
- logger.warning('Task aborted.')
|
|
|
- return
|
|
|
- y = do_something_expensive(x)
|
|
|
- results.append(y)
|
|
|
- logger.info('Task finished.')
|
|
|
- return results
|
|
|
-
|
|
|
+ from __future__ import absolute_import
|
|
|
+
|
|
|
+ from celery.contrib.abortable import AbortableTask
|
|
|
+ from celery.utils.log import get_task_logger
|
|
|
+
|
|
|
+ from proj.celery import app
|
|
|
+
|
|
|
+ logger = get_logger(__name__)
|
|
|
+
|
|
|
+ @app.task(bind=True, base=AbortableTask)
|
|
|
+ def long_running_task(self):
|
|
|
+ results = []
|
|
|
+ for i in range(100):
|
|
|
+ # check after every 5 iterations...
|
|
|
+ # (or alternatively, check when some timer is due)
|
|
|
+ if not i % 5:
|
|
|
+ if self.is_aborted():
|
|
|
+ # respect aborted state, and terminate gracefully.
|
|
|
+ logger.warning('Task aborted')
|
|
|
+ return
|
|
|
+ value = do_something_expensive(i)
|
|
|
+ results.append(y)
|
|
|
+ logger.info('Task complete')
|
|
|
+ return results
|
|
|
|
|
|
In the producer:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
- from myproject.tasks import MyLongRunningTask
|
|
|
+ from __future__ import absolute_import
|
|
|
|
|
|
- def myview(request):
|
|
|
+ import time
|
|
|
|
|
|
- async_result = MyLongRunningTask.delay()
|
|
|
- # async_result is of type AbortableAsyncResult
|
|
|
+ from proj.tasks import MyLongRunningTask
|
|
|
|
|
|
- # After 10 seconds, abort the task
|
|
|
- time.sleep(10)
|
|
|
- async_result.abort()
|
|
|
+ def myview(request):
|
|
|
+ # result is of type AbortableAsyncResult
|
|
|
+ result = long_running_task.delay()
|
|
|
|
|
|
- ...
|
|
|
+ # abort the task after 10 seconds
|
|
|
+ time.sleep(10)
|
|
|
+ result.abort()
|
|
|
|
|
|
-After the `async_result.abort()` call, the task execution is not
|
|
|
+After the `result.abort()` call, the task execution is not
|
|
|
aborted immediately. In fact, it is not guaranteed to abort at all. Keep
|
|
|
-checking the `async_result` status, or call `async_result.wait()` to
|
|
|
+checking `result.state` status, or call `result.get(timeout=)` to
|
|
|
have it block until the task is finished.
|
|
|
|
|
|
.. note::
|