__init__.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. from datetime import datetime, timedelta
  2. from celery import conf
  3. from celery.utils import gen_unique_id, fun_takes_kwargs, mattrgetter
  4. from celery.result import AsyncResult, EagerResult
  5. from celery.execute.trace import TaskTrace
  6. from celery.registry import tasks
  7. from celery.messaging import with_connection
  8. from celery.messaging import TaskPublisher
  9. extract_exec_options = mattrgetter("routing_key", "exchange",
  10. "immediate", "mandatory",
  11. "priority", "serializer")
  12. @with_connection
  13. def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
  14. task_id=None, publisher=None, connection=None, connect_timeout=None,
  15. **options):
  16. """Run a task asynchronously by the celery daemon(s).
  17. :param task: The task to run (a callable object, or a :class:`Task`
  18. instance
  19. :keyword args: The positional arguments to pass on to the
  20. task (a ``list``).
  21. :keyword kwargs: The keyword arguments to pass on to the task (a ``dict``)
  22. :keyword countdown: Number of seconds into the future that the task should
  23. execute. Defaults to immediate delivery (Do not confuse that with
  24. the ``immediate`` setting, they are unrelated).
  25. :keyword eta: A :class:`datetime.datetime` object that describes the
  26. absolute time when the task should execute. May not be specified
  27. if ``countdown`` is also supplied. (Do not confuse this with the
  28. ``immediate`` setting, they are unrelated).
  29. :keyword routing_key: The routing key used to route the task to a worker
  30. server.
  31. :keyword exchange: The named exchange to send the task to. Defaults to
  32. :attr:`celery.task.base.Task.exchange`.
  33. :keyword immediate: Request immediate delivery. Will raise an exception
  34. if the task cannot be routed to a worker immediately.
  35. (Do not confuse this parameter with the ``countdown`` and ``eta``
  36. settings, as they are unrelated).
  37. :keyword mandatory: Mandatory routing. Raises an exception if there's
  38. no running workers able to take on this task.
  39. :keyword connection: Re-use existing AMQP connection.
  40. The ``connect_timeout`` argument is not respected if this is set.
  41. :keyword connect_timeout: The timeout in seconds, before we give up
  42. on establishing a connection to the AMQP server.
  43. :keyword priority: The task priority, a number between ``0`` and ``9``.
  44. :keyword serializer: A string identifying the default serialization
  45. method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
  46. Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
  47. methods that have been registered with
  48. :mod:`carrot.serialization.registry`.
  49. **Note**: If the ``CELERY_ALWAYS_EAGER`` setting is set, it will be
  50. replaced by a local :func:`apply` call instead.
  51. """
  52. if conf.ALWAYS_EAGER:
  53. return apply(task, args, kwargs, task_id=task_id)
  54. task = tasks[task.name] # get instance from registry
  55. options = dict(extract_exec_options(task), **options)
  56. exchange = options.get("exchange")
  57. publish = publisher or task.get_publisher(connection, exchange=exchange)
  58. try:
  59. task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
  60. countdown=countdown, eta=eta, **options)
  61. finally:
  62. publisher or publish.close()
  63. return task.AsyncResult(task_id)
  64. @with_connection
  65. def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
  66. task_id=None, publisher=None, connection=None, connect_timeout=None,
  67. result_cls=AsyncResult, **options):
  68. exchange = options.get("exchange")
  69. publish = publisher or TaskPublisher(connection, exchange=exchange)
  70. try:
  71. task_id = publish.delay_task(name, args, kwargs, task_id=task_id,
  72. countdown=countdown, eta=eta, **options)
  73. finally:
  74. publisher or publish.close()
  75. return result_cls(task_id)
  76. def delay_task(task_name, *args, **kwargs):
  77. """Delay a task for execution by the ``celery`` daemon.
  78. :param task_name: the name of a task registered in the task registry.
  79. :param \*args: positional arguments to pass on to the task.
  80. :param \*\*kwargs: keyword arguments to pass on to the task.
  81. :raises celery.exceptions.NotRegistered: exception if no such task
  82. has been registered in the task registry.
  83. :returns: :class:`celery.result.AsyncResult`.
  84. Example
  85. >>> r = delay_task("update_record", name="George Constanza", age=32)
  86. >>> r.ready()
  87. True
  88. >>> r.result
  89. "Record was updated"
  90. """
  91. return apply_async(tasks[task_name], args, kwargs)
  92. def apply(task, args, kwargs, **options):
  93. """Apply the task locally.
  94. This will block until the task completes, and returns a
  95. :class:`celery.result.EagerResult` instance.
  96. """
  97. args = args or []
  98. kwargs = kwargs or {}
  99. task_id = options.get("task_id", gen_unique_id())
  100. retries = options.get("retries", 0)
  101. task = tasks[task.name] # Make sure we get the instance, not class.
  102. default_kwargs = {"task_name": task.name,
  103. "task_id": task_id,
  104. "task_retries": retries,
  105. "task_is_eager": True,
  106. "logfile": None,
  107. "loglevel": 0}
  108. supported_keys = fun_takes_kwargs(task.run, default_kwargs)
  109. extend_with = dict((key, val) for key, val in default_kwargs.items()
  110. if key in supported_keys)
  111. kwargs.update(extend_with)
  112. trace = TaskTrace(task.name, task_id, args, kwargs, task=task)
  113. retval = trace.execute()
  114. return EagerResult(task_id, retval, trace.status, traceback=trace.strtb)