1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- from carrot.messaging import Publisher, Consumer
- from celery import conf
- import uuid
class NoProcessConsumer(Consumer):
    """Consumer whose ``receive`` callback is deliberately unusable.

    Calling :meth:`receive` always raises :exc:`NotImplementedError`; the
    error text tells the caller not to use ``process_next()`` or ``wait()``.
    """

    def receive(self, message_data, message):
        """Reject the message by raising :exc:`NotImplementedError`.

        Both arguments are ignored.

        :param message_data: decoded message payload (unused).
        :param message: message object (unused).
        :raises NotImplementedError: always.
        """
        reason = "Don't use process_next() or wait() with the TaskConsumer!"
        raise NotImplementedError(reason)
class TaskPublisher(Publisher):
    """Publisher that sends task messages.

    The exchange and routing key are read from ``celery.conf``. Every public
    method funnels into :meth:`_delay_task`, which builds the message
    dictionary, sends it with ``self.send`` and returns the task id.
    """

    exchange = conf.AMQP_EXCHANGE
    routing_key = conf.AMQP_ROUTING_KEY

    def delay_task(self, task_name, *task_args, **task_kwargs):
        """Send a task message for ``task_name`` and return its new task id.

        :param task_name: name placed in the message's ``"task"`` field.
        :param task_args: positional arguments forwarded as ``"args"``.
        :param task_kwargs: keyword arguments forwarded as ``"kwargs"``.
        :returns: the id of the task that was sent.
        """
        return self._delay_task(
            task_name=task_name,
            args=task_args,
            kwargs=task_kwargs,
        )

    def delay_task_in_set(self, task_name, taskset_id, task_args,
                          task_kwargs):
        """Send a task message tagged with the task set ``taskset_id``.

        :param task_name: name placed in the message's ``"task"`` field.
        :param taskset_id: value stored under ``"taskset"`` (when truthy).
        :param task_args: positional arguments for the task.
        :param task_kwargs: keyword arguments for the task.
        :returns: the id of the task that was sent.
        """
        return self._delay_task(
            task_name=task_name,
            part_of_set=taskset_id,
            args=task_args,
            kwargs=task_kwargs,
        )

    def requeue_task(self, task_name, task_id, task_args, task_kwargs,
                     part_of_set=None):
        """Send a task message again, reusing the given ``task_id``.

        :param task_name: name placed in the message's ``"task"`` field.
        :param task_id: id to reuse; a new one is generated if it is falsy.
        :param task_args: positional arguments for the task.
        :param task_kwargs: keyword arguments for the task.
        :param part_of_set: optional task set id stored under ``"taskset"``.
        :returns: the id of the task that was sent.
        """
        return self._delay_task(
            task_name=task_name,
            task_id=task_id,
            part_of_set=part_of_set,
            args=task_args,
            kwargs=task_kwargs,
        )

    def _delay_task(self, task_name, task_id=None, part_of_set=None,
                    args=None, kwargs=None):
        """Build the task message, send it, and return the task id.

        Falsy ``args``/``kwargs`` are replaced by an empty list/dict, and a
        falsy ``task_id`` is replaced by a freshly generated UUID4 string.
        The ``"taskset"`` key is only added when ``part_of_set`` is truthy.
        """
        if not args:
            args = []
        if not kwargs:
            kwargs = {}
        if not task_id:
            task_id = str(uuid.uuid4())

        payload = {"id": task_id, "task": task_name,
                   "args": args, "kwargs": kwargs}
        if part_of_set:
            payload["taskset"] = part_of_set

        self.send(payload)
        return task_id
class TaskConsumer(NoProcessConsumer):
    """Consumer bound to the queue, exchange and routing key from ``conf``.

    Inherits the ``receive`` of :class:`NoProcessConsumer` unchanged.
    """

    exchange = conf.AMQP_EXCHANGE
    routing_key = conf.AMQP_ROUTING_KEY
    queue = conf.AMQP_CONSUMER_QUEUE
|