# -*- coding: utf-8 -*-
"""Abortable Tasks.

Abortable tasks overview
=========================

For long-running :class:`Task`'s, it can be desirable to support
aborting during execution.  Of course, these tasks should be built to
support abortion specifically.

The :class:`AbortableTask` serves as a base class for all :class:`Task`
objects that should support abortion by producers.

* Producers may invoke the :meth:`abort` method on
  :class:`AbortableAsyncResult` instances, to request abortion.

* Consumers (workers) should periodically check (and honor!) the
  :meth:`is_aborted` method at controlled points in their task's
  :meth:`run` method.  The more often, the better.

The necessary intermediate communication is dealt with by the
:class:`AbortableTask` implementation.

Usage example
-------------

In the consumer:

.. code-block:: python

    from __future__ import absolute_import

    from celery.contrib.abortable import AbortableTask
    from celery.utils.log import get_task_logger

    from proj.celery import app

    logger = get_task_logger(__name__)

    @app.task(bind=True, base=AbortableTask)
    def long_running_task(self):
        results = []
        for i in range(100):
            # check after every 5 iterations...
            # (or alternatively, check when some timer is due)
            if not i % 5:
                if self.is_aborted():
                    # respect aborted state, and terminate gracefully.
                    logger.warning('Task aborted')
                    return
            # the expensive work runs on every iteration,
            # not only on the iterations that check for abortion.
            value = do_something_expensive(i)
            results.append(value)
        logger.info('Task complete')
        return results

In the producer:

.. code-block:: python

    from __future__ import absolute_import

    import time

    from proj.tasks import long_running_task

    def myview(request):
        # result is of type AbortableAsyncResult
        result = long_running_task.delay()

        # abort the task after 10 seconds
        time.sleep(10)
        result.abort()

After the `result.abort()` call, the task execution isn't
aborted immediately.  In fact, it's not guaranteed to abort at all.
Keep checking `result.state` status, or call `result.get(timeout=)` to
have it block until the task is finished.

.. note::

   In order to abort tasks, there needs to be communication between the
   producer and the consumer.  This is currently implemented through the
   database backend.  Therefore, this class will only work with the
   database backends.
"""
from __future__ import absolute_import, unicode_literals

from celery import Task
from celery.result import AsyncResult

__all__ = ('AbortableAsyncResult', 'AbortableTask')


"""
Task States
-----------

.. state:: ABORTED

ABORTED
~~~~~~~

Task has been marked as aborted (typically by the producer) and should
stop executing as soon as possible.

"""
ABORTED = 'ABORTED'


class AbortableAsyncResult(AsyncResult):
    """Represents an abortable result.

    Specifically, this gives the `AsyncResult` a :meth:`abort()` method,
    that sets the state of the underlying Task to `'ABORTED'`.
    """

    def is_aborted(self):
        """Return :const:`True` if the task is (being) aborted."""
        return self.state == ABORTED

    def abort(self):
        """Set the state of the task to :const:`ABORTED`.

        Abortable tasks monitor their state at regular intervals and
        terminate execution once they see that it is :const:`ABORTED`.

        Warning:
            Be aware that invoking this method does not guarantee when the
            task will be aborted (or even if the task will be aborted at all).
        """
        # TODO: store_result requires all four arguments to be set,
        # but only state should be updated here
        return self.backend.store_result(self.id, result=None,
                                         state=ABORTED, traceback=None)


class AbortableTask(Task):
    """Task that can be aborted.

    This serves as a base class for all :class:`Task`'s
    that support aborting during execution.

    All subclasses of :class:`AbortableTask` must call the
    :meth:`is_aborted` method periodically and act accordingly when
    the call evaluates to :const:`True`.
    """

    abstract = True

    def AsyncResult(self, task_id):
        """Return the accompanying AbortableAsyncResult instance."""
        return AbortableAsyncResult(task_id, backend=self.backend)

    def is_aborted(self, **kwargs):
        """Return true if task is aborted.

        Checks against the backend whether this
        :class:`AbortableAsyncResult` is :const:`ABORTED`.

        Always returns :const:`False` in case the `task_id` parameter
        refers to a regular (non-abortable) :class:`Task`.

        Be aware that invoking this method will cause a hit in the
        backend (for example a database query), so find a good balance
        between calling it regularly (for responsiveness) and not too
        often (for performance).
        """
        task_id = kwargs.get('task_id', self.request.id)
        result = self.AsyncResult(task_id)
        if not isinstance(result, AbortableAsyncResult):
            return False
        return result.is_aborted()