abortable.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # -*- coding: utf-8 -*-
  2. """
  3. =========================
  4. Abortable tasks overview
  5. =========================
  6. For long-running :class:`Task`'s, it can be desirable to support
  7. aborting during execution. Of course, these tasks should be built to
  8. support abortion specifically.
  9. The :class:`AbortableTask` serves as a base class for all :class:`Task`
  10. objects that should support abortion by producers.
* Producers may invoke the :meth:`abort` method on
  :class:`AbortableAsyncResult` instances, to request abortion.

* Consumers (workers) should periodically check (and honor!) the
  :meth:`is_aborted` method at controlled points in their task's
  :meth:`run` method. The more often, the better.

  16. The necessary intermediate communication is dealt with by the
  17. :class:`AbortableTask` implementation.
  18. Usage example
  19. -------------
In the consumer:

.. code-block:: python

    from celery.contrib.abortable import AbortableTask

    class MyLongRunningTask(AbortableTask):

        def run(self, **kwargs):
            logger = self.get_logger(**kwargs)
            results = []
            for x in xrange(100):
                # Check after every 5 loops..
                if x % 5 == 0:  # alternatively, check when some timer is due
                    if self.is_aborted(**kwargs):
                        # Respect the aborted status and terminate
                        # gracefully
                        logger.warning("Task aborted.")
                        return None
                y = do_something_expensive(x)
                results.append(y)
            logger.info("Task finished.")
            return results

In the producer:

.. code-block:: python

    from myproject.tasks import MyLongRunningTask

    def myview(request):

        async_result = MyLongRunningTask.delay()
        # async_result is of type AbortableAsyncResult

        # After 10 seconds, abort the task
        time.sleep(10)
        async_result.abort()

        ...

  49. After the `async_result.abort()` call, the task execution is not
  50. aborted immediately. In fact, it is not guaranteed to abort at all. Keep
  51. checking the `async_result` status, or call `async_result.wait()` to
  52. have it block until the task is finished.
.. note::

   In order to abort tasks, there needs to be communication between the
   producer and the consumer. This is currently implemented through the
   database backend. Therefore, this class will only work with the
   database backends.

  58. """
  59. from __future__ import absolute_import
  60. from celery.task.base import Task
  61. from celery.result import AsyncResult
  62. """
  63. Task States
  64. -----------
  65. .. state:: ABORTED
  66. ABORTED
  67. ~~~~~~~
The task has been flagged as aborted (typically by the producer) and
should stop executing as soon as possible.
  70. """
#: State name written to the result backend by
#: :meth:`AbortableAsyncResult.abort` and compared against the stored
#: status by :meth:`AbortableAsyncResult.is_aborted`.
ABORTED = "ABORTED"
  72. class AbortableAsyncResult(AsyncResult):
  73. """Represents a abortable result.
  74. Specifically, this gives the `AsyncResult` a :meth:`abort()` method,
  75. which sets the state of the underlying Task to `"ABORTED"`.
  76. """
  77. def is_aborted(self):
  78. """Returns :const:`True` if the task is (being) aborted."""
  79. return self.backend.get_status(self.task_id) == ABORTED
  80. def abort(self):
  81. """Set the state of the task to :const:`ABORTED`.
  82. Abortable tasks monitor their state at regular intervals and
  83. terminate execution if so.
  84. Be aware that invoking this method does not guarantee when the
  85. task will be aborted (or even if the task will be aborted at
  86. all).
  87. """
  88. # TODO: store_result requires all four arguments to be set,
  89. # but only status should be updated here
  90. return self.backend.store_result(self.task_id, result=None,
  91. status=ABORTED, traceback=None)
  92. class AbortableTask(Task):
  93. """A celery task that serves as a base class for all :class:`Task`'s
  94. that support aborting during execution.
  95. All subclasses of :class:`AbortableTask` must call the
  96. :meth:`is_aborted` method periodically and act accordingly when
  97. the call evaluates to :const:`True`.
  98. """
  99. @classmethod
  100. def AsyncResult(cls, task_id):
  101. """Returns the accompanying AbortableAsyncResult instance."""
  102. return AbortableAsyncResult(task_id, backend=cls.backend)
  103. def is_aborted(self, **kwargs):
  104. """Checks against the backend whether this
  105. :class:`AbortableAsyncResult` is :const:`ABORTED`.
  106. Always returns :const:`False` in case the `task_id` parameter
  107. refers to a regular (non-abortable) :class:`Task`.
  108. Be aware that invoking this method will cause a hit in the
  109. backend (for example a database query), so find a good balance
  110. between calling it regularly (for responsiveness), but not too
  111. often (for performance).
  112. """
  113. task_id = kwargs.get('task_id', self.request.id)
  114. result = self.AsyncResult(task_id)
  115. if not isinstance(result, AbortableAsyncResult):
  116. return False
  117. return result.is_aborted()