execute.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. from carrot.connection import DjangoAMQPConnection
  2. from celery.conf import AMQP_CONNECTION_TIMEOUT
  3. from celery.result import AsyncResult
  4. from celery.messaging import TaskPublisher
  5. from functools import partial as curry
  6. from datetime import datetime, timedelta
  7. def apply_async(task, args=None, kwargs=None, routing_key=None,
  8. immediate=None, mandatory=None, connection=None,
  9. connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None, **opts):
  10. """Run a task asynchronously by the celery daemon(s).
  11. :param task: The task to run (a callable object, or a :class:`Task`
  12. instance
  13. :param args: The positional arguments to pass on to the task (a ``list``).
  14. :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
  15. :keyword routing_key: The routing key used to route the task to a worker
  16. server.
  17. :keyword immediate: Request immediate delivery. Will raise an exception
  18. if the task cannot be routed to a worker immediately.
  19. :keyword mandatory: Mandatory routing. Raises an exception if there's
  20. no running workers able to take on this task.
  21. :keyword connection: Re-use existing AMQP connection.
  22. The ``connect_timeout`` argument is not respected if this is set.
  23. :keyword connect_timeout: The timeout in seconds, before we give up
  24. on establishing a connection to the AMQP server.
  25. :keyword priority: The task priority, a number between ``0`` and ``9``.
  26. """
  27. args = args or []
  28. kwargs = kwargs or {}
  29. routing_key = routing_key or getattr(task, "routing_key", None)
  30. immediate = immediate or getattr(task, "immediate", None)
  31. mandatory = mandatory or getattr(task, "mandatory", None)
  32. priority = priority or getattr(task, "priority", None)
  33. taskset_id = opts.get("taskset_id")
  34. publisher = opts.get("publisher")
  35. if countdown:
  36. eta = datetime.now() + timedelta(seconds=countdown)
  37. need_to_close_connection = False
  38. if not publisher:
  39. if not connection:
  40. connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
  41. need_to_close_connection = True
  42. publisher = TaskPublisher(connection=connection)
  43. delay_task = publisher.delay_task
  44. if taskset_id:
  45. delay_task = curry(publisher.delay_task_in_set, taskset_id)
  46. task_id = delay_task(task.name, args, kwargs,
  47. routing_key=routing_key, mandatory=mandatory,
  48. immediate=immediate, priority=priority,
  49. eta=eta)
  50. if need_to_close_connection:
  51. publisher.close()
  52. connection.close()
  53. return AsyncResult(task_id)
  54. def delay_task(task_name, *args, **kwargs):
  55. """Delay a task for execution by the ``celery`` daemon.
  56. :param task_name: the name of a task registered in the task registry.
  57. :param \*args: positional arguments to pass on to the task.
  58. :param \*\*kwargs: keyword arguments to pass on to the task.
  59. :raises celery.registry.NotRegistered: exception if no such task
  60. has been registered in the task registry.
  61. :rtype: :class:`celery.result.AsyncResult`.
  62. Example
  63. >>> r = delay_task("update_record", name="George Constanza", age=32)
  64. >>> r.ready()
  65. True
  66. >>> r.result
  67. "Record was updated"
  68. """
  69. if task_name not in tasks:
  70. raise tasks.NotRegistered(
  71. "Task with name %s not registered in the task registry." % (
  72. task_name))
  73. task = tasks[task_name]
  74. return apply_async(task, args, kwargs)