|
@@ -16,10 +16,14 @@ from itertools import chain as _chain
|
|
|
from kombu.utils import fxrange, kwdict, reprcall
|
|
|
|
|
|
from celery import current_app
|
|
|
-from celery.local import Proxy, regen
|
|
|
+from celery.local import Proxy
|
|
|
from celery.utils import cached_property, uuid
|
|
|
-from celery.utils.functional import maybe_list, is_list, chunks as _chunks
|
|
|
from celery.utils.compat import chain_from_iterable
|
|
|
+from celery.utils.functional import (
|
|
|
+ maybe_list, is_list, regen,
|
|
|
+ chunks as _chunks,
|
|
|
+)
|
|
|
+from celery.utils.text import truncate
|
|
|
|
|
|
Chord = Proxy(lambda: current_app.tasks["celery.chord"])
|
|
|
|
|
@@ -111,9 +115,9 @@ class Signature(dict):
|
|
|
|
|
|
def clone(self, args=(), kwargs={}, **options):
|
|
|
args, kwargs, options = self._merge(args, kwargs, options)
|
|
|
- s = self.from_dict({"task": self.task, "args": args,
|
|
|
- "kwargs": kwargs, "options": options,
|
|
|
- "subtask_type": self.subtask_type})
|
|
|
+ s = Signature.from_dict({"task": self.task, "args": args,
|
|
|
+ "kwargs": kwargs, "options": options,
|
|
|
+ "subtask_type": self.subtask_type})
|
|
|
s._type = self._type
|
|
|
return s
|
|
|
partial = clone
|
|
@@ -216,10 +220,11 @@ class _basemap(Signature):
|
|
|
Signature.__init__(self, self._task_name, (),
|
|
|
{"task": task, "it": regen(it)}, **options)
|
|
|
|
|
|
- def apply_async(self, *args, **kwargs):
|
|
|
+ def apply_async(self, args=(), kwargs={}, **opts):
|
|
|
# need to evaluate generators
|
|
|
task, it = self._unpack_args(self.kwargs)
|
|
|
- return self.type.apply_async((), {"task": task, "it": list(it)})
|
|
|
+ return self.type.apply_async((),
|
|
|
+ {"task": task, "it": list(it)}, **opts)
|
|
|
|
|
|
@classmethod
|
|
|
def from_dict(self, d):
|
|
@@ -231,7 +236,7 @@ class xmap(_basemap):
|
|
|
|
|
|
def __repr__(self):
|
|
|
task, it = self._unpack_args(self.kwargs)
|
|
|
- return "[%s(x) for x in %r]" % (task.name, it)
|
|
|
+ return "[%s(x) for x in %s]" % (task.task, truncate(repr(it), 100))
|
|
|
Signature.register_type(xmap)
|
|
|
|
|
|
|
|
@@ -240,7 +245,7 @@ class xstarmap(_basemap):
|
|
|
|
|
|
def __repr__(self):
|
|
|
task, it = self._unpack_args(self.kwargs)
|
|
|
- return "[%s(*x) for x in %r]" % (task.name, it)
|
|
|
+ return "[%s(*x) for x in %s]" % (task.task, truncate(repr(it), 100))
|
|
|
Signature.register_type(xstarmap)
|
|
|
|
|
|
|
|
@@ -255,12 +260,18 @@ class chunks(Signature):
|
|
|
def from_dict(self, d):
|
|
|
return chunks(*self._unpack_args(d["kwargs"]), **d["options"])
|
|
|
|
|
|
+ def apply_async(self, args=(), kwargs={}, **opts):
|
|
|
+ # need to evaluate generators
|
|
|
+ task, it, n = self._unpack_args(self.kwargs)
|
|
|
+ return self.type.apply_async((),
|
|
|
+ {"task": task, "it": list(it), "n": n}, **opts)
|
|
|
+
|
|
|
def __call__(self, **options):
|
|
|
return self.group()(**options)
|
|
|
|
|
|
def group(self):
|
|
|
task, it, n = self._unpack_args(self.kwargs)
|
|
|
- return group(xstarmap.s(task, part) for part in _chunks(iter(it), n))
|
|
|
+ return group(xstarmap(task, part) for part in _chunks(iter(it), n))
|
|
|
|
|
|
@classmethod
|
|
|
def apply_chunks(cls, task, it, n):
|