messaging.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. from carrot.messaging import Publisher, Consumer
  2. import uuid
  3. __all__ = ["NoProcessConsumer", "TaskPublisher", "TaskConsumer"]
  4. class NoProcessConsumer(Consumer):
  5. def receive(self, message_data, message):
  6. raise NotImplementedError(
  7. "Don't use process_next() or wait() with the TaskConsumer!")
  8. class TaskPublisher(Publisher):
  9. exchange = "celery"
  10. routing_key = "celery"
  11. def delay_task(self, task_name, **task_kwargs):
  12. return self._delay_task(task_name=task_name, extra_data=task_kwargs)
  13. def delay_task_in_set(self, task_name, taskset_id, task_kwargs):
  14. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  15. extra_data=task_kwargs)
  16. def _delay_task(self, task_name, part_of_set=None, extra_data=None):
  17. extra_data = extra_data or {}
  18. task_id = str(uuid.uuid4())
  19. message_data = dict(extra_data)
  20. message_data["celeryTASK"] = task_name
  21. message_data["celeryID"] = task_id
  22. if part_of_set:
  23. message_data["celeryTASKSET"] = part_of_set
  24. self.send(message_data)
  25. class TaskConsumer(NoProcessConsumer):
  26. queue = "celery"
  27. exchange = "celery"
  28. routing_key = "celery"