abortable.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. # -*- coding: utf-8 -*-
  2. """Abortable Tasks.
  3. Abortable tasks overview
  4. =========================
  5. For long-running :class:`Task`'s, it can be desirable to support
  6. aborting during execution. Of course, these tasks should be built to
  7. support abortion specifically.
  8. The :class:`AbortableTask` serves as a base class for all :class:`Task`
  9. objects that should support abortion by producers.
  10. * Producers may invoke the :meth:`abort` method on
  11. :class:`AbortableAsyncResult` instances, to request abortion.
  12. * Consumers (workers) should periodically check (and honor!) the
  13. :meth:`is_aborted` method at controlled points in their task's
  14. :meth:`run` method. The more often, the better.
  15. The necessary intermediate communication is dealt with by the
  16. :class:`AbortableTask` implementation.
  17. Usage example
  18. -------------
  19. In the consumer:
  20. .. code-block:: python
  21. from __future__ import absolute_import
  22. from celery.contrib.abortable import AbortableTask
  23. from celery.utils.log import get_task_logger
  24. from proj.celery import app
  25. logger = get_task_logger(__name__)
  26. @app.task(bind=True, base=AbortableTask)
  27. def long_running_task(self):
  28. results = []
  29. for i in range(100):
  30. # check after every 5 iterations...
  31. # (or alternatively, check when some timer is due)
  32. if not i % 5:
  33. if self.is_aborted():
  34. # respect aborted state, and terminate gracefully.
  35. logger.warning('Task aborted')
  36. return
  37. value = do_something_expensive(i)
  38. results.append(value)
  39. logger.info('Task complete')
  40. return results
  41. In the producer:
  42. .. code-block:: python
  43. from __future__ import absolute_import
  44. import time
  45. from proj.tasks import long_running_task
  46. def myview(request):
  47. # result is of type AbortableAsyncResult
  48. result = long_running_task.delay()
  49. # abort the task after 10 seconds
  50. time.sleep(10)
  51. result.abort()
  52. After the `result.abort()` call, the task execution isn't
  53. aborted immediately. In fact, it's not guaranteed to abort at all.
  54. Keep checking `result.state` status, or call `result.get(timeout=)` to
  55. have it block until the task is finished.
  56. .. note::
  57. In order to abort tasks, there needs to be communication between the
  58. producer and the consumer. This is currently implemented through the
  59. database backend. Therefore, this class will only work with the
  60. database backends.
  61. """
  62. from __future__ import absolute_import, unicode_literals
  63. from celery import Task
  64. from celery.result import AsyncResult
  65. __all__ = ('AbortableAsyncResult', 'AbortableTask')
  66. """
  67. Task States
  68. -----------
  69. .. state:: ABORTED
  70. ABORTED
  71. ~~~~~~~
  72. Task is aborted (typically by the producer) and should be
  73. aborted as soon as possible.
  74. """
  75. ABORTED = 'ABORTED'
  76. class AbortableAsyncResult(AsyncResult):
  77. """Represents an abortable result.
  78. Specifically, this gives the `AsyncResult` a :meth:`abort()` method,
  79. that sets the state of the underlying Task to `'ABORTED'`.
  80. """
  81. def is_aborted(self):
  82. """Return :const:`True` if the task is (being) aborted."""
  83. return self.state == ABORTED
  84. def abort(self):
  85. """Set the state of the task to :const:`ABORTED`.
  86. Abortable tasks monitor their state at regular intervals and
  87. terminate execution if so.
  88. Warning:
  89. Be aware that invoking this method does not guarantee when the
  90. task will be aborted (or even if the task will be aborted at all).
  91. """
  92. # TODO: store_result requires all four arguments to be set,
  93. # but only state should be updated here
  94. return self.backend.store_result(self.id, result=None,
  95. state=ABORTED, traceback=None)
  96. class AbortableTask(Task):
  97. """Task that can be aborted.
  98. This serves as a base class for all :class:`Task`'s
  99. that support aborting during execution.
  100. All subclasses of :class:`AbortableTask` must call the
  101. :meth:`is_aborted` method periodically and act accordingly when
  102. the call evaluates to :const:`True`.
  103. """
  104. abstract = True
  105. def AsyncResult(self, task_id):
  106. """Return the accompanying AbortableAsyncResult instance."""
  107. return AbortableAsyncResult(task_id, backend=self.backend)
  108. def is_aborted(self, **kwargs):
  109. """Return true if task is aborted.
  110. Checks against the backend whether this
  111. :class:`AbortableAsyncResult` is :const:`ABORTED`.
  112. Always return :const:`False` in case the `task_id` parameter
  113. refers to a regular (non-abortable) :class:`Task`.
  114. Be aware that invoking this method will cause a hit in the
  115. backend (for example a database query), so find a good balance
  116. between calling it regularly (for responsiveness), but not too
  117. often (for performance).
  118. """
  119. task_id = kwargs.get('task_id', self.request.id)
  120. result = self.AsyncResult(task_id)
  121. if not isinstance(result, AbortableAsyncResult):
  122. return False
  123. return result.is_aborted()