|
@@ -8,7 +8,7 @@ from billiard.serialization import pickle
|
|
|
from celery import conf
|
|
|
from celery.log import setup_logger
|
|
|
from celery.utils import gen_unique_id, get_full_cls_name
|
|
|
-from celery.result import TaskSetResult, EagerResult
|
|
|
+from celery.result import BaseAsyncResult, TaskSetResult, EagerResult
|
|
|
from celery.execute import apply_async, apply
|
|
|
from celery.registry import tasks
|
|
|
from celery.backends import default_backend
|
|
@@ -195,6 +195,7 @@ class Task(object):
|
|
|
"""
|
|
|
raise NotImplementedError("Tasks must define the run method.")
|
|
|
|
|
|
+ @classmethod
|
|
|
def get_logger(self, loglevel=None, logfile=None, **kwargs):
|
|
|
"""Get process-aware logger object.
|
|
|
|
|
@@ -203,11 +204,13 @@ class Task(object):
|
|
|
"""
|
|
|
return setup_logger(loglevel=loglevel, logfile=logfile)
|
|
|
|
|
|
+ @classmethod
|
|
|
def establish_connection(self,
|
|
|
connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
|
|
|
"""Establish a connection to the message broker."""
|
|
|
return _establish_connection(connect_timeout)
|
|
|
|
|
|
+ @classmethod
|
|
|
def get_publisher(self, connection=None, exchange=None,
|
|
|
connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
|
|
|
"""Get a celery task message publisher.
|
|
@@ -229,6 +232,7 @@ class Task(object):
|
|
|
exchange=exchange,
|
|
|
routing_key=self.routing_key)
|
|
|
|
|
|
+ @classmethod
|
|
|
def get_consumer(self, connection=None,
|
|
|
connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
|
|
|
"""Get a celery task message consumer.
|
|
@@ -248,7 +252,7 @@ class Task(object):
|
|
|
routing_key=self.routing_key)
|
|
|
|
|
|
@classmethod
|
|
|
- def delay(cls, *args, **kwargs):
|
|
|
+ def delay(self, *args, **kwargs):
|
|
|
"""Shortcut to :meth:`apply_async`, with star arguments,
|
|
|
but doesn't support the extra options.
|
|
|
|
|
@@ -258,10 +262,10 @@ class Task(object):
|
|
|
:returns: :class:`celery.result.AsyncResult`
|
|
|
|
|
|
"""
|
|
|
- return cls.apply_async(args, kwargs)
|
|
|
+ return self.apply_async(args, kwargs)
|
|
|
|
|
|
@classmethod
|
|
|
- def apply_async(cls, args=None, kwargs=None, **options):
|
|
|
+ def apply_async(self, args=None, kwargs=None, **options):
|
|
|
"""Delay this task for execution by the ``celery`` daemon(s).
|
|
|
|
|
|
:param args: positional arguments passed on to the task.
|
|
@@ -275,8 +279,9 @@ class Task(object):
|
|
|
|
|
|
|
|
|
"""
|
|
|
- return apply_async(cls, args, kwargs, **options)
|
|
|
+ return apply_async(self, args, kwargs, **options)
|
|
|
|
|
|
+ @classmethod
|
|
|
def retry(self, args, kwargs, exc=None, throw=True, **options):
|
|
|
"""Retry the task.
|
|
|
|
|
@@ -337,6 +342,24 @@ class Task(object):
|
|
|
message = "Retry in %d seconds." % options["countdown"]
|
|
|
raise RetryTaskError(message, exc)
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def apply(self, args=None, kwargs=None, **options):
|
|
|
+ """Execute this task at once, by blocking until the task
|
|
|
+ has finished executing.
|
|
|
+
|
|
|
+ :param args: positional arguments passed on to the task.
|
|
|
+ :param kwargs: keyword arguments passed on to the task.
|
|
|
+ :rtype: :class:`celery.result.EagerResult`
|
|
|
+
|
|
|
+ See :func:`celery.execute.apply`.
|
|
|
+
|
|
|
+ """
|
|
|
+ return apply(self, args, kwargs, **options)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def AsyncResult(self, task_id):
|
|
|
+ return BaseAsyncResult(task_id, backend=self.backend)
|
|
|
+
|
|
|
def on_retry(self, exc, task_id, args, kwargs):
|
|
|
"""Retry handler.
|
|
|
|
|
@@ -382,20 +405,6 @@ class Task(object):
|
|
|
"""
|
|
|
pass
|
|
|
|
|
|
- @classmethod
|
|
|
- def apply(cls, args=None, kwargs=None, **options):
|
|
|
- """Execute this task at once, by blocking until the task
|
|
|
- has finished executing.
|
|
|
-
|
|
|
- :param args: positional arguments passed on to the task.
|
|
|
- :param kwargs: keyword arguments passed on to the task.
|
|
|
- :rtype: :class:`celery.result.EagerResult`
|
|
|
-
|
|
|
- See :func:`celery.execute.apply`.
|
|
|
-
|
|
|
- """
|
|
|
- return apply(cls, args, kwargs, **options)
|
|
|
-
|
|
|
|
|
|
class ExecuteRemoteTask(Task):
|
|
|
"""Execute an arbitrary function or object.
|