|
@@ -27,7 +27,7 @@ from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
|
|
|
from celery.loaders import get_loader_cls
|
|
|
from celery.local import PromiseProxy, maybe_evaluate
|
|
|
from celery._state import _task_stack, _tls, get_current_app, _register_app
|
|
|
-from celery.utils.functional import first
|
|
|
+from celery.utils.functional import first, maybe_list
|
|
|
from celery.utils.imports import instantiate, symbol_by_name
|
|
|
|
|
|
from .annotations import prepare as prepare_annotations
|
|
@@ -230,6 +230,7 @@ class Celery(object):
|
|
|
def send_task(self, name, args=None, kwargs=None, countdown=None,
|
|
|
eta=None, task_id=None, producer=None, connection=None,
|
|
|
result_cls=None, expires=None, queues=None, publisher=None,
|
|
|
+ link=None, link_error=None,
|
|
|
**options):
|
|
|
producer = producer or publisher # XXX compat
|
|
|
if self.conf.CELERY_ALWAYS_EAGER: # pragma: no cover
|
|
@@ -242,14 +243,12 @@ class Celery(object):
|
|
|
self.conf.CELERY_MESSAGE_COMPRESSION)
|
|
|
options = router.route(options, name, args, kwargs)
|
|
|
with self.producer_or_acquire(producer) as producer:
|
|
|
- cb = None
|
|
|
- if 'callbacks' not in options:
|
|
|
- cb = options.get('link')
|
|
|
return result_cls(producer.publish_task(
|
|
|
name, args, kwargs,
|
|
|
task_id=task_id,
|
|
|
countdown=countdown, eta=eta,
|
|
|
- callbacks=cb,
|
|
|
+ callbacks=maybe_list(link),
|
|
|
+ errbacks=maybe_list(link_error),
|
|
|
expires=expires, **options
|
|
|
))
|
|
|
|