|
@@ -5,8 +5,9 @@
|
|
|
|
|
|
Composing task workflows.
|
|
|
|
|
|
- Documentation for these functions are in :mod:`celery`.
|
|
|
- You should not import from this module directly.
|
|
|
+ Documentation for some of these types are in :mod:`celery`.
|
|
|
+ You should not import these from :mod:`celery` and not this module.
|
|
|
+
|
|
|
|
|
|
"""
|
|
|
from __future__ import absolute_import
|
|
@@ -82,8 +83,9 @@ class Signature(dict):
|
|
|
"""Class that wraps the arguments and execution options
|
|
|
for a single task invocation.
|
|
|
|
|
|
- Used as the parts in a :class:`group` or to safely
|
|
|
- pass tasks around as callbacks.
|
|
|
+ Used as the parts in a :class:`group` and other constructs,
|
|
|
+ or to pass tasks around as callbacks while being compatible
|
|
|
+ with serializers with a strict type subset.
|
|
|
|
|
|
:param task: Either a task class/instance, or the name of a task.
|
|
|
:keyword args: Positional arguments to apply.
|
|
@@ -282,6 +284,7 @@ class Signature(dict):
|
|
|
immutable = _getitem_property('immutable')
|
|
|
|
|
|
|
|
|
+@Signature.register_type
|
|
|
class chain(Signature):
|
|
|
|
|
|
def __init__(self, *tasks, **options):
|
|
@@ -314,7 +317,6 @@ class chain(Signature):
|
|
|
|
|
|
def __repr__(self):
|
|
|
return ' | '.join(repr(t) for t in self.tasks)
|
|
|
-Signature.register_type(chain)
|
|
|
|
|
|
|
|
|
class _basemap(Signature):
|
|
@@ -339,6 +341,7 @@ class _basemap(Signature):
|
|
|
return cls(*cls._unpack_args(d['kwargs']), **d['options'])
|
|
|
|
|
|
|
|
|
+@Signature.register_type
|
|
|
class xmap(_basemap):
|
|
|
_task_name = 'celery.map'
|
|
|
|
|
@@ -346,9 +349,9 @@ class xmap(_basemap):
|
|
|
task, it = self._unpack_args(self.kwargs)
|
|
|
return '[{0}(x) for x in {1}]'.format(task.task,
|
|
|
truncate(repr(it), 100))
|
|
|
-Signature.register_type(xmap)
|
|
|
|
|
|
|
|
|
+@Signature.register_type
|
|
|
class xstarmap(_basemap):
|
|
|
_task_name = 'celery.starmap'
|
|
|
|
|
@@ -356,9 +359,9 @@ class xstarmap(_basemap):
|
|
|
task, it = self._unpack_args(self.kwargs)
|
|
|
return '[{0}(*x) for x in {1}]'.format(task.task,
|
|
|
truncate(repr(it), 100))
|
|
|
-Signature.register_type(xstarmap)
|
|
|
|
|
|
|
|
|
+@Signature.register_type
|
|
|
class chunks(Signature):
|
|
|
_unpack_args = itemgetter('task', 'it', 'n')
|
|
|
|
|
@@ -387,7 +390,6 @@ class chunks(Signature):
|
|
|
@classmethod
|
|
|
def apply_chunks(cls, task, it, n):
|
|
|
return cls(task, it, n)()
|
|
|
-Signature.register_type(chunks)
|
|
|
|
|
|
|
|
|
def _maybe_group(tasks):
|
|
@@ -400,6 +402,7 @@ def _maybe_group(tasks):
|
|
|
return tasks
|
|
|
|
|
|
|
|
|
+@Signature.register_type
|
|
|
class group(Signature):
|
|
|
|
|
|
def __init__(self, *tasks, **options):
|
|
@@ -456,9 +459,9 @@ class group(Signature):
|
|
|
|
|
|
def __repr__(self):
|
|
|
return repr(self.tasks)
|
|
|
-Signature.register_type(group)
|
|
|
|
|
|
|
|
|
+Signature.register_type
|
|
|
class chord(Signature):
|
|
|
|
|
|
def __init__(self, header, body=None, task='celery.chord',
|
|
@@ -520,7 +523,6 @@ class chord(Signature):
|
|
|
|
|
|
tasks = _getitem_property('kwargs.header')
|
|
|
body = _getitem_property('kwargs.body')
|
|
|
-Signature.register_type(chord)
|
|
|
|
|
|
|
|
|
def subtask(varies, *args, **kwargs):
|