123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- """
- Sending and Receiving Messages
- """
- import socket
- from datetime import datetime, timedelta
- from itertools import count
- from carrot.connection import BrokerConnection
- from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
- from celery import conf
- from celery import signals
- from celery.utils import gen_unique_id, mitemgetter, noop
- from celery.utils.functional import wraps
# Names of the keyword options that ``TaskPublisher.delay_task`` picks out
# of its ``**kwargs`` and hands on to ``send``.
MSG_OPTIONS = ("mandatory", "priority", "immediate",
               "routing_key", "serializer", "delivery_mode")

# Getter built once for all option names above.
get_msg_options = mitemgetter(*MSG_OPTIONS)


def extract_msg_options(d):
    """Return a dict pairing every name in ``MSG_OPTIONS`` with the value
    ``get_msg_options`` produces for it from ``d``."""
    values = get_msg_options(d)
    return dict(zip(MSG_OPTIONS, values))


# Configuration entry of the default queue (provides ``exchange``,
# ``exchange_type`` and ``binding_key`` to the classes below).
default_queue = conf.get_queues()[conf.DEFAULT_QUEUE]

# Module-wide bookkeeping maintained by ``TaskPublisher.__init__``:
# a flag flipped after the consumer set has been created and closed once,
# and the exchanges on which ``declare()`` has already been called.
_queues_declared = False
_exchanges_declared = {}
class TaskPublisher(Publisher):
    """Publisher used for sending task messages.

    The exchange settings are taken from the default queue configuration,
    the routing key and serializer from :mod:`celery.conf`.
    ``auto_declare`` is turned off; :meth:`__init__` calls ``declare()``
    itself, guarded by module-level state.
    """
    auto_declare = False
    serializer = conf.TASK_SERIALIZER
    routing_key = conf.DEFAULT_ROUTING_KEY
    exchange_type = default_queue["exchange_type"]
    exchange = default_queue["exchange"]

    def __init__(self, *args, **kwargs):
        """Initialise the publisher, then perform the one-off declarations.

        All arguments are passed unchanged to the parent constructor.
        """
        global _queues_declared
        super(TaskPublisher, self).__init__(*args, **kwargs)

        if not _queues_declared:
            # A consumer set for the configured queues is created and closed
            # right away; presumably this is what gets the queues declared
            # (the flag name suggests so) -- confirm against carrot.
            get_consumer_set(self.connection).close()
            _queues_declared = True

        if self.exchange in _exchanges_declared:
            return
        self.declare()
        _exchanges_declared[self.exchange] = True

    def delay_task(self, task_name, task_args=None, task_kwargs=None,
            countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
        """Send a task message and return the id of the task.

        :param task_name: name stored under the ``task`` key of the message.
        :keyword task_args: list or tuple of positional arguments
            (a falsy value is replaced by an empty list).
        :keyword task_kwargs: dict of keyword arguments
            (a falsy value is replaced by an empty dict).
        :keyword countdown: when truthy, ``eta`` is overridden with
            ``datetime.now()`` plus this many seconds.
        :keyword eta: object whose ``isoformat()`` is put in the message.
        :keyword task_id: id to use; a unique id is generated when falsy.
        :keyword taskset_id: when truthy, stored under the ``taskset`` key.
        :keyword retries: read from ``**kwargs``, defaults to ``0``.

        The entries of ``**kwargs`` named in ``MSG_OPTIONS`` are forwarded
        to ``send``.

        :raises ValueError: if ``task_args`` is not a list/tuple or
            ``task_kwargs`` is not a dict.

        """
        if not task_id:
            task_id = gen_unique_id()
        if not task_args:
            task_args = []
        if not task_kwargs:
            task_kwargs = {}

        # A countdown takes precedence over an explicitly passed eta.
        if countdown:
            eta = datetime.now() + timedelta(seconds=countdown)

        if not isinstance(task_args, (list, tuple)):
            raise ValueError("task args must be a list or tuple")
        if not isinstance(task_kwargs, dict):
            raise ValueError("task kwargs must be a dictionary")

        # A falsy eta is put in the message as is, otherwise its ISO form.
        eta_field = eta
        if eta:
            eta_field = eta.isoformat()

        message_data = {
            "task": task_name,
            "id": task_id,
            "args": task_args,
            "kwargs": task_kwargs,
            "retries": kwargs.get("retries", 0),
            "eta": eta_field,
        }
        if taskset_id:
            message_data["taskset"] = taskset_id

        send_options = extract_msg_options(kwargs)
        self.send(message_data, **send_options)
        signals.task_sent.send(sender=task_name, **message_data)

        return task_id
- class ConsumerSet(_ConsumerSet):
- """ConsumerSet with an optional decode error callback.
- For more information see :class:`carrot.messaging.ConsumerSet`.
- .. attribute:: on_decode_error
- Callback called if a message had decoding errors.
- The callback is called with the signature::
- callback(message, exception)
- """
- on_decode_error = None
- def _receive_callback(self, raw_message):
- message = self.backend.message_to_python(raw_message)
- if self.auto_ack and not message.acknowledged:
- message.ack()
- try:
- decoded = message.decode()
- except Exception, exc:
- if self.on_decode_error:
- return self.on_decode_error(message, exc)
- else:
- raise
- self.receive(decoded, message)
class TaskConsumer(Consumer):
    """Consumer for task messages.

    Bound to the default queue: the queue name is ``conf.DEFAULT_QUEUE``
    and the exchange, exchange type and routing key (the queue's
    ``binding_key``) come from its configuration entry.
    """
    queue = conf.DEFAULT_QUEUE
    exchange_type = default_queue["exchange_type"]
    exchange = default_queue["exchange"]
    routing_key = default_queue["binding_key"]
class EventPublisher(Publisher):
    """Publisher for events.

    Exchange, exchange type, routing key and serializer are all read from
    the ``EVENT_*`` settings in :mod:`celery.conf`.
    """
    serializer = conf.EVENT_SERIALIZER
    routing_key = conf.EVENT_ROUTING_KEY
    exchange_type = conf.EVENT_EXCHANGE_TYPE
    exchange = conf.EVENT_EXCHANGE
class EventConsumer(Consumer):
    """Consumer for events.

    Queue, exchange, exchange type and routing key are read from the
    ``EVENT_*`` settings in :mod:`celery.conf`; ``no_ack`` is enabled.
    """
    no_ack = True
    queue = conf.EVENT_QUEUE
    routing_key = conf.EVENT_ROUTING_KEY
    exchange_type = conf.EVENT_EXCHANGE_TYPE
    exchange = conf.EVENT_EXCHANGE
class ControlReplyConsumer(Consumer):
    """Consumer gathering the replies sent for one ticket.

    The queue is named ``"<exchange>.<ticket>"`` and the ticket doubles as
    the routing key.
    """
    exchange = "celerycrq"
    exchange_type = "direct"
    no_ack = True
    auto_delete = True
    exclusive = False
    durable = False

    def __init__(self, connection, ticket, **kwargs):
        """Create a consumer for ``ticket`` on ``connection``.

        Remaining keyword arguments are handed to the parent constructor.
        """
        self.ticket = ticket
        reply_queue = "%s.%s" % (self.exchange, ticket)
        super(ControlReplyConsumer, self).__init__(
            connection, queue=reply_queue, routing_key=ticket, **kwargs)

    def collect(self, limit=None, timeout=1):
        """Drain events and return the list of message bodies received.

        :keyword limit: maximum number of ``drain_events`` calls; when
            falsy the loop only ends on a timeout.
        :keyword timeout: timeout passed to every ``drain_events`` call.
            A ``socket.timeout`` ends the collection.

        Note that this replaces ``self.callbacks``.

        """
        collected = []

        def on_reply(message_data, message):
            collected.append(message_data)

        self.callbacks = [on_reply]
        self.consume()

        rounds = limit and range(limit) or count()
        for _ in rounds:
            try:
                self.connection.drain_events(timeout=timeout)
            except socket.timeout:
                break

        return collected
class ControlReplyPublisher(Publisher):
    """Publisher for the ``celerycrq`` direct exchange (the exchange
    :class:`ControlReplyConsumer` reads from), using non-persistent
    delivery."""
    delivery_mode = "non-persistent"
    exchange_type = "direct"
    exchange = "celerycrq"
class BroadcastPublisher(Publisher):
    """Publish broadcast commands.

    .. attribute:: ReplyTo

        Consumer class whose ``exchange`` is advertised in the ``reply_to``
        field of commands sent with a reply ticket.

    """
    ReplyTo = ControlReplyConsumer
    exchange = conf.BROADCAST_EXCHANGE
    exchange_type = conf.BROADCAST_EXCHANGE_TYPE

    def send(self, type, arguments, destination=None, reply_ticket=None):
        """Send broadcast command.

        :param type: name of the command, stored under ``command``.
        :param arguments: mapping of command arguments.  It is copied, so
            the object passed in by the caller is left untouched; ``None``
            is treated as an empty mapping.
        :keyword destination: value stored under ``destination``.
        :keyword reply_ticket: when truthy, a ``reply_to`` entry naming the
            :attr:`ReplyTo` exchange and using the ticket as routing key
            is added.

        The message body is ``{"control": <arguments>}``.

        """
        # Copy before adding our own keys: callers may reuse their dict
        # for several commands and must not see command/destination/
        # reply_to entries leaking into it.
        arguments = dict(arguments or {})
        arguments["command"] = type
        arguments["destination"] = destination
        if reply_ticket:
            arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
                                     "routing_key": reply_ticket}
        super(BroadcastPublisher, self).send({"control": arguments})
class BroadcastConsumer(Consumer):
    """Consumer for broadcast commands.

    Every instance uses its own queue, named
    ``"<conf.BROADCAST_QUEUE>_<hostname>"``.
    """
    no_ack = True
    queue = conf.BROADCAST_QUEUE
    exchange_type = conf.BROADCAST_EXCHANGE_TYPE
    exchange = conf.BROADCAST_EXCHANGE

    def __init__(self, *args, **kwargs):
        """Accepts an extra ``hostname`` keyword argument used as the queue
        name suffix; ``socket.gethostname()`` is used when it is missing
        or falsy.  Everything else goes to the parent constructor."""
        node = kwargs.pop("hostname", None)
        if not node:
            node = socket.gethostname()
        self.queue = "%s_%s" % (self.queue, node)
        super(BroadcastConsumer, self).__init__(*args, **kwargs)
def establish_connection(hostname=None, userid=None, password=None,
        virtual_host=None, port=None, ssl=None, insist=None,
        connect_timeout=None, backend_cls=None):
    """Create a :class:`carrot.connection.BrokerConnection`.

    Every argument falls back to the matching ``BROKER_*`` setting in
    :mod:`celery.conf`.  ``insist``, ``ssl`` and ``connect_timeout`` only
    fall back when they are ``None``; the remaining arguments fall back
    whenever they are falsy.

    """
    if connect_timeout is None:
        connect_timeout = conf.BROKER_CONNECTION_TIMEOUT
    if ssl is None:
        ssl = conf.BROKER_USE_SSL
    if insist is None:
        insist = conf.BROKER_INSIST

    host = hostname or conf.BROKER_HOST
    user = userid or conf.BROKER_USER
    secret = password or conf.BROKER_PASSWORD
    vhost = virtual_host or conf.BROKER_VHOST
    broker_port = port or conf.BROKER_PORT
    backend = backend_cls or conf.BROKER_BACKEND

    return BrokerConnection(host, user, secret, vhost, broker_port,
                            backend_cls=backend,
                            insist=insist, ssl=ssl,
                            connect_timeout=connect_timeout)
def with_connection(fun):
    """Decorator supplying a default broker connection.

    The wrapped function must accept the ``connection`` keyword argument.
    If the caller passes no (or a falsy) ``connection`` keyword, a new
    connection is established using the ``connect_timeout`` keyword
    (default: ``conf.BROKER_CONNECTION_TIMEOUT``) and closed again once
    the function returns or raises.  A connection supplied by the caller
    is never closed here.

    Only keyword arguments are inspected; a connection passed
    positionally is not detected.

    """

    @wraps(fun)
    def _inner(*args, **kwargs):
        supplied = kwargs.get("connection")
        timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)

        # We own (and must close) the connection only if we created it.
        owned = not supplied
        if owned:
            conn = establish_connection(connect_timeout=timeout)
        else:
            conn = supplied
        kwargs["connection"] = conn

        try:
            return fun(*args, **kwargs)
        finally:
            if owned:
                conn.close()

    return _inner
def get_consumer_set(connection, queues=None, **options):
    """Build a :class:`ConsumerSet` holding one consumer per queue.

    :param connection: connection used for the set and every consumer.
    :keyword queues: mapping of queue name to a dict of consumer options.
        Defaults to ``conf.get_queues()`` when falsy.  The ``binding_key``
        entry of each queue is passed to the consumer as ``routing_key``
        (``None`` when absent); the mappings themselves are not modified.

    Additional keyword arguments are accepted but currently unused.

    """
    if not queues:
        queues = conf.get_queues()

    consumer_set = ConsumerSet(connection)
    for name, declared in queues.items():
        # Work on a copy so the queue configuration stays intact.
        consumer_kwargs = dict(declared)
        binding_key = consumer_kwargs.pop("binding_key", None)
        consumer_kwargs["routing_key"] = binding_key
        consumer_set.consumers.append(
            Consumer(connection, queue=name,
                     backend=consumer_set.backend, **consumer_kwargs))
    return consumer_set
@with_connection
def reply(data, exchange, routing_key, connection=None, connect_timeout=None,
        **kwargs):
    """Publish ``data`` to ``exchange`` using ``routing_key``.

    :keyword connection: broker connection to use; supplied by
        :func:`with_connection` when omitted.
    :keyword connect_timeout: consumed by :func:`with_connection` when it
        has to establish the connection.

    Any other keyword arguments are passed to the
    :class:`carrot.messaging.Publisher` constructor.

    """
    pub = Publisher(connection, exchange=exchange,
                    routing_key=routing_key, **kwargs)
    try:
        pub.send(data)
    finally:
        # The decorator only closes connections it created itself, so the
        # publisher must be released here or it stays open for as long as
        # a caller-supplied connection lives.
        pub.close()
|