|
@@ -244,17 +244,7 @@ class Signature(dict):
|
|
args, kwargs, options = self._merge(args, kwargs, options)
|
|
args, kwargs, options = self._merge(args, kwargs, options)
|
|
else:
|
|
else:
|
|
args, kwargs, options = self.args, self.kwargs, self.options
|
|
args, kwargs, options = self.args, self.kwargs, self.options
|
|
- route_name = route_name or self.route_name_for(args, kwargs, options)
|
|
|
|
- return _apply(args, kwargs, route_name=route_name, **options)
|
|
|
|
-
|
|
|
|
- def route_name_for(self, args, kwargs, options):
|
|
|
|
- """Can be used to override the name used for routing the task
|
|
|
|
- to a queue.
|
|
|
|
-
|
|
|
|
- If this returns :const:`None` the name of the task will be used.
|
|
|
|
-
|
|
|
|
- """
|
|
|
|
- pass
|
|
|
|
|
|
+ return _apply(args, kwargs, **options)
|
|
|
|
|
|
def append_to_list_option(self, key, value):
|
|
def append_to_list_option(self, key, value):
|
|
items = self.options.setdefault(key, [])
|
|
items = self.options.setdefault(key, [])
|
|
@@ -504,15 +494,12 @@ class _basemap(Signature):
|
|
{'task': task, 'it': regen(it)}, immutable=True, **options
|
|
{'task': task, 'it': regen(it)}, immutable=True, **options
|
|
)
|
|
)
|
|
|
|
|
|
- def route_name_for(self, args, kwargs, options):
|
|
|
|
- return task_name_from(self.kwargs.get('task'))
|
|
|
|
-
|
|
|
|
def apply_async(self, args=(), kwargs={}, **opts):
|
|
def apply_async(self, args=(), kwargs={}, **opts):
|
|
# need to evaluate generators
|
|
# need to evaluate generators
|
|
task, it = self._unpack_args(self.kwargs)
|
|
task, it = self._unpack_args(self.kwargs)
|
|
return self.type.apply_async(
|
|
return self.type.apply_async(
|
|
(), {'task': task, 'it': list(it)},
|
|
(), {'task': task, 'it': list(it)},
|
|
- route_name=self.route_name_for(args, kwargs, opts), **opts
|
|
|
|
|
|
+ route_name=task_name_from(self.kwargs.get('task')), **opts
|
|
)
|
|
)
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
@@ -555,13 +542,10 @@ class chunks(Signature):
|
|
def from_dict(self, d, app=None):
|
|
def from_dict(self, d, app=None):
|
|
return chunks(*self._unpack_args(d['kwargs']), app=app, **d['options'])
|
|
return chunks(*self._unpack_args(d['kwargs']), app=app, **d['options'])
|
|
|
|
|
|
- def route_name_for(self, args, kwargs, options):
|
|
|
|
- return task_name_from(self.kwargs.get('task'))
|
|
|
|
-
|
|
|
|
def apply_async(self, args=(), kwargs={}, **opts):
|
|
def apply_async(self, args=(), kwargs={}, **opts):
|
|
return self.group().apply_async(
|
|
return self.group().apply_async(
|
|
args, kwargs,
|
|
args, kwargs,
|
|
- route_name=self.route_name_for(args, kwargs, opts), **opts
|
|
|
|
|
|
+ route_name=task_name_from(self.kwargs.get('task')), **opts
|
|
)
|
|
)
|
|
|
|
|
|
def __call__(self, **options):
|
|
def __call__(self, **options):
|
|
@@ -833,6 +817,9 @@ class chord(Signature):
|
|
root_id = body.options.get('root_id')
|
|
root_id = body.options.get('root_id')
|
|
if 'chord_size' not in body:
|
|
if 'chord_size' not in body:
|
|
body['chord_size'] = self.__length_hint__()
|
|
body['chord_size'] = self.__length_hint__()
|
|
|
|
+ options = dict(self.options, **options) if options else self.options
|
|
|
|
+ if options:
|
|
|
|
+ body.options.update(options)
|
|
|
|
|
|
results = header.freeze(
|
|
results = header.freeze(
|
|
group_id=group_id, chord=body, root_id=root_id).results
|
|
group_id=group_id, chord=body, root_id=root_id).results
|
|
@@ -841,7 +828,8 @@ class chord(Signature):
|
|
parent = app.backend.apply_chord(
|
|
parent = app.backend.apply_chord(
|
|
header, partial_args, group_id, body,
|
|
header, partial_args, group_id, body,
|
|
interval=interval, countdown=countdown,
|
|
interval=interval, countdown=countdown,
|
|
- max_retries=max_retries, propagate=propagate, result=results)
|
|
|
|
|
|
+ options=options, max_retries=max_retries,
|
|
|
|
+ propagate=propagate, result=results)
|
|
bodyres.parent = parent
|
|
bodyres.parent = parent
|
|
return bodyres
|
|
return bodyres
|
|
|
|
|