|
@@ -24,9 +24,11 @@ from ...result import EagerResult
|
|
|
from ...utils import (fun_takes_kwargs, instantiate,
|
|
|
mattrgetter, uuid, maybe_reraise)
|
|
|
from ...utils.mail import ErrorMail
|
|
|
+from ...utils.compat import fun_of_method
|
|
|
|
|
|
from ..registry import _unpickle_task
|
|
|
|
|
|
+#: extracts options related to publishing a message from a dict.
|
|
|
extract_exec_options = mattrgetter("queue", "routing_key",
|
|
|
"exchange", "immediate",
|
|
|
"mandatory", "priority",
|
|
@@ -34,6 +36,14 @@ extract_exec_options = mattrgetter("queue", "routing_key",
|
|
|
"compression", "expires")
|
|
|
|
|
|
|
|
|
+#: list of methods that should be classmethods in
|
|
|
+#: backward compatibility mode.
|
|
|
+_COMPAT_CLASSMETHODS = ("get_logger", "establish_connection",
|
|
|
+ "get_publisher", "get_consumer",
|
|
|
+ "delay", "apply_async", "retry",
|
|
|
+ "apply", "AsyncResult", "subtask")
|
|
|
+
|
|
|
+
|
|
|
class Context(threading.local):
|
|
|
# Default context
|
|
|
logfile = None
|
|
@@ -84,6 +94,19 @@ class TaskType(type):
|
|
|
# see note about __call__ below.
|
|
|
attrs["__defines_call__"] = True
|
|
|
|
|
|
+ # In old Celery the @task decorator didn't exist, so one would create
|
|
|
+ # classes instead and use them directly (e.g. MyTask.apply_async()).
|
|
|
+ # the use of classmethods was a hack so that it was not necessary
|
|
|
+ # to instantiate the class before using it, but it has only
|
|
|
+ # given us pain (like all magic).
|
|
|
+ #
|
|
|
+ # This must be removed for 3.0.
|
|
|
+ if attrs.pop("compat", False):
|
|
|
+ for fun_name in _COMPAT_CLASSMETHODS:
|
|
|
+ if fun_name not in attrs:
|
|
|
+ fun = fun_of_method(getattr(bases[0], fun_name))
|
|
|
+ attrs[fun_name] = classmethod(fun)
|
|
|
+
|
|
|
# - Abstract class: abstract attribute should not be inherited.
|
|
|
if attrs.pop("abstract", None) or not attrs.get("autoregister", True):
|
|
|
return new(cls, name, bases, attrs)
|
|
@@ -300,6 +323,9 @@ class BaseTask(object):
|
|
|
"CELERY_STORE_ERRORS_EVEN_IF_IGNORED"),
|
|
|
)
|
|
|
|
|
|
+ # - Tasks are lazily bound, so that configuration is not set
|
|
|
+ # - until the task is actually used
|
|
|
+
|
|
|
@classmethod
|
|
|
def bind(cls, app):
|
|
|
cls._app = app
|
|
@@ -337,6 +363,8 @@ class BaseTask(object):
|
|
|
cls.bind(app)
|
|
|
app = property(_get_app, _set_app)
|
|
|
|
|
|
+ # - tasks are pickled into the name of the task only, and the reciever
|
|
|
+ # - simply grabs it from the local registry.
|
|
|
def __reduce__(self):
|
|
|
return (_unpickle_task, (self.name, ), None)
|
|
|
|
|
@@ -347,7 +375,6 @@ class BaseTask(object):
|
|
|
def start_strategy(self, app, consumer):
|
|
|
return instantiate(self.Strategy, self, app, consumer)
|
|
|
|
|
|
- @classmethod
|
|
|
def get_logger(self, loglevel=None, logfile=None, propagate=False,
|
|
|
**kwargs):
|
|
|
"""Get task-aware logger object."""
|
|
@@ -356,13 +383,11 @@ class BaseTask(object):
|
|
|
logfile=self.request.logfile if logfile is None else logfile,
|
|
|
propagate=propagate, task_name=self.name, task_id=self.request.id)
|
|
|
|
|
|
- @classmethod
|
|
|
def establish_connection(self, connect_timeout=None):
|
|
|
"""Establish a connection to the message broker."""
|
|
|
return self._get_app().broker_connection(
|
|
|
connect_timeout=connect_timeout)
|
|
|
|
|
|
- @classmethod
|
|
|
def get_publisher(self, connection=None, exchange=None,
|
|
|
connect_timeout=None, exchange_type=None, **options):
|
|
|
"""Get a celery task message publisher.
|
|
@@ -395,7 +420,6 @@ class BaseTask(object):
|
|
|
routing_key=self.routing_key,
|
|
|
**options)
|
|
|
|
|
|
- @classmethod
|
|
|
def get_consumer(self, connection=None, connect_timeout=None):
|
|
|
"""Get message consumer.
|
|
|
|
|
@@ -418,7 +442,6 @@ class BaseTask(object):
|
|
|
exchange=self.exchange,
|
|
|
routing_key=self.routing_key)
|
|
|
|
|
|
- @classmethod
|
|
|
def delay(self, *args, **kwargs):
|
|
|
"""Star argument version of :meth:`apply_async`.
|
|
|
|
|
@@ -432,7 +455,6 @@ class BaseTask(object):
|
|
|
"""
|
|
|
return self.apply_async(args, kwargs)
|
|
|
|
|
|
- @classmethod
|
|
|
def apply_async(self, args=None, kwargs=None,
|
|
|
task_id=None, publisher=None, connection=None,
|
|
|
router=None, queues=None, **options):
|
|
@@ -544,7 +566,6 @@ class BaseTask(object):
|
|
|
|
|
|
return self.AsyncResult(task_id)
|
|
|
|
|
|
- @classmethod
|
|
|
def retry(self, args=None, kwargs=None, exc=None, throw=True,
|
|
|
eta=None, countdown=None, max_retries=None, **options):
|
|
|
"""Retry the task.
|
|
@@ -632,7 +653,6 @@ class BaseTask(object):
|
|
|
eta and "Retry at %s" % (eta, )
|
|
|
or "Retry in %s secs." % (countdown, ), exc)
|
|
|
|
|
|
- @classmethod
|
|
|
def apply(self, args=None, kwargs=None, **options):
|
|
|
"""Execute this task locally, by blocking until the task returns.
|
|
|
|
|
@@ -685,7 +705,6 @@ class BaseTask(object):
|
|
|
state, tb = info.state, info.strtb
|
|
|
return EagerResult(task_id, retval, state, traceback=tb)
|
|
|
|
|
|
- @classmethod
|
|
|
def AsyncResult(self, task_id):
|
|
|
"""Get AsyncResult instance for this kind of task.
|
|
|
|
|
@@ -799,7 +818,6 @@ class BaseTask(object):
|
|
|
"""`repr(task)`"""
|
|
|
return "<@task: %s>" % (self.name, )
|
|
|
|
|
|
- @classmethod
|
|
|
def subtask(cls, *args, **kwargs):
|
|
|
"""Returns :class:`~celery.task.sets.subtask` object for
|
|
|
this task, wrapping arguments and execution options
|