cancelable.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. """
  2. =========================
  3. Cancelable tasks overview
  4. =========================
  5. For long-running :class:`Task` objects, it can be desirable to support
  6. aborting during execution. Of course, such tasks must be written
  7. specifically to support being aborted.
  8. The :class:`CancelableTask` serves as a base class for all :class:`Task`
  9. objects that should support cancellation by producers.
  10. * Producers may invoke the :meth:`cancel` method on
  11. :class:`CancelableAsyncResult` instances, to request abortion.
  12. * Consumers (workers) should periodically check (and honor!) the
  13. :meth:`is_cancelled` method at controlled points in their task's
  14. :meth:`run` method. The more often, the better.
  15. The necessary intermediate communication is dealt with by the
  16. :class:`CancelableTask` implementation.
  17. Usage example
  18. -------------
  19. In the consumer:
  20. .. code-block:: python
  21. from celery.contrib.cancelable import CancelableTask
  22. class MyLongRunningTask(CancelableTask):
  23. def run(self, **kwargs):
  24. logger = self.get_logger(**kwargs)
  25. results = []
  26. for x in xrange(100):
  27. # Check after every 5 loops..
  28. if x % 5 == 0: # alternatively, check when some timer is due
  29. if self.is_cancelled(**kwargs):
  30. # Respect the cancelled status and terminate
  31. # gracefully
  32. logger.warning("Task cancelled.")
  33. return None
  34. y = do_something_expensive(x)
  35. results.append(y)
  36. logger.info("Task finished.")
  37. return results
  38. In the producer:
  39. .. code-block:: python
  40. from myproject.tasks import MyLongRunningTask
  41. def myview(request):
  42. async_result = MyLongRunningTask.delay()
  43. # async_result is of type CancelableAsyncResult
  44. # After 10 seconds, cancel the task
  45. time.sleep(10)
  46. async_result.cancel()
  47. ...
  48. After the ``async_result.cancel()`` call, the task execution is not
  49. aborted immediately. In fact, it is not guaranteed to abort at all. Keep
  50. checking the ``async_result`` status, or call ``async_result.wait()`` to
  51. have it block until the task is finished.
  52. """
  53. from celery.task.base import Task
  54. from celery.result import AsyncResult
  55. """ Task States
  56. .. data:: CANCELLED
  57. Task is cancelled (typically by the producer) and should be
  58. cancelled as soon as possible.
  59. """
  60. CANCELLED = "CANCELLED"
  61. class CancelableAsyncResult(AsyncResult):
  62. """Represents a cancelable result.
  63. Specifically, this gives the ``AsyncResult`` a :meth:`cancel()` method,
  64. which sets the state of the underlying Task to ``"CANCELLED"``.
  65. """
  66. def is_cancelled(self):
  67. """Returns :const:`True` if the task is (being) cancelled."""
  68. return self.backend.get_status(self.task_id) == CANCELLED
  69. def cancel(self):
  70. """Set the state of the task to :const:`CANCELLED`.
  71. Cancelable tasks monitor their state at regular intervals and
  72. terminate execution if so.
  73. Be aware that invoking this method does not guarantee when the
  74. task will be cancelled (or even if the task will be cancelled at
  75. all).
  76. """
  77. # TODO: store_result requires all four arguments to be set,
  78. # but only status should be updated here
  79. return self.backend.store_result(self.task_id, result=None,
  80. status=CANCELLED, traceback=None)
  81. class CancelableTask(Task):
  82. """A celery task that serves as a base class for all :class:`Task`'s
  83. that support aborting during execution.
  84. All subclasses of :class:`CancelableTask` must call the
  85. :meth:`is_cancelled` method periodically and act accordingly when
  86. the call evaluates to :const:`True`.
  87. """
  88. @classmethod
  89. def AsyncResult(cls, task_id):
  90. """Returns the accompanying CancelableAsyncResult instance."""
  91. return CancelableAsyncResult(task_id, backend=cls.backend)
  92. def is_cancelled(self, **kwargs):
  93. """Checks against the backend whether this
  94. :class:`CancelableAsyncResult` is :const:`CANCELLED`.
  95. Always returns :const:`False` in case the `task_id` parameter
  96. refers to a regular (non-cancelable) :class:`Task`.
  97. Be aware that invoking this method will cause a hit in the
  98. backend (for example a database query), so find a good balance
  99. between calling it regularly (for responsiveness), but not too
  100. often (for performance).
  101. """
  102. result = self.AsyncResult(kwargs["task_id"])
  103. if not isinstance(result, CancelableAsyncResult):
  104. return False
  105. return result.is_cancelled()