|
@@ -11,29 +11,33 @@ from celery.conf import AMQP_CONNECTION_TIMEOUT
|
|
|
from celery.utils import gen_unique_id, noop, fun_takes_kwargs
|
|
|
from celery.result import AsyncResult, EagerResult
|
|
|
from celery.registry import tasks
|
|
|
-from celery.messaging import TaskPublisher
|
|
|
+from celery.messaging import TaskPublisher, with_connection
|
|
|
from celery.exceptions import RetryTaskError
|
|
|
from celery.datastructures import ExceptionInfo
|
|
|
|
|
|
+TASK_EXEC_OPTIONS = ("routing_key", "exchange",
|
|
|
+ "immediate", "mandatory",
|
|
|
+ "priority", "serializer")
|
|
|
+
|
|
|
|
|
|
def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
|
|
|
- routing_key=None, exchange=None, task_id=None,
|
|
|
- immediate=None, mandatory=None, priority=None, connection=None,
|
|
|
- connect_timeout=AMQP_CONNECTION_TIMEOUT, serializer=None, **opts):
|
|
|
+ task_id=None, publisher=None, connection=None, connect_timeout=None,
|
|
|
+ **options):
|
|
|
"""Run a task asynchronously by the celery daemon(s).
|
|
|
|
|
|
:param task: The task to run (a callable object, or a :class:`Task`
|
|
|
instance
|
|
|
|
|
|
- :param args: The positional arguments to pass on to the task (a ``list``).
|
|
|
+ :keyword args: The positional arguments to pass on to the
|
|
|
+ task (a ``list``).
|
|
|
|
|
|
- :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
|
|
|
+ :keyword kwargs: The keyword arguments to pass on to the task (a ``dict``)
|
|
|
|
|
|
- :param countdown: Number of seconds into the future that the task should
|
|
|
+ :keyword countdown: Number of seconds into the future that the task should
|
|
|
execute. Defaults to immediate delivery (Do not confuse that with
|
|
|
the ``immediate`` setting, they are unrelated).
|
|
|
|
|
|
- :param eta: A :class:`datetime.datetime` object that describes the
|
|
|
+ :keyword eta: A :class:`datetime.datetime` object that describes the
|
|
|
absolute time when the task should execute. May not be specified
|
|
|
if ``countdown`` is also supplied. (Do not confuse this with the
|
|
|
``immediate`` setting, they are unrelated).
|
|
@@ -70,50 +74,29 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
|
|
|
replaced by a local :func:`apply` call instead.
|
|
|
|
|
|
"""
|
|
|
- args = args or []
|
|
|
- kwargs = kwargs or {}
|
|
|
- routing_key = routing_key or getattr(task, "routing_key", None)
|
|
|
- exchange = exchange or getattr(task, "exchange", None)
|
|
|
- if immediate is None:
|
|
|
- immediate = getattr(task, "immediate", None)
|
|
|
- if mandatory is None:
|
|
|
- mandatory = getattr(task, "mandatory", None)
|
|
|
- if priority is None:
|
|
|
- priority = getattr(task, "priority", None)
|
|
|
- serializer = serializer or getattr(task, "serializer", None)
|
|
|
- taskset_id = opts.get("taskset_id")
|
|
|
- publisher = opts.get("publisher")
|
|
|
- retries = opts.get("retries", 0)
|
|
|
- if countdown:
|
|
|
- eta = datetime.now() + timedelta(seconds=countdown)
|
|
|
-
|
|
|
from celery.conf import ALWAYS_EAGER
|
|
|
if ALWAYS_EAGER:
|
|
|
return apply(task, args, kwargs)
|
|
|
|
|
|
- need_to_close_connection = False
|
|
|
- if not publisher:
|
|
|
- if not connection:
|
|
|
- connection = DjangoBrokerConnection(
|
|
|
- connect_timeout=connect_timeout)
|
|
|
- need_to_close_connection = True
|
|
|
- publisher = TaskPublisher(connection=connection)
|
|
|
-
|
|
|
- delay_task = publisher.delay_task
|
|
|
- if taskset_id:
|
|
|
- delay_task = curry(publisher.delay_task_in_set, taskset_id)
|
|
|
-
|
|
|
- task_id = delay_task(task.name, args, kwargs,
|
|
|
- task_id=task_id, retries=retries,
|
|
|
- routing_key=routing_key, exchange=exchange,
|
|
|
- mandatory=mandatory, immediate=immediate,
|
|
|
- serializer=serializer, priority=priority,
|
|
|
- eta=eta)
|
|
|
-
|
|
|
- if need_to_close_connection:
|
|
|
- publisher.close()
|
|
|
- connection.close()
|
|
|
+ for option_name in TASK_EXEC_OPTIONS:
|
|
|
+ if option_name not in options:
|
|
|
+ options[option_name] = getattr(task, option_name, None)
|
|
|
+
|
|
|
+ if countdown: # Convert countdown to ETA.
|
|
|
+ eta = datetime.now() + timedelta(seconds=countdown)
|
|
|
|
|
|
+ def _delay_task(connection):
|
|
|
+ publish = publisher or TaskPublisher(connection)
|
|
|
+ try:
|
|
|
+ return publish.delay_task(task.name, args or [], kwargs or {},
|
|
|
+ task_id=task_id,
|
|
|
+ eta=eta,
|
|
|
+ **options)
|
|
|
+ finally:
|
|
|
+ publisher or publish.close()
|
|
|
+
|
|
|
+ task_id = with_connection(_delay_task, connection=connection,
|
|
|
+ connect_timeout=connect_timeout)
|
|
|
return AsyncResult(task_id)
|
|
|
|
|
|
|