@@ -3,12 +3,51 @@ from celery import conf
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
 from celery.result import TaskSetResult, EagerResult
-from celery.execute import apply_async, delay_task, apply
+from celery.execute import apply_async, apply
 from celery.utils import gen_unique_id, get_full_cls_name
 from celery.registry import tasks
 from celery.serialization import pickle
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from datetime import timedelta
+import sys
+
+
+class TaskType(type):
+    """Metaclass for tasks.
+
+    Automatically registers the task in the task registry, except
+    if the ``abstract`` attribute is set.
+
+    If no ``name`` attribute is provided, the name is automatically
+    generated from the name of the defining module and the class name.
+
+    """
+
+    def __new__(cls, name, bases, attrs):
+        super_new = super(TaskType, cls).__new__
+        task_module = attrs["__module__"]
+
+        # Abstract class: remove the abstract attribute so that any
+        # class inheriting from this won't be abstract by default.
+        if attrs.pop("abstract", None):
+            return super_new(cls, name, bases, attrs)
+
+        # Automatically generate missing name.
+        if not attrs.get("name"):
+            task_module = sys.modules[task_module]
+            task_name = ".".join([task_module.__name__, name])
+            attrs["name"] = task_name
+
+        # Because of the way imports happen (recursively), this may or
+        # may not be the first time this task tries to register with
+        # the framework. There should only be one class for each task
+        # name, so we always return the registered version.
+
+        task_name = attrs["name"]
+        if task_name not in tasks:
+            task_cls = super_new(cls, name, bases, attrs)
+            tasks.register(task_cls)
+        return tasks[task_name].__class__
 
 
 class Task(object):
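As an aside for readers of this patch, here is a minimal sketch of what the new ``TaskType`` metaclass does at class-creation time. The import path ``celery.task`` is taken from the docstring examples in this module, and ``myapp.tasks`` is a purely hypothetical module name::

    from celery.registry import tasks
    from celery.task import Task    # import path assumed from the docstrings

    class AddTask(Task):
        # No explicit ``name``: the metaclass generates one from the defining
        # module and the class name, e.g. "myapp.tasks.AddTask".
        def run(self, x, y, **kwargs):
            return x + y

    # Registered as a side effect of the class statement; no explicit
    # tasks.register(AddTask) call is needed anymore.
    assert AddTask.name in tasks

Because ``TaskType.__new__`` always returns the registered version, defining the same task name twice (for example via a recursive import) hands back the class that is already in the registry.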
@@ -26,7 +65,12 @@ class Task(object):
 
         *REQUIRED* All subclasses of :class:`Task` has to define the
         :attr:`name` attribute. This is the name of the task, registered
-        in the task registry, and passed to :func:`delay_task`.
+        in the task registry, and passed on to the workers.
+
+    .. attribute:: abstract
+
+        Abstract classes are not registered in the task registry; they
+        only serve as base classes for new task types.
 
     .. attribute:: type
 
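To illustrate the new ``abstract`` attribute documented above, a small hypothetical example (same assumed import path as before)::

    from celery.registry import tasks
    from celery.task import Task    # import path assumed

    class BaseReportTask(Task):
        abstract = True             # skipped by the metaclass: no name, no registration

        def report_title(self):
            return "base report"    # shared behaviour for concrete subclasses

    class DailyReportTask(BaseReportTask):
        def run(self, **kwargs):    # concrete subclass: named and registered automatically
            return self.report_title()

    assert BaseReportTask.name is None      # abstract: left alone by TaskType
    assert DailyReportTask.name in tasks    # concrete: in the registry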
@@ -108,7 +152,6 @@ class Task(object):
         ...         logger.info("Running MyTask with arg some_arg=%s" %
         ...                     some_arg))
         ...         return 42
-        ... tasks.register(MyTask)
 
     You can delay the task using the classmethod :meth:`delay`...
 
@@ -118,15 +161,12 @@ class Task(object):
         >>> result.result
         42
 
-    ...or using the :func:`delay_task` function, by passing the name of
-    the task.
-
-        >>> from celery.task import delay_task
-        >>> result = delay_task(MyTask.name, some_arg="foo")
-
 
     """
+    __metaclass__ = TaskType
+
     name = None
+    abstract = True
     type = "regular"
     exchange = None
     routing_key = None
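Since the base class now sets ``__metaclass__ = TaskType`` together with ``abstract = True``, ``Task`` itself is exempt from naming and registration; only concrete subclasses end up in the registry. A one-line sanity check, with the same assumed import path::

    from celery.task import Task    # import path assumed

    # The metaclass returns early for abstract classes, so the base class
    # keeps ``name = None`` and never enters the task registry.
    assert Task.name is None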
@@ -253,7 +293,8 @@ class Task(object):
 
     @classmethod
     def delay(cls, *args, **kwargs):
-        """Delay this task for execution by the ``celery`` daemon(s).
+        """Shortcut to :meth:`apply_async`, taking star arguments
+        but not supporting the extra options.
 
         :param \*args: positional arguments passed on to the task.
 
@@ -261,8 +302,6 @@ class Task(object):
 
         :rtype: :class:`celery.result.AsyncResult`
 
-        See :func:`celery.execute.delay_task`.
-
         """
         return apply_async(cls, args, kwargs)
 
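To make the reworded ``delay`` docstring concrete, a short sketch reusing the hypothetical ``AddTask`` from the first example together with the ``celery.execute.apply_async`` helper imported at the top of this module; the positional ``(task, args, kwargs)`` form mirrors the call inside ``delay`` itself::

    from celery.execute import apply_async

    # The star-argument shortcut...
    result = AddTask.delay(2, 2)

    # ...is equivalent to the underlying helper call, which is also where
    # the extra options the docstring mentions would be passed.
    result = apply_async(AddTask, (2, 2), {})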
@@ -429,7 +468,6 @@ class ExecuteRemoteTask(Task):
         """
         callable_ = pickle.loads(ser_callable)
         return callable_(*fargs, **fkwargs)
-tasks.register(ExecuteRemoteTask)
 
 
 class AsynchronousMapTask(Task):
@@ -441,7 +479,6 @@ class AsynchronousMapTask(Task):
         """The method run by ``celeryd``."""
         timeout = kwargs.get("timeout")
         return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
-tasks.register(AsynchronousMapTask)
 
 
 class TaskSet(object):
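The two module-level ``tasks.register(...)`` calls removed above are now redundant: both classes inherit from ``Task`` and therefore register themselves through ``TaskType`` when they are defined. A hedged check, with the import path assumed as before::

    from celery.registry import tasks
    from celery.task import ExecuteRemoteTask, AsynchronousMapTask  # path assumed

    # Present in the registry purely as a side effect of the class statements.
    assert ExecuteRemoteTask.name in tasks
    assert AsynchronousMapTask.name in tasks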
@@ -599,8 +636,6 @@ class PeriodicTask(Task):
     :raises NotImplementedError: if the :attr:`run_every` attribute is
         not defined.
 
-    You have to register the periodic task in the task registry.
-
     Example
 
         >>> from celery.task import tasks, PeriodicTask
@@ -612,9 +647,9 @@ class PeriodicTask(Task):
         ...     def run(self, **kwargs):
         ...         logger = self.get_logger(**kwargs)
         ...         logger.info("Running MyPeriodicTask")
-        >>> tasks.register(MyPeriodicTask)
 
     """
+    abstract = True
     run_every = timedelta(days=1)
     type = "periodic"
 
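Finally, since ``PeriodicTask`` is now declared ``abstract``, only its concrete subclasses are registered, and, as the updated docstring shows, the trailing ``tasks.register(MyPeriodicTask)`` step is gone. A minimal sketch along the lines of that docstring example::

    from datetime import timedelta
    from celery.registry import tasks
    from celery.task import PeriodicTask   # import path taken from the docstring

    class MyPeriodicTask(PeriodicTask):
        run_every = timedelta(seconds=30)   # overrides the one-day default

        def run(self, **kwargs):
            logger = self.get_logger(**kwargs)
            logger.info("Running MyPeriodicTask")

    # Registered automatically; PeriodicTask itself stays out of the registry.
    assert MyPeriodicTask.name in tasks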