|
@@ -459,7 +459,8 @@ class Task(object):
|
|
|
args = (self.__self__, ) + tuple(args)
|
|
|
|
|
|
if conf.CELERY_ALWAYS_EAGER:
|
|
|
- return self.apply(args, kwargs, task_id=task_id, **options)
|
|
|
+ return self.apply(args, kwargs, task_id=task_id,
|
|
|
+ link=link, link_error=link_error, **options)
|
|
|
options = dict(extract_exec_options(self), **options)
|
|
|
options = router.route(options, self.name, args, kwargs)
|
|
|
|
|
@@ -580,7 +581,8 @@ class Task(object):
|
|
|
raise ret
|
|
|
return ret
|
|
|
|
|
|
- def apply(self, args=None, kwargs=None, **options):
|
|
|
+ def apply(self, args=None, kwargs=None,
|
|
|
+ link=None, link_error=None, **options):
|
|
|
"""Execute this task locally, by blocking until the task returns.
|
|
|
|
|
|
:param args: positional arguments passed on to the task.
|
|
@@ -614,6 +616,8 @@ class Task(object):
|
|
|
'is_eager': True,
|
|
|
'logfile': options.get('logfile'),
|
|
|
'loglevel': options.get('loglevel', 0),
|
|
|
+ 'callbacks': maybe_list(link),
|
|
|
+ 'errbacks': maybe_list(link_error),
|
|
|
'delivery_info': {'is_eager': True}}
|
|
|
if self.accept_magic_kwargs:
|
|
|
default_kwargs = {'task_name': task.name,
|