__init__.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. from celery import conf
  2. from celery.utils import gen_unique_id, fun_takes_kwargs, mattrgetter
  3. from celery.result import AsyncResult, EagerResult
  4. from celery.execute.trace import TaskTrace
  5. from celery.registry import tasks
  6. from celery.messaging import with_connection
  7. from celery.messaging import TaskPublisher
  8. from celery.datastructures import ExceptionInfo
  9. extract_exec_options = mattrgetter("routing_key", "exchange",
  10. "immediate", "mandatory",
  11. "priority", "serializer",
  12. "delivery_mode")
  13. @with_connection
  14. def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
  15. task_id=None, publisher=None, connection=None, connect_timeout=None,
  16. **options):
  17. """Run a task asynchronously by the celery daemon(s).
  18. :param task: The task to run (a callable object, or a :class:`Task`
  19. instance
  20. :keyword args: The positional arguments to pass on to the
  21. task (a ``list``).
  22. :keyword kwargs: The keyword arguments to pass on to the task (a ``dict``)
  23. :keyword countdown: Number of seconds into the future that the task should
  24. execute. Defaults to immediate delivery (Do not confuse that with
  25. the ``immediate`` setting, they are unrelated).
  26. :keyword eta: A :class:`datetime.datetime` object that describes the
  27. absolute time when the task should execute. May not be specified
  28. if ``countdown`` is also supplied. (Do not confuse this with the
  29. ``immediate`` setting, they are unrelated).
  30. :keyword routing_key: The routing key used to route the task to a worker
  31. server.
  32. :keyword exchange: The named exchange to send the task to. Defaults to
  33. :attr:`celery.task.base.Task.exchange`.
  34. :keyword exchange_type: The exchange type to initalize the exchange as
  35. if not already declared.
  36. Defaults to :attr:`celery.task.base.Task.exchange_type`.
  37. :keyword immediate: Request immediate delivery. Will raise an exception
  38. if the task cannot be routed to a worker immediately.
  39. (Do not confuse this parameter with the ``countdown`` and ``eta``
  40. settings, as they are unrelated).
  41. :keyword mandatory: Mandatory routing. Raises an exception if there's
  42. no running workers able to take on this task.
  43. :keyword connection: Re-use existing AMQP connection.
  44. The ``connect_timeout`` argument is not respected if this is set.
  45. :keyword connect_timeout: The timeout in seconds, before we give up
  46. on establishing a connection to the AMQP server.
  47. :keyword priority: The task priority, a number between ``0`` and ``9``.
  48. :keyword serializer: A string identifying the default serialization
  49. method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
  50. Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
  51. methods that have been registered with
  52. :mod:`carrot.serialization.registry`.
  53. **Note**: If the ``CELERY_ALWAYS_EAGER`` setting is set, it will be
  54. replaced by a local :func:`apply` call instead.
  55. """
  56. if conf.ALWAYS_EAGER:
  57. return apply(task, args, kwargs, task_id=task_id)
  58. task = tasks[task.name] # get instance from registry
  59. options = dict(extract_exec_options(task), **options)
  60. exchange = options.get("exchange")
  61. exchange_type = options.get("exchange_type")
  62. publish = publisher or task.get_publisher(connection, exchange=exchange,
  63. exchange_type=exchange_type)
  64. try:
  65. task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
  66. countdown=countdown, eta=eta, **options)
  67. finally:
  68. publisher or publish.close()
  69. return task.AsyncResult(task_id)
  70. @with_connection
  71. def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
  72. task_id=None, publisher=None, connection=None, connect_timeout=None,
  73. result_cls=AsyncResult, **options):
  74. exchange = options.get("exchange")
  75. exchange_type = options.get("exchange_type")
  76. publish = publisher or TaskPublisher(connection, exchange=exchange,
  77. exchange_type=exchange_type)
  78. try:
  79. task_id = publish.delay_task(name, args, kwargs, task_id=task_id,
  80. countdown=countdown, eta=eta, **options)
  81. finally:
  82. publisher or publish.close()
  83. return result_cls(task_id)
  84. def delay_task(task_name, *args, **kwargs):
  85. """Delay a task for execution by the ``celery`` daemon.
  86. :param task_name: the name of a task registered in the task registry.
  87. :param \*args: positional arguments to pass on to the task.
  88. :param \*\*kwargs: keyword arguments to pass on to the task.
  89. :raises celery.exceptions.NotRegistered: exception if no such task
  90. has been registered in the task registry.
  91. :returns: :class:`celery.result.AsyncResult`.
  92. Example
  93. >>> r = delay_task("update_record", name="George Constanza", age=32)
  94. >>> r.ready()
  95. True
  96. >>> r.result
  97. "Record was updated"
  98. """
  99. return apply_async(tasks[task_name], args, kwargs)
  100. def apply(task, args, kwargs, **options):
  101. """Apply the task locally.
  102. :keyword throw: Re-raise task exceptions. Defaults to
  103. the ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` setting.
  104. This will block until the task completes, and returns a
  105. :class:`celery.result.EagerResult` instance.
  106. """
  107. args = args or []
  108. kwargs = kwargs or {}
  109. task_id = options.get("task_id", gen_unique_id())
  110. retries = options.get("retries", 0)
  111. throw = options.pop("throw", conf.EAGER_PROPAGATES_EXCEPTIONS)
  112. task = tasks[task.name] # Make sure we get the instance, not class.
  113. default_kwargs = {"task_name": task.name,
  114. "task_id": task_id,
  115. "task_retries": retries,
  116. "task_is_eager": True,
  117. "logfile": options.get("logfile"),
  118. "delivery_info": {"is_eager": True},
  119. "loglevel": options.get("loglevel", 0)}
  120. supported_keys = fun_takes_kwargs(task.run, default_kwargs)
  121. extend_with = dict((key, val) for key, val in default_kwargs.items()
  122. if key in supported_keys)
  123. kwargs.update(extend_with)
  124. trace = TaskTrace(task.name, task_id, args, kwargs, task=task)
  125. retval = trace.execute()
  126. if isinstance(retval, ExceptionInfo):
  127. if throw:
  128. raise retval.exception
  129. retval = retval.exception
  130. return EagerResult(task_id, retval, trace.status, traceback=trace.strtb)