execute.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from carrot.connection import DjangoBrokerConnection
  2. from celery.conf import AMQP_CONNECTION_TIMEOUT
  3. from celery.result import AsyncResult, EagerResult
  4. from celery.messaging import TaskPublisher
  5. from celery.registry import tasks
  6. from celery.utils import gen_unique_id
  7. from functools import partial as curry
  8. from datetime import datetime, timedelta
  9. import inspect
  10. def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
  11. routing_key=None, exchange=None, task_id=None,
  12. immediate=None, mandatory=None, priority=None, connection=None,
  13. connect_timeout=AMQP_CONNECTION_TIMEOUT, **opts):
  14. """Run a task asynchronously by the celery daemon(s).
  15. :param task: The task to run (a callable object, or a :class:`Task`
  16. instance
  17. :param args: The positional arguments to pass on to the task (a ``list``).
  18. :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
  19. :param countdown: Number of seconds into the future that the task should
  20. execute. Defaults to immediate delivery (Do not confuse that with
  21. the ``immediate`` setting, they are unrelated).
  22. :param eta: A :class:`datetime.datetime` object that describes the
  23. absolute time when the task should execute. May not be specified
  24. if ``countdown`` is also supplied. (Do not confuse this with the
  25. ``immediate`` setting, they are unrelated).
  26. :keyword routing_key: The routing key used to route the task to a worker
  27. server.
  28. :keyword exchange: The named exchange to send the task to. Defaults to
  29. :attr:`celery.task.base.Task.exchange`.
  30. :keyword immediate: Request immediate delivery. Will raise an exception
  31. if the task cannot be routed to a worker immediately.
  32. (Do not confuse this parameter with the ``countdown`` and ``eta``
  33. settings, as they are unrelated).
  34. :keyword mandatory: Mandatory routing. Raises an exception if there's
  35. no running workers able to take on this task.
  36. :keyword connection: Re-use existing AMQP connection.
  37. The ``connect_timeout`` argument is not respected if this is set.
  38. :keyword connect_timeout: The timeout in seconds, before we give up
  39. on establishing a connection to the AMQP server.
  40. :keyword priority: The task priority, a number between ``0`` and ``9``.
  41. """
  42. args = args or []
  43. kwargs = kwargs or {}
  44. routing_key = routing_key or getattr(task, "routing_key", None)
  45. exchange = exchange or getattr(task, "exchange", None)
  46. immediate = immediate or getattr(task, "immediate", None)
  47. mandatory = mandatory or getattr(task, "mandatory", None)
  48. priority = priority or getattr(task, "priority", None)
  49. taskset_id = opts.get("taskset_id")
  50. publisher = opts.get("publisher")
  51. retries = opts.get("retries")
  52. if countdown:
  53. eta = datetime.now() + timedelta(seconds=countdown)
  54. from celery.conf import ALWAYS_EAGER
  55. if ALWAYS_EAGER:
  56. return apply(task, args, kwargs)
  57. need_to_close_connection = False
  58. if not publisher:
  59. if not connection:
  60. connection = DjangoBrokerConnection(
  61. connect_timeout=connect_timeout)
  62. need_to_close_connection = True
  63. publisher = TaskPublisher(connection=connection)
  64. delay_task = publisher.delay_task
  65. if taskset_id:
  66. delay_task = curry(publisher.delay_task_in_set, taskset_id)
  67. task_id = delay_task(task.name, args, kwargs,
  68. task_id=task_id, retries=retries,
  69. routing_key=routing_key, exchange=exchange,
  70. mandatory=mandatory, immediate=immediate,
  71. priority=priority, eta=eta)
  72. if need_to_close_connection:
  73. publisher.close()
  74. connection.close()
  75. return AsyncResult(task_id)
  76. def delay_task(task_name, *args, **kwargs):
  77. """Delay a task for execution by the ``celery`` daemon.
  78. :param task_name: the name of a task registered in the task registry.
  79. :param \*args: positional arguments to pass on to the task.
  80. :param \*\*kwargs: keyword arguments to pass on to the task.
  81. :raises celery.registry.NotRegistered: exception if no such task
  82. has been registered in the task registry.
  83. :rtype: :class:`celery.result.AsyncResult`.
  84. Example
  85. >>> r = delay_task("update_record", name="George Constanza", age=32)
  86. >>> r.ready()
  87. True
  88. >>> r.result
  89. "Record was updated"
  90. """
  91. if task_name not in tasks:
  92. raise tasks.NotRegistered(
  93. "Task with name %s not registered in the task registry." % (
  94. task_name))
  95. task = tasks[task_name]
  96. return apply_async(task, args, kwargs)
  97. def apply(task, args, kwargs, **ignored):
  98. """Apply the task locally.
  99. This will block until the task completes, and returns a
  100. :class:`celery.result.EagerResult` instance.
  101. """
  102. args = args or []
  103. kwargs = kwargs or {}
  104. task_id = gen_unique_id()
  105. # If it's a Task class we need to have to instance
  106. # for it to be callable.
  107. task = inspect.isclass(task) and task() or task
  108. try:
  109. ret_value = task(*args, **kwargs)
  110. status = "DONE"
  111. except Exception, exc:
  112. ret_value = exc
  113. status = "FAILURE"
  114. return EagerResult(task_id, ret_value, status)