execute.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. from carrot.connection import DjangoAMQPConnection
  2. from celery.conf import AMQP_CONNECTION_TIMEOUT
  3. from celery.result import AsyncResult, EagerResult
  4. from celery.messaging import TaskPublisher
  5. from celery.registry import tasks
  6. from celery.utils import gen_unique_id
  7. from functools import partial as curry
  8. from datetime import datetime, timedelta
  9. import inspect
  def apply_async(task, args=None, kwargs=None, routing_key=None,
          immediate=None, mandatory=None, connection=None,
          connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None,
          countdown=None, eta=None, **opts):
      """Run a task asynchronously by the celery daemon(s).

      :param task: The task to run (a callable object, or a :class:`Task`
          instance). Its ``name`` attribute is what gets published.

      :param args: The positional arguments to pass on to the task
          (a ``list``).

      :param kwargs: The keyword arguments to pass on to the task
          (a ``dict``).

      :param countdown: Number of seconds into the future that the task should
          execute. Defaults to immediate delivery (Do not confuse that with
          the ``immediate`` setting, they are unrelated). When given (and
          non-zero) it is converted to an ``eta`` and replaces any ``eta``
          passed by the caller.

      :param eta: A :class:`datetime.datetime` object that describes the
          absolute time when the task should execute. Ignored if a non-zero
          ``countdown`` is also supplied. (Do not confuse this with the
          ``immediate`` setting, they are unrelated).

      :keyword routing_key: The routing key used to route the task to a worker
          server.

      :keyword immediate: Request immediate delivery. Will raise an exception
          if the task cannot be routed to a worker immediately.
          (Do not confuse this parameter with the ``countdown`` and ``eta``
          settings, as they are unrelated).

      :keyword mandatory: Mandatory routing. Raises an exception if there's
          no running workers able to take on this task.

      :keyword connection: Re-use existing AMQP connection.
          The ``connect_timeout`` argument is not respected if this is set.
          A connection supplied by the caller is never closed here.

      :keyword connect_timeout: The timeout in seconds, before we give up
          on establishing a connection to the AMQP server.

      :keyword priority: The task priority, a number between ``0`` and ``9``.

      :keyword publisher: Re-use an existing publisher (read from ``**opts``).
          When given, no connection or publisher is created or closed here.

      :keyword taskset_id: If set (read from ``**opts``), the task is
          published through ``publisher.delay_task_in_set`` with this id.

      ``routing_key``, ``immediate``, ``mandatory`` and ``priority`` fall back
      to the attribute of the same name on ``task`` when left as ``None``.

      :returns: an :class:`AsyncResult` for the published task id, or, when
          ``celery.conf.ALWAYS_EAGER`` is true, whatever :func:`apply`
          returns after running the task locally.

      """
      args = args or []
      kwargs = kwargs or {}

      # Only fall back to the task's own attribute when the caller left the
      # option unset (``None``).  Using ``x or default`` here would silently
      # discard explicit falsy values such as ``priority=0`` or
      # ``immediate=False``.
      if routing_key is None:
          routing_key = getattr(task, "routing_key", None)
      if immediate is None:
          immediate = getattr(task, "immediate", None)
      if mandatory is None:
          mandatory = getattr(task, "mandatory", None)
      if priority is None:
          priority = getattr(task, "priority", None)

      taskset_id = opts.get("taskset_id")
      publisher = opts.get("publisher")

      if countdown:
          eta = datetime.now() + timedelta(seconds=countdown)

      # NOTE(review): imported at call time rather than at module level,
      # presumably so the setting is looked up on every call -- confirm.
      from celery.conf import ALWAYS_EAGER
      if ALWAYS_EAGER:
          return apply(task, args, kwargs)

      # Track exactly what *this call* created, so that only those resources
      # are released and anything handed in by the caller is left open.
      own_connection = None
      own_publisher = None
      try:
          if not publisher:
              if not connection:
                  connection = own_connection = DjangoAMQPConnection(
                                      connect_timeout=connect_timeout)
              publisher = own_publisher = TaskPublisher(connection=connection)

          send_task = publisher.delay_task
          if taskset_id:
              send_task = curry(publisher.delay_task_in_set, taskset_id)

          task_id = send_task(task.name, args, kwargs,
                              routing_key=routing_key, mandatory=mandatory,
                              immediate=immediate, priority=priority,
                              eta=eta)
      finally:
          # Release in reverse order of creation, even if publishing (or
          # closing the publisher) raised.
          try:
              if own_publisher is not None:
                  own_publisher.close()
          finally:
              if own_connection is not None:
                  own_connection.close()

      return AsyncResult(task_id)
  def delay_task(task_name, *args, **kwargs):
      """Look up a registered task by name and send it for execution.

      The task is fetched from the task registry and handed to
      :func:`apply_async` together with the given arguments. Every extra
      positional and keyword argument is forwarded to the task itself, so
      execution options (routing key, countdown, ...) cannot be set through
      this function.

      :param task_name: the name of a task registered in the task registry.

      :param args: positional arguments (``*args``) to pass on to the task.

      :param kwargs: keyword arguments (``**kwargs``) to pass on to the task.

      :raises celery.registry.NotRegistered: exception if no such task
          has been registered in the task registry.

      :returns: whatever :func:`apply_async` returns for the task
          (normally a :class:`celery.result.AsyncResult`).

      Example

          >>> r = delay_task("update_record", name="George Costanza", age=32)
          >>> r.ready()   # True once a worker has finished the task
          True
          >>> r.result
          "Record was updated"

      """
      # Happy path first: a known task is dispatched straight away.
      if task_name in tasks:
          return apply_async(tasks[task_name], args, kwargs)

      raise tasks.NotRegistered(
          "Task with name %s not registered in the task registry." % (
              task_name))
  def apply(task, args, kwargs, **ignored):
      """Apply the task locally.

      This will block until the task completes, and returns a
      :class:`celery.result.EagerResult` instance.

      :param task: The task to run: a callable, or a task class, in which
          case it is instantiated (with no arguments) before being called.

      :param args: Positional arguments for the task; a falsy value is
          treated as no arguments.

      :param kwargs: Keyword arguments for the task; a falsy value is
          treated as no keyword arguments.

      :param ignored: Any other keyword arguments are accepted and discarded.

      :returns: an :class:`celery.result.EagerResult` built from a freshly
          generated task id, the return value and the status ``"DONE"``; if
          the task raised an :exc:`Exception` the exception instance is
          stored as the value and the status is ``"FAILURE"`` (the exception
          is not re-raised).

      """
      # Function-scope import: ``sys`` is not among this module's imports.
      import sys

      args = args or []
      kwargs = kwargs or {}
      task_id = gen_unique_id()

      # If it's a Task class we need an instance for it to be callable.
      # An explicit ``if`` is used instead of ``cond and a or b``: that idiom
      # falls back to the class itself whenever the new instance is falsy.
      if inspect.isclass(task):
          task = task()

      try:
          ret_value = task(*args, **kwargs)
          status = "DONE"
      except Exception:
          # ``sys.exc_info()`` instead of ``except Exception, exc`` (Python 2
          # only) or ``except Exception as exc`` (2.6+ only): this form is
          # valid on every Python 2.x and 3.x interpreter.
          ret_value = sys.exc_info()[1]
          status = "FAILURE"

      return EagerResult(task_id, ret_value, status)