messaging.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. from carrot.messaging import Publisher, Consumer
  2. from celery import conf
  3. import uuid
  4. class NoProcessConsumer(Consumer):
  5. def receive(self, message_data, message):
  6. raise NotImplementedError(
  7. "Don't use process_next() or wait() with the TaskConsumer!")
  8. class TaskPublisher(Publisher):
  9. exchange = conf.AMQP_EXCHANGE
  10. routing_key = conf.AMQP_ROUTING_KEY
  11. def delay_task(self, task_name, *task_args, **task_kwargs):
  12. return self._delay_task(task_name=task_name, args=task_args,
  13. kwargs=task_kwargs)
  14. def delay_task_in_set(self, task_name, taskset_id, task_args,
  15. task_kwargs):
  16. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  17. args=task_args, kwargs=task_kwargs)
  18. def requeue_task(self, task_name, task_id, task_args, task_kwargs,
  19. part_of_set=None):
  20. return self._delay_task(task_name=task_name, part_of_set=part_of_set,
  21. task_id=task_id, args=task_args,
  22. kwargs=task_kwargs)
  23. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  24. args=None, kwargs=None):
  25. args = args or []
  26. kwargs = kwargs or {}
  27. task_id = task_id or str(uuid.uuid4())
  28. message_data = {
  29. "id": task_id,
  30. "task": task_name,
  31. "args": args,
  32. "kwargs": kwargs,
  33. }
  34. if part_of_set:
  35. message_data["taskset"] = part_of_set
  36. self.send(message_data)
  37. return task_id
  38. class TaskConsumer(NoProcessConsumer):
  39. queue = conf.AMQP_CONSUMER_QUEUE
  40. exchange = conf.AMQP_EXCHANGE
  41. routing_key = conf.AMQP_ROUTING_KEY