123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- """
- Sending and Receiving Messages
- """
- from carrot.connection import DjangoBrokerConnection
- from carrot.messaging import Publisher, Consumer, ConsumerSet
- from celery import conf
- from celery import signals
- from celery.utils import gen_unique_id
- from celery.utils import mitemgetter
- MSG_OPTIONS = ("mandatory", "priority",
- "immediate", "routing_key",
- "serializer")
- get_msg_options = mitemgetter(*MSG_OPTIONS)
- extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
- class TaskPublisher(Publisher):
- """The AMQP Task Publisher class."""
- exchange = conf.AMQP_EXCHANGE
- exchange_type = conf.AMQP_EXCHANGE_TYPE
- routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
- serializer = conf.TASK_SERIALIZER
- def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
- """Delay task for execution by the celery nodes."""
- return self._delay_task(task_name=task_name, task_args=task_args,
- task_kwargs=task_kwargs, **kwargs)
- def _delay_task(self, task_name, task_id=None, taskset_id=None,
- task_args=None, task_kwargs=None, **kwargs):
- """INTERNAL"""
- task_id = task_id or gen_unique_id()
- eta = kwargs.get("eta")
- eta = eta and eta.isoformat()
- message_data = {
- "task": task_name,
- "id": task_id,
- "args": task_args or [],
- "kwargs": task_kwargs or {},
- "retries": kwargs.get("retries", 0),
- "eta": eta,
- }
- if taskset_id:
- message_data["taskset"] = taskset_id
- self.send(message_data, **extract_msg_options(kwargs))
- signals.task_sent.send(sender=task_name, **message_data)
- return task_id
- def get_consumer_set(connection, queues=conf.AMQP_CONSUMER_QUEUES, **options):
- return ConsumerSet(connection, from_dict=queues, **options)
- class TaskConsumer(Consumer):
- """The AMQP Task Consumer class."""
- queue = conf.AMQP_CONSUMER_QUEUE
- exchange = conf.AMQP_EXCHANGE
- routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
- exchange_type = conf.AMQP_EXCHANGE_TYPE
- auto_ack = False
- no_ack = False
- class EventPublisher(Publisher):
- exchange = "celeryevent"
- routing_key = "event"
- class EventConsumer(Consumer):
- queue = "celeryevent"
- exchange = "celeryevent"
- routing_key = "event"
- exchange_type = "direct"
- no_ack = True
- class BroadcastPublisher(Publisher):
- exchange = "celeryctl"
- exchange_type = "fanout"
- routing_key = ""
- def revoke(self, task_id):
- self.send("revoke", dict(task_id=task_id))
- def send(self, type, data):
- data["command"] = type
- super(BroadcastPublisher, self).send({"control": data})
- class BroadcastConsumer(Consumer):
- queue = "celeryctl"
- exchange = "celeryctl"
- routing_key = ""
- exchange_type = "fanout"
- no_ack = True
- def establish_connection(connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
- return DjangoBrokerConnection(connect_timeout=connect_timeout)
- def with_connection(fun, connection=None,
- connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
- conn = connection or establish_connection()
- close_connection = not connection and conn.close or noop
- try:
- return fun(conn)
- finally:
- close_connection()
- def get_connection_info():
- broker_connection = DjangoBrokerConnection()
- carrot_backend = broker_connection.backend_cls
- if carrot_backend and not isinstance(carrot_backend, str):
- carrot_backend = carrot_backend.__name__
- port = broker_connection.port or \
- broker_connection.get_backend_cls().default_port
- port = port and ":%s" % port or ""
- vhost = broker_connection.virtual_host
- if not vhost.startswith("/"):
- vhost = "/" + vhost
- return "%(carrot_backend)s://%(userid)s@%(host)s%(port)s%(vhost)s" % {
- "carrot_backend": carrot_backend,
- "userid": broker_connection.userid,
- "host": broker_connection.hostname,
- "port": port,
- "vhost": vhost}
|