execute.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. from carrot.connection import DjangoAMQPConnection
  2. from celery.conf import AMQP_CONNECTION_TIMEOUT
  3. from celery.result import AsyncResult
  4. from celery.messaging import TaskPublisher
  5. from celery.registry import tasks
  6. from functools import partial as curry
  7. from datetime import datetime, timedelta
  8. def apply_async(task, args=None, kwargs=None, routing_key=None,
  9. immediate=None, mandatory=None, connection=None,
  10. connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None,
  11. countdown=None, eta=None, **opts):
  12. """Run a task asynchronously by the celery daemon(s).
  13. :param task: The task to run (a callable object, or a :class:`Task`
  14. instance
  15. :param args: The positional arguments to pass on to the task (a ``list``).
  16. :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
  17. :param countdown: Number of seconds into the future that the task should
  18. execute. Defaults to immediate delivery (Do not confuse that with
  19. the ``immediate`` setting, they are unrelated).
  20. :param eta: A :class:`datetime.datetime` object that describes the
  21. absolute time when the task should execute. May not be specified
  22. if ``countdown`` is also supplied. (Do not confuse this with the
  23. ``immediate`` setting, they are unrelated).
  24. :keyword routing_key: The routing key used to route the task to a worker
  25. server.
  26. :keyword immediate: Request immediate delivery. Will raise an exception
  27. if the task cannot be routed to a worker immediately.
  28. (Do not confuse this parameter with the ``countdown`` and ``eta``
  29. settings, as they are unrelated).
  30. :keyword mandatory: Mandatory routing. Raises an exception if there's
  31. no running workers able to take on this task.
  32. :keyword connection: Re-use existing AMQP connection.
  33. The ``connect_timeout`` argument is not respected if this is set.
  34. :keyword connect_timeout: The timeout in seconds, before we give up
  35. on establishing a connection to the AMQP server.
  36. :keyword priority: The task priority, a number between ``0`` and ``9``.
  37. """
  38. args = args or []
  39. kwargs = kwargs or {}
  40. routing_key = routing_key or getattr(task, "routing_key", None)
  41. immediate = immediate or getattr(task, "immediate", None)
  42. mandatory = mandatory or getattr(task, "mandatory", None)
  43. priority = priority or getattr(task, "priority", None)
  44. taskset_id = opts.get("taskset_id")
  45. publisher = opts.get("publisher")
  46. if countdown:
  47. eta = datetime.now() + timedelta(seconds=countdown)
  48. need_to_close_connection = False
  49. if not publisher:
  50. if not connection:
  51. connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
  52. need_to_close_connection = True
  53. publisher = TaskPublisher(connection=connection)
  54. delay_task = publisher.delay_task
  55. if taskset_id:
  56. delay_task = curry(publisher.delay_task_in_set, taskset_id)
  57. task_id = delay_task(task.name, args, kwargs,
  58. routing_key=routing_key, mandatory=mandatory,
  59. immediate=immediate, priority=priority,
  60. eta=eta)
  61. if need_to_close_connection:
  62. publisher.close()
  63. connection.close()
  64. return AsyncResult(task_id)
  65. def delay_task(task_name, *args, **kwargs):
  66. """Delay a task for execution by the ``celery`` daemon.
  67. :param task_name: the name of a task registered in the task registry.
  68. :param \*args: positional arguments to pass on to the task.
  69. :param \*\*kwargs: keyword arguments to pass on to the task.
  70. :raises celery.registry.NotRegistered: exception if no such task
  71. has been registered in the task registry.
  72. :rtype: :class:`celery.result.AsyncResult`.
  73. Example
  74. >>> r = delay_task("update_record", name="George Constanza", age=32)
  75. >>> r.ready()
  76. True
  77. >>> r.result
  78. "Record was updated"
  79. """
  80. if task_name not in tasks:
  81. raise tasks.NotRegistered(
  82. "Task with name %s not registered in the task registry." % (
  83. task_name))
  84. task = tasks[task_name]
  85. return apply_async(task, args, kwargs)