|
@@ -8,6 +8,7 @@ from itertools import imap
|
|
|
|
|
|
from celery.utils import any, all
|
|
|
from celery.backends import default_backend
|
|
|
+from celery.messaging import with_connection
|
|
|
from celery.exceptions import TimeoutError
|
|
|
from celery.datastructures import PositionQueue
|
|
|
|
|
@@ -34,6 +35,16 @@ class BaseAsyncResult(object):
|
|
|
self.task_id = task_id
|
|
|
self.backend = backend
|
|
|
|
|
|
+ @with_connection
|
|
|
+ def revoke(self, connection=None, connect_timeout=None):
|
|
|
+ """Send revoke signal to all workers.
|
|
|
+
|
|
|
+ The workers will ignore the task if received.
|
|
|
+
|
|
|
+ """
|
|
|
+ from celery.task import control
|
|
|
+ control.revoke(self.task_id)
|
|
|
+
|
|
|
def get(self, timeout=None):
|
|
|
"""Alias to :meth:`wait`."""
|
|
|
return self.wait(timeout=timeout)
|
|
@@ -227,6 +238,11 @@ class TaskSetResult(object):
|
|
|
return sum(imap(int, (subtask.successful()
|
|
|
for subtask in self.itersubtasks())))
|
|
|
|
|
|
+ @with_connection
|
|
|
+ def revoke(self, connection=None, connect_timeout=None):
|
|
|
+ for subtask in self.subtasks:
|
|
|
+ subtask.revoke(connection=connection)
|
|
|
+
|
|
|
def __iter__(self):
|
|
|
"""``iter(res)`` -> ``res.iterate()``."""
|
|
|
return self.iterate()
|
|
@@ -319,6 +335,9 @@ class EagerResult(BaseAsyncResult):
|
|
|
elif self.status == "FAILURE":
|
|
|
raise self.result.exception
|
|
|
|
|
|
+ def revoke(self):
|
|
|
+ pass
|
|
|
+
|
|
|
@property
|
|
|
def result(self):
|
|
|
"""The tasks return value"""
|