messaging.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from carrot.messaging import Publisher, Consumer, ConsumerSet
  5. from celery import conf
  6. from celery import signals
  7. from celery.utils import gen_unique_id
  8. from celery.utils import mitemgetter
  9. from celery.serialization import pickle
  10. MSG_OPTIONS = ("mandatory", "priority",
  11. "immediate", "routing_key",
  12. "serializer")
  13. get_msg_options = mitemgetter(*MSG_OPTIONS)
  14. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  15. class TaskPublisher(Publisher):
  16. """The AMQP Task Publisher class."""
  17. exchange = conf.AMQP_EXCHANGE
  18. exchange_type = conf.AMQP_EXCHANGE_TYPE
  19. routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
  20. serializer = conf.TASK_SERIALIZER
  21. encoder = pickle.dumps
  22. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  23. """Delay task for execution by the celery nodes."""
  24. return self._delay_task(task_name=task_name, task_args=task_args,
  25. task_kwargs=task_kwargs, **kwargs)
  26. def delay_task_in_set(self, taskset_id, task_name, task_args, task_kwargs,
  27. **kwargs):
  28. """Delay a task which part of a task set."""
  29. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  30. task_args=task_args, task_kwargs=task_kwargs,
  31. **kwargs)
  32. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  33. task_args=None, task_kwargs=None, **kwargs):
  34. """INTERNAL"""
  35. task_id = task_id or gen_unique_id()
  36. message_data = {
  37. "task": task_name,
  38. "id": task_id,
  39. "args": task_args or [],
  40. "kwargs": task_kwargs or {},
  41. "retries": kwargs.get("retries", 0),
  42. "eta": kwargs.get("eta"),
  43. }
  44. if part_of_set:
  45. message_data["taskset"] = part_of_set
  46. self.send(message_data, **extract_msg_options(kwargs))
  47. signals.task_sent.send(sender=task_name, **message_data)
  48. return task_id
  49. def get_consumer_set(connection, queues=conf.AMQP_CONSUMER_QUEUES, **options):
  50. return ConsumerSet(connection, from_dict=queues, **options)
  51. class TaskConsumer(Consumer):
  52. """The AMQP Task Consumer class."""
  53. queue = conf.AMQP_CONSUMER_QUEUE
  54. exchange = conf.AMQP_EXCHANGE
  55. routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
  56. exchange_type = conf.AMQP_EXCHANGE_TYPE
  57. decoder = pickle.loads
  58. auto_ack = False
  59. no_ack = False
  60. class StatsPublisher(Publisher):
  61. exchange = "celerygraph"
  62. routing_key = "stats"
  63. encoder = pickle.dumps
  64. class StatsConsumer(Consumer):
  65. queue = "celerygraph"
  66. exchange = "celerygraph"
  67. routing_key = "stats"
  68. exchange_type = "direct"
  69. decoder = pickle.loads
  70. no_ack=True