messaging.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from carrot.messaging import Publisher, Consumer
  5. from celery import conf
  6. import uuid
  7. class NoProcessConsumer(Consumer):
  8. """A consumer that raises an error if used with wait callbacks (i.e.
  9. it doesn't support :meth:`carrot.messaging.Consumer.wait``)."""
  10. def receive(self, message_data, message):
  11. raise NotImplementedError(
  12. "This consumer doesn't support process_next() or wait()")
  13. class TaskPublisher(Publisher):
  14. """The AMQP Task Publisher class."""
  15. exchange = conf.AMQP_EXCHANGE
  16. routing_key = conf.AMQP_ROUTING_KEY
  17. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  18. """Delay task for execution by the celery nodes."""
  19. return self._delay_task(task_name=task_name, args=task_args,
  20. kwargs=task_kwargs, **kwargs)
  21. def delay_task_in_set(self, task_name, taskset_id, task_args,
  22. task_kwargs, **kwargs):
  23. """Delay a task which part of a task set."""
  24. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  25. args=task_args, kwargs=task_kwargs, **kwargs)
  26. def requeue_task(self, task_name, task_id, task_args, task_kwargs,
  27. part_of_set=None, **kwargs):
  28. """Requeue a failed task."""
  29. return self._delay_task(task_name=task_name, part_of_set=part_of_set,
  30. task_id=task_id, args=task_args,
  31. kwargs=task_kwargs, **kwargs)
  32. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  33. task_args=None, task_kwargs=None, **kwargs):
  34. priority = kwargs.get("priority")
  35. immediate = kwargs.get("immediate")
  36. mandatory = kwargs.get("mandatory")
  37. routing_key = kwargs.get("routing_key")
  38. task_args = task_args or []
  39. task_kwargs = task_kwargs or {}
  40. task_id = task_id or str(uuid.uuid4())
  41. message_data = {
  42. "id": task_id,
  43. "task": task_name,
  44. "args": task_args,
  45. "kwargs": task_kwargs,
  46. }
  47. if part_of_set:
  48. message_data["taskset"] = part_of_set
  49. self.send(message_data)
  50. return task_id
  51. class TaskConsumer(NoProcessConsumer):
  52. """The AMQP Task Consumer class."""
  53. queue = conf.AMQP_CONSUMER_QUEUE
  54. exchange = conf.AMQP_EXCHANGE
  55. routing_key = conf.AMQP_ROUTING_KEY
  56. exchange_type = conf.AMQP_EXCHANGE_TYPE