messaging.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from carrot.messaging import Publisher, Consumer
  5. from celery import conf
  6. import uuid
  7. class NoProcessConsumer(Consumer):
  8. """A consumer that raises an error if used with wait callbacks (i.e.
  9. it doesn't support :meth:`carrot.messaging.Consumer.wait``)."""
  10. def receive(self, message_data, message):
  11. raise NotImplementedError(
  12. "This consumer doesn't support process_next() or wait()")
  13. class TaskPublisher(Publisher):
  14. """The AMQP Task Publisher class."""
  15. exchange = conf.AMQP_EXCHANGE
  16. routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
  17. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  18. """Delay task for execution by the celery nodes."""
  19. return self._delay_task(task_name=task_name, task_args=task_args,
  20. task_kwargs=task_kwargs, **kwargs)
  21. def delay_task_in_set(self, task_name, taskset_id, task_args,
  22. task_kwargs, **kwargs):
  23. """Delay a task which part of a task set."""
  24. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  25. task_args=task_args, task_kwargs=task_kwargs,
  26. **kwargs)
  27. def requeue_task(self, task_name, task_id, task_args, task_kwargs,
  28. part_of_set=None, **kwargs):
  29. """Requeue a failed task."""
  30. return self._delay_task(task_name=task_name, part_of_set=part_of_set,
  31. task_id=task_id, task_args=task_args,
  32. task_kwargs=task_kwargs, **kwargs)
  33. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  34. task_args=None, task_kwargs=None, **kwargs):
  35. priority = kwargs.get("priority")
  36. immediate = kwargs.get("immediate")
  37. mandatory = kwargs.get("mandatory")
  38. routing_key = kwargs.get("routing_key")
  39. task_args = task_args or []
  40. task_kwargs = task_kwargs or {}
  41. task_id = task_id or str(uuid.uuid4())
  42. message_data = {
  43. "id": task_id,
  44. "task": task_name,
  45. "args": task_args,
  46. "kwargs": task_kwargs,
  47. }
  48. if part_of_set:
  49. message_data["taskset"] = part_of_set
  50. self.send(message_data,
  51. routing_key=routing_key, priority=priority,
  52. immediate=immediate, mandatory=mandatory)
  53. return task_id
  54. class TaskConsumer(NoProcessConsumer):
  55. """The AMQP Task Consumer class."""
  56. queue = conf.AMQP_CONSUMER_QUEUE
  57. exchange = conf.AMQP_EXCHANGE
  58. routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
  59. exchange_type = conf.AMQP_EXCHANGE_TYPE