__init__.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. from celery import conf
  2. from celery.datastructures import ExceptionInfo
  3. from celery.execute.trace import TaskTrace
  4. from celery.messaging import with_connection
  5. from celery.messaging import TaskPublisher
  6. from celery.registry import tasks
  7. from celery.result import AsyncResult, EagerResult
  8. from celery.routes import Router
  9. from celery.utils import gen_unique_id, fun_takes_kwargs, mattrgetter
  10. extract_exec_options = mattrgetter("queue", "routing_key", "exchange",
  11. "immediate", "mandatory",
  12. "priority", "serializer",
  13. "delivery_mode")
  14. @with_connection
  15. def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
  16. task_id=None, publisher=None, connection=None, connect_timeout=None,
  17. router=None, expires=None, **options):
  18. """Run a task asynchronously by the celery daemon(s).
  19. :param task: The :class:`~celery.task.base.Task` to run.
  20. :keyword args: The positional arguments to pass on to the
  21. task (a :class:`list` or :class:`tuple`).
  22. :keyword kwargs: The keyword arguments to pass on to the
  23. task (a :class:`dict`)
  24. :keyword countdown: Number of seconds into the future that the task should
  25. execute. Defaults to immediate delivery (Do not confuse that with
  26. the ``immediate`` setting, they are unrelated).
  27. :keyword eta: A :class:`~datetime.datetime` object that describes the
  28. absolute time and date of when the task should execute. May not be
  29. specified if ``countdown`` is also supplied. (Do not confuse this
  30. with the ``immediate`` setting, they are unrelated).
  31. :keyword expires: Either a :class:`int`, describing the number of seconds,
  32. or a :class:`~datetime.datetime` object that describes the absolute time
  33. and date of when the task should expire.
  34. The task will not be executed after the expiration time.
  35. :keyword connection: Re-use existing broker connection instead
  36. of establishing a new one. The ``connect_timeout`` argument is
  37. not respected if this is set.
  38. :keyword connect_timeout: The timeout in seconds, before we give up
  39. on establishing a connection to the AMQP server.
  40. :keyword routing_key: The routing key used to route the task to a worker
  41. server. Defaults to the tasks :attr:`~celery.task.base.Task.exchange`
  42. attribute.
  43. :keyword exchange: The named exchange to send the task to. Defaults to
  44. the tasks :attr:`~celery.task.base.Task.exchange` attribute.
  45. :keyword exchange_type: The exchange type to initalize the exchange as
  46. if not already declared. Defaults to the tasks
  47. :attr:`~celery.task.base.Task.exchange_type` attribute.
  48. :keyword immediate: Request immediate delivery. Will raise an exception
  49. if the task cannot be routed to a worker immediately.
  50. (Do not confuse this parameter with the ``countdown`` and ``eta``
  51. settings, as they are unrelated). Defaults to the tasks
  52. :attr:`~celery.task.base.Task.immediate` attribute.
  53. :keyword mandatory: Mandatory routing. Raises an exception if there's
  54. no running workers able to take on this task. Defaults to the tasks
  55. :attr:`~celery.task.base.Task.mandatory` attribute.
  56. :keyword priority: The task priority, a number between ``0`` and ``9``.
  57. Defaults to the tasks :attr:`~celery.task.base.Task.priority` attribute.
  58. :keyword serializer: A string identifying the default serialization
  59. method to use. Defaults to the :setting:`CELERY_TASK_SERIALIZER` setting.
  60. Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
  61. methods that have been registered with
  62. :mod:`carrot.serialization.registry`. Defaults to the tasks
  63. :attr:`~celery.task.base.Task.serializer` attribute.
  64. **Note**: If the :setting:`CELERY_ALWAYS_EAGER` setting is set, it will be
  65. replaced by a local :func:`apply` call instead.
  66. """
  67. router = router or Router(conf.ROUTES, conf.get_queues(),
  68. conf.CREATE_MISSING_QUEUES)
  69. if conf.ALWAYS_EAGER:
  70. return apply(task, args, kwargs, task_id=task_id)
  71. task = tasks[task.name] # get instance from registry
  72. options = dict(extract_exec_options(task), **options)
  73. options = router.route(options, task.name, args, kwargs)
  74. exchange = options.get("exchange")
  75. exchange_type = options.get("exchange_type")
  76. if not expires: # If expires has not been passed to this function, get it from the task (it may still be None)
  77. expires = task.expires
  78. publish = publisher or task.get_publisher(connection, exchange=exchange,
  79. exchange_type=exchange_type)
  80. try:
  81. task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
  82. countdown=countdown, eta=eta,
  83. expires=expires, **options)
  84. finally:
  85. publisher or publish.close()
  86. return task.AsyncResult(task_id)
  87. @with_connection
  88. def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
  89. task_id=None, publisher=None, connection=None, connect_timeout=None,
  90. result_cls=AsyncResult, expires=None, **options):
  91. """Send task by name.
  92. Useful if you don't have access to the :class:`~celery.task.base.Task`
  93. class.
  94. :param name: Name of task to execute.
  95. Supports the same arguments as :func:`apply_async`.
  96. """
  97. exchange = options.get("exchange")
  98. exchange_type = options.get("exchange_type")
  99. publish = publisher or TaskPublisher(connection, exchange=exchange,
  100. exchange_type=exchange_type)
  101. try:
  102. task_id = publish.delay_task(name, args, kwargs, task_id=task_id,
  103. countdown=countdown, eta=eta,
  104. expires=expires, **options)
  105. finally:
  106. publisher or publish.close()
  107. return result_cls(task_id)
  108. def delay_task(task_name, *args, **kwargs):
  109. """Delay a task for execution by the ``celery`` daemon.
  110. :param task_name: the name of a task registered in the task registry.
  111. :param \*args: positional arguments to pass on to the task.
  112. :param \*\*kwargs: keyword arguments to pass on to the task.
  113. :raises celery.exceptions.NotRegistered: exception if no such task
  114. has been registered in the task registry.
  115. :returns :class:`celery.result.AsyncResult`:
  116. Example
  117. >>> r = delay_task("update_record", name="George Costanza", age=32)
  118. >>> r.ready()
  119. True
  120. >>> r.result
  121. "Record was updated"
  122. """
  123. return apply_async(tasks[task_name], args, kwargs)
  124. def apply(task, args, kwargs, **options):
  125. """Apply the task locally.
  126. :keyword throw: Re-raise task exceptions. Defaults to
  127. the :setting:`CELERY_EAGER_PROPAGATES_EXCEPTIONS` setting.
  128. This will block until the task completes, and returns a
  129. :class:`celery.result.EagerResult` instance.
  130. """
  131. args = args or []
  132. kwargs = kwargs or {}
  133. task_id = options.get("task_id") or gen_unique_id()
  134. retries = options.get("retries", 0)
  135. throw = options.pop("throw", conf.EAGER_PROPAGATES_EXCEPTIONS)
  136. task = tasks[task.name] # make sure we get the instance, not class.
  137. default_kwargs = {"task_name": task.name,
  138. "task_id": task_id,
  139. "task_retries": retries,
  140. "task_is_eager": True,
  141. "logfile": options.get("logfile"),
  142. "delivery_info": {"is_eager": True},
  143. "loglevel": options.get("loglevel", 0)}
  144. supported_keys = fun_takes_kwargs(task.run, default_kwargs)
  145. extend_with = dict((key, val) for key, val in default_kwargs.items()
  146. if key in supported_keys)
  147. kwargs.update(extend_with)
  148. trace = TaskTrace(task.name, task_id, args, kwargs, task=task)
  149. retval = trace.execute()
  150. if isinstance(retval, ExceptionInfo):
  151. if throw:
  152. raise retval.exception
  153. retval = retval.exception
  154. return EagerResult(task_id, retval, trace.status, traceback=trace.strtb)