|
@@ -8,11 +8,10 @@ from celery.registry import tasks
|
|
|
from celery.messaging import TaskPublisher, with_connection
|
|
|
|
|
|
extract_exec_options = mattrgetter("routing_key", "exchange",
|
|
|
- "immediate", "mandatory",
|
|
|
- "priority", "serializer")
|
|
|
+ "immediate", "mandatory",
|
|
|
+ "priority", "serializer")
|
|
|
|
|
|
|
|
|
-@with_connection
|
|
|
def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
|
|
|
task_id=None, publisher=None, connection=None, connect_timeout=None,
|
|
|
**options):
|
|
@@ -69,6 +68,16 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
|
|
|
"""
|
|
|
if conf.ALWAYS_EAGER:
|
|
|
return apply(task, args, kwargs)
|
|
|
+ return _apply_async(task, args=args, kwargs=kwargs, countdown=countdown,
|
|
|
+ eta=eta, task_id=task_id, publisher=publisher,
|
|
|
+ connection=connection,
|
|
|
+ connect_timeout=connect_timeout, **options)
|
|
|
+
|
|
|
+
|
|
|
+@with_connection
|
|
|
+def _apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
|
|
|
+ task_id=None, publisher=None, connection=None, connect_timeout=None,
|
|
|
+ **options):
|
|
|
|
|
|
task = tasks[task.name] # Get instance.
|
|
|
exchange = options.get("exchange")
|