|
@@ -10,13 +10,14 @@ from __future__ import absolute_import
|
|
|
|
|
|
import numbers
|
|
|
|
|
|
+from collections import Mapping, namedtuple
|
|
|
from datetime import timedelta
|
|
|
from weakref import WeakValueDictionary
|
|
|
|
|
|
from kombu import Connection, Consumer, Exchange, Producer, Queue
|
|
|
from kombu.common import Broadcast
|
|
|
from kombu.pools import ProducerPool
|
|
|
-from kombu.utils import cached_property, uuid
|
|
|
+from kombu.utils import cached_property
|
|
|
from kombu.utils.encoding import safe_repr
|
|
|
from kombu.utils.functional import maybe_list
|
|
|
|
|
@@ -25,10 +26,9 @@ from celery.five import items, string_t
|
|
|
from celery.utils.text import indent as textindent
|
|
|
from celery.utils.timeutils import to_utc
|
|
|
|
|
|
-from . import app_or_default
|
|
|
from . import routes as _routes
|
|
|
|
|
|
-__all__ = ['AMQP', 'Queues', 'TaskProducer', 'TaskConsumer']
|
|
|
+__all__ = ['AMQP', 'Queues', 'task_message']
|
|
|
|
|
|
#: Human readable queue declaration.
|
|
|
QUEUE_FORMAT = """
|
|
@@ -36,6 +36,9 @@ QUEUE_FORMAT = """
|
|
|
key={0.routing_key}
|
|
|
"""
|
|
|
|
|
|
+task_message = namedtuple('task_message',
|
|
|
+ ('headers', 'properties', 'body', 'sent_event'))
|
|
|
+
|
|
|
|
|
|
class Queues(dict):
|
|
|
"""Queue name⇒ declaration mapping.
|
|
@@ -184,204 +187,14 @@ class Queues(dict):
|
|
|
return self
|
|
|
|
|
|
|
|
|
-class TaskProducer(Producer):
|
|
|
- app = None
|
|
|
- auto_declare = False
|
|
|
- retry = False
|
|
|
- retry_policy = None
|
|
|
- utc = True
|
|
|
- event_dispatcher = None
|
|
|
- send_sent_event = False
|
|
|
-
|
|
|
- def __init__(self, channel=None, exchange=None, *args, **kwargs):
|
|
|
- self.retry = kwargs.pop('retry', self.retry)
|
|
|
- self.retry_policy = kwargs.pop('retry_policy',
|
|
|
- self.retry_policy or {})
|
|
|
- self.send_sent_event = kwargs.pop('send_sent_event',
|
|
|
- self.send_sent_event)
|
|
|
- exchange = exchange or self.exchange
|
|
|
- self.queues = self.app.amqp.queues # shortcut
|
|
|
- self.default_queue = self.app.amqp.default_queue
|
|
|
- super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
|
|
|
-
|
|
|
- def publish_task(self, task_name, task_args=None, task_kwargs=None,
|
|
|
- countdown=None, eta=None, task_id=None, group_id=None,
|
|
|
- taskset_id=None, # compat alias to group_id
|
|
|
- expires=None, exchange=None, exchange_type=None,
|
|
|
- event_dispatcher=None, retry=None, retry_policy=None,
|
|
|
- queue=None, now=None, retries=0, chord=None,
|
|
|
- callbacks=None, errbacks=None, routing_key=None,
|
|
|
- serializer=None, delivery_mode=None, compression=None,
|
|
|
- reply_to=None, time_limit=None, soft_time_limit=None,
|
|
|
- declare=None, headers=None,
|
|
|
- send_before_publish=signals.before_task_publish.send,
|
|
|
- before_receivers=signals.before_task_publish.receivers,
|
|
|
- send_after_publish=signals.after_task_publish.send,
|
|
|
- after_receivers=signals.after_task_publish.receivers,
|
|
|
- send_task_sent=signals.task_sent.send, # XXX deprecated
|
|
|
- sent_receivers=signals.task_sent.receivers,
|
|
|
- **kwargs):
|
|
|
- """Send task message."""
|
|
|
- retry = self.retry if retry is None else retry
|
|
|
- headers = {} if headers is None else headers
|
|
|
-
|
|
|
- qname = queue
|
|
|
- if queue is None and exchange is None:
|
|
|
- queue = self.default_queue
|
|
|
- if queue is not None:
|
|
|
- if isinstance(queue, string_t):
|
|
|
- qname, queue = queue, self.queues[queue]
|
|
|
- else:
|
|
|
- qname = queue.name
|
|
|
- exchange = exchange or queue.exchange.name
|
|
|
- routing_key = routing_key or queue.routing_key
|
|
|
- if declare is None and queue and not isinstance(queue, Broadcast):
|
|
|
- declare = [queue]
|
|
|
-
|
|
|
- # merge default and custom policy
|
|
|
- retry = self.retry if retry is None else retry
|
|
|
- _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
|
|
|
- else self.retry_policy)
|
|
|
- task_id = task_id or uuid()
|
|
|
- task_args = task_args or []
|
|
|
- task_kwargs = task_kwargs or {}
|
|
|
- if not isinstance(task_args, (list, tuple)):
|
|
|
- raise ValueError('task args must be a list or tuple')
|
|
|
- if not isinstance(task_kwargs, dict):
|
|
|
- raise ValueError('task kwargs must be a dictionary')
|
|
|
- if countdown: # Convert countdown to ETA.
|
|
|
- now = now or self.app.now()
|
|
|
- eta = now + timedelta(seconds=countdown)
|
|
|
- if self.utc:
|
|
|
- eta = to_utc(eta).astimezone(self.app.timezone)
|
|
|
- if isinstance(expires, numbers.Real):
|
|
|
- now = now or self.app.now()
|
|
|
- expires = now + timedelta(seconds=expires)
|
|
|
- if self.utc:
|
|
|
- expires = to_utc(expires).astimezone(self.app.timezone)
|
|
|
- eta = eta and eta.isoformat()
|
|
|
- expires = expires and expires.isoformat()
|
|
|
-
|
|
|
- body = {
|
|
|
- 'task': task_name,
|
|
|
- 'id': task_id,
|
|
|
- 'args': task_args,
|
|
|
- 'kwargs': task_kwargs,
|
|
|
- 'retries': retries or 0,
|
|
|
- 'eta': eta,
|
|
|
- 'expires': expires,
|
|
|
- 'utc': self.utc,
|
|
|
- 'callbacks': callbacks,
|
|
|
- 'errbacks': errbacks,
|
|
|
- 'timelimit': (time_limit, soft_time_limit),
|
|
|
- 'taskset': group_id or taskset_id,
|
|
|
- 'chord': chord,
|
|
|
- }
|
|
|
-
|
|
|
- if before_receivers:
|
|
|
- send_before_publish(
|
|
|
- sender=task_name, body=body,
|
|
|
- exchange=exchange,
|
|
|
- routing_key=routing_key,
|
|
|
- declare=declare,
|
|
|
- headers=headers,
|
|
|
- properties=kwargs,
|
|
|
- retry_policy=retry_policy,
|
|
|
- )
|
|
|
-
|
|
|
- self.publish(
|
|
|
- body,
|
|
|
- exchange=exchange, routing_key=routing_key,
|
|
|
- serializer=serializer or self.serializer,
|
|
|
- compression=compression or self.compression,
|
|
|
- headers=headers,
|
|
|
- retry=retry, retry_policy=_rp,
|
|
|
- reply_to=reply_to,
|
|
|
- correlation_id=task_id,
|
|
|
- delivery_mode=delivery_mode, declare=declare,
|
|
|
- **kwargs
|
|
|
- )
|
|
|
-
|
|
|
- if after_receivers:
|
|
|
- send_after_publish(sender=task_name, body=body,
|
|
|
- exchange=exchange, routing_key=routing_key)
|
|
|
-
|
|
|
- if sent_receivers: # XXX deprecated
|
|
|
- send_task_sent(sender=task_name, task_id=task_id,
|
|
|
- task=task_name, args=task_args,
|
|
|
- kwargs=task_kwargs, eta=eta,
|
|
|
- taskset=group_id or taskset_id)
|
|
|
- if self.send_sent_event:
|
|
|
- evd = event_dispatcher or self.event_dispatcher
|
|
|
- exname = exchange or self.exchange
|
|
|
- if isinstance(exname, Exchange):
|
|
|
- exname = exname.name
|
|
|
- evd.publish(
|
|
|
- 'task-sent',
|
|
|
- {
|
|
|
- 'uuid': task_id,
|
|
|
- 'name': task_name,
|
|
|
- 'args': safe_repr(task_args),
|
|
|
- 'kwargs': safe_repr(task_kwargs),
|
|
|
- 'retries': retries,
|
|
|
- 'eta': eta,
|
|
|
- 'expires': expires,
|
|
|
- 'queue': qname,
|
|
|
- 'exchange': exname,
|
|
|
- 'routing_key': routing_key,
|
|
|
- },
|
|
|
- self, retry=retry, retry_policy=retry_policy,
|
|
|
- )
|
|
|
- return task_id
|
|
|
- delay_task = publish_task # XXX Compat
|
|
|
-
|
|
|
- @cached_property
|
|
|
- def event_dispatcher(self):
|
|
|
- # We call Dispatcher.publish with a custom producer
|
|
|
- # so don't need the dispatcher to be "enabled".
|
|
|
- return self.app.events.Dispatcher(enabled=False)
|
|
|
-
|
|
|
-
|
|
|
-class TaskPublisher(TaskProducer):
|
|
|
- """Deprecated version of :class:`TaskProducer`."""
|
|
|
-
|
|
|
- def __init__(self, channel=None, exchange=None, *args, **kwargs):
|
|
|
- self.app = app_or_default(kwargs.pop('app', self.app))
|
|
|
- self.retry = kwargs.pop('retry', self.retry)
|
|
|
- self.retry_policy = kwargs.pop('retry_policy',
|
|
|
- self.retry_policy or {})
|
|
|
- exchange = exchange or self.exchange
|
|
|
- if not isinstance(exchange, Exchange):
|
|
|
- exchange = Exchange(exchange,
|
|
|
- kwargs.pop('exchange_type', 'direct'))
|
|
|
- self.queues = self.app.amqp.queues # shortcut
|
|
|
- super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)
|
|
|
-
|
|
|
-
|
|
|
-class TaskConsumer(Consumer):
|
|
|
- app = None
|
|
|
-
|
|
|
- def __init__(self, channel, queues=None, app=None, accept=None, **kw):
|
|
|
- self.app = app or self.app
|
|
|
- if accept is None:
|
|
|
- accept = self.app.conf.CELERY_ACCEPT_CONTENT
|
|
|
- super(TaskConsumer, self).__init__(
|
|
|
- channel,
|
|
|
- queues or list(self.app.amqp.queues.consume_from.values()),
|
|
|
- accept=accept,
|
|
|
- **kw
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
class AMQP(object):
|
|
|
Connection = Connection
|
|
|
Consumer = Consumer
|
|
|
+ Producer = Producer
|
|
|
|
|
|
#: compat alias to Connection
|
|
|
BrokerConnection = Connection
|
|
|
|
|
|
- producer_cls = TaskProducer
|
|
|
- consumer_cls = TaskConsumer
|
|
|
queues_cls = Queues
|
|
|
|
|
|
#: Cached and prepared routing table.
|
|
@@ -400,6 +213,18 @@ class AMQP(object):
|
|
|
def __init__(self, app):
|
|
|
self.app = app
|
|
|
|
|
|
+ @cached_property
|
|
|
+ def _task_retry(self):
|
|
|
+ return self.app.conf.CELERY_TASK_PUBLISH_RETRY
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def _task_retry_policy(self):
|
|
|
+ return self.app.conf.CELERY_TASK_PUBLISH_RETRY_POLICY
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def _task_sent_event(self):
|
|
|
+ return self.app.conf.CELERY_SEND_TASK_SENT_EVENT
|
|
|
+
|
|
|
def flush_routes(self):
|
|
|
self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
|
|
|
|
|
@@ -429,35 +254,14 @@ class AMQP(object):
|
|
|
self.app.either('CELERY_CREATE_MISSING_QUEUES',
|
|
|
create_missing), app=self.app)
|
|
|
|
|
|
- @cached_property
|
|
|
- def TaskConsumer(self):
|
|
|
- """Return consumer configured to consume from the queues
|
|
|
- we are configured for (``app.amqp.queues.consume_from``)."""
|
|
|
- return self.app.subclass_with_self(self.consumer_cls,
|
|
|
- reverse='amqp.TaskConsumer')
|
|
|
- get_task_consumer = TaskConsumer # XXX compat
|
|
|
-
|
|
|
- @cached_property
|
|
|
- def TaskProducer(self):
|
|
|
- """Return publisher used to send tasks.
|
|
|
-
|
|
|
- You should use `app.send_task` instead.
|
|
|
-
|
|
|
- """
|
|
|
- conf = self.app.conf
|
|
|
- return self.app.subclass_with_self(
|
|
|
- self.producer_cls,
|
|
|
- reverse='amqp.TaskProducer',
|
|
|
- exchange=self.default_exchange,
|
|
|
- routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
|
|
|
- serializer=conf.CELERY_TASK_SERIALIZER,
|
|
|
- compression=conf.CELERY_MESSAGE_COMPRESSION,
|
|
|
- retry=conf.CELERY_TASK_PUBLISH_RETRY,
|
|
|
- retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
|
|
|
- send_sent_event=conf.CELERY_SEND_TASK_SENT_EVENT,
|
|
|
- utc=conf.CELERY_ENABLE_UTC,
|
|
|
+ def TaskConsumer(self, channel, queues=None, accept=None, **kw):
|
|
|
+ if accept is None:
|
|
|
+ accept = self.app.conf.CELERY_ACCEPT_CONTENT
|
|
|
+ return self.Consumer(
|
|
|
+ channel, accept=accept,
|
|
|
+ queues=queues or list(self.queues.consume_from.values()),
|
|
|
+ **kw
|
|
|
)
|
|
|
- TaskPublisher = TaskProducer # compat
|
|
|
|
|
|
@cached_property
|
|
|
def default_queue(self):
|
|
@@ -488,7 +292,7 @@ class AMQP(object):
|
|
|
self._producer_pool = ProducerPool(
|
|
|
self.app.pool,
|
|
|
limit=self.app.pool.limit,
|
|
|
- Producer=self.TaskProducer,
|
|
|
+ Producer=self.Producer,
|
|
|
)
|
|
|
return self._producer_pool
|
|
|
publisher_pool = producer_pool # compat alias
|
|
@@ -497,3 +301,164 @@ class AMQP(object):
|
|
|
def default_exchange(self):
|
|
|
return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
|
|
|
self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
|
|
|
+
|
|
|
+ def create_task_message(self, task_id, name, args=None, kwargs=None,
|
|
|
+ countdown=None, eta=None, group_id=None,
|
|
|
+ expires=None, now=None, retries=0, chord=None,
|
|
|
+ callbacks=None, errbacks=None, reply_to=None,
|
|
|
+ time_limit=None, soft_time_limit=None,
|
|
|
+ create_sent_event=False):
|
|
|
+ args = args or ()
|
|
|
+ kwargs = kwargs or {}
|
|
|
+ utc = self.utc
|
|
|
+ if not isinstance(args, (list, tuple)):
|
|
|
+ raise ValueError('task args must be a list or tuple')
|
|
|
+ if not isinstance(kwargs, Mapping):
|
|
|
+ raise ValueError('task keyword arguments must be a mapping')
|
|
|
+ if countdown: # convert countdown to ETA
|
|
|
+ now = now or self.app.now()
|
|
|
+ eta = now + timedelta(seconds=countdown)
|
|
|
+ if utc:
|
|
|
+ eta = to_utc(eta).astimezone(self.app.timezone)
|
|
|
+ if isinstance(expires, numbers.Real):
|
|
|
+ now = now or self.app.now()
|
|
|
+ expires = now + timedelta(seconds=expires)
|
|
|
+ if utc:
|
|
|
+ expires = to_utc(expires).astimezone(self.app.timezone)
|
|
|
+ eta = eta and eta.isoformat()
|
|
|
+ expires = expires and expires.isoformat()
|
|
|
+
|
|
|
+ return task_message(
|
|
|
+ {},
|
|
|
+ {
|
|
|
+ 'correlation_id': task_id,
|
|
|
+ 'reply_to': reply_to,
|
|
|
+ },
|
|
|
+ {
|
|
|
+ 'task': name,
|
|
|
+ 'id': task_id,
|
|
|
+ 'args': args,
|
|
|
+ 'kwargs': kwargs,
|
|
|
+ 'retries': retries,
|
|
|
+ 'eta': eta,
|
|
|
+ 'expires': expires,
|
|
|
+ 'utc': utc,
|
|
|
+ 'callbacks': callbacks,
|
|
|
+ 'errbacks': errbacks,
|
|
|
+ 'timelimit': (time_limit, soft_time_limit),
|
|
|
+ 'taskset': group_id,
|
|
|
+ 'chord': chord,
|
|
|
+ },
|
|
|
+ {
|
|
|
+ 'uuid': task_id,
|
|
|
+ 'name': name,
|
|
|
+ 'args': safe_repr(args),
|
|
|
+ 'kwargs': safe_repr(kwargs),
|
|
|
+ 'retries': retries,
|
|
|
+ 'eta': eta,
|
|
|
+ 'expires': expires,
|
|
|
+ } if create_sent_event else None,
|
|
|
+ )
|
|
|
+
|
|
|
+ def _create_task_sender(self):
|
|
|
+ default_retry = self.app.conf.CELERY_TASK_PUBLISH_RETRY
|
|
|
+ default_policy = self.app.conf.CELERY_TASK_PUBLISH_RETRY_POLICY
|
|
|
+ default_queue = self.default_queue
|
|
|
+ queues = self.queues
|
|
|
+ send_before_publish = signals.before_task_publish.send
|
|
|
+ before_receivers = signals.before_task_publish.receivers
|
|
|
+ send_after_publish = signals.after_task_publish.send
|
|
|
+ after_receivers = signals.after_task_publish.receivers
|
|
|
+
|
|
|
+ send_task_sent = signals.task_sent.send # XXX compat
|
|
|
+ sent_receivers = signals.task_sent.receivers
|
|
|
+
|
|
|
+ default_evd = self._event_dispatcher
|
|
|
+ default_exchange = self.default_exchange
|
|
|
+
|
|
|
+ default_rkey = self.app.conf.CELERY_DEFAULT_ROUTING_KEY
|
|
|
+ default_serializer = self.app.conf.CELERY_TASK_SERIALIZER
|
|
|
+ default_compressor = self.app.conf.CELERY_MESSAGE_COMPRESSION
|
|
|
+
|
|
|
+ def publish_task(producer, name, message,
|
|
|
+ exchange=None, routing_key=None, queue=None,
|
|
|
+ event_dispatcher=None, retry=None, retry_policy=None,
|
|
|
+ serializer=None, delivery_mode=None,
|
|
|
+ compression=None, declare=None,
|
|
|
+ headers=None, **kwargs):
|
|
|
+ retry = default_retry if retry is None else retry
|
|
|
+ headers, properties, body, sent_event = message
|
|
|
+ if kwargs:
|
|
|
+ properties.update(kwargs)
|
|
|
+
|
|
|
+ qname = queue
|
|
|
+ if queue is None and exchange is None:
|
|
|
+ queue = default_queue
|
|
|
+ if queue is not None:
|
|
|
+ if isinstance(queue, string_t):
|
|
|
+ qname, queue = queue, queues[queue]
|
|
|
+ else:
|
|
|
+ qname = queue.name
|
|
|
+ exchange = exchange or queue.exchange.name
|
|
|
+ routing_key = routing_key or queue.routing_key
|
|
|
+ if declare is None and queue and not isinstance(queue, Broadcast):
|
|
|
+ declare = [queue]
|
|
|
+
|
|
|
+ # merge default and custom policy
|
|
|
+ retry = default_retry if retry is None else retry
|
|
|
+ _rp = (dict(default_policy, **retry_policy) if retry_policy
|
|
|
+ else default_policy)
|
|
|
+
|
|
|
+ if before_receivers:
|
|
|
+ send_before_publish(
|
|
|
+ sender=name, body=body,
|
|
|
+ exchange=exchange, routing_key=routing_key,
|
|
|
+ declare=declare, headers=headers,
|
|
|
+ properties=kwargs, retry_policy=retry_policy,
|
|
|
+ )
|
|
|
+ ret = producer.publish(
|
|
|
+ body,
|
|
|
+ exchange=exchange or default_exchange,
|
|
|
+ routing_key=routing_key or default_rkey,
|
|
|
+ serializer=serializer or default_serializer,
|
|
|
+ compression=compression or default_compressor,
|
|
|
+ retry=retry, retry_policy=_rp,
|
|
|
+ delivery_mode=delivery_mode, declare=declare,
|
|
|
+ headers=headers,
|
|
|
+ **properties
|
|
|
+ )
|
|
|
+ if after_receivers:
|
|
|
+ send_after_publish(sender=name, body=body,
|
|
|
+ exchange=exchange, routing_key=routing_key)
|
|
|
+ if sent_receivers: # XXX deprecated
|
|
|
+ send_task_sent(sender=name, task_id=body['id'], task=name,
|
|
|
+ args=body['args'], kwargs=body['kwargs'],
|
|
|
+ eta=body['eta'], taskset=body['taskset'])
|
|
|
+ if sent_event:
|
|
|
+ evd = event_dispatcher or default_evd
|
|
|
+ exname = exchange or self.exchange
|
|
|
+ if isinstance(name, Exchange):
|
|
|
+ exname = exname.name
|
|
|
+ sent_event.update({
|
|
|
+ 'queue': qname,
|
|
|
+ 'exchange': exname,
|
|
|
+ 'routing_key': routing_key,
|
|
|
+ })
|
|
|
+ evd.publish('task-sent', sent_event,
|
|
|
+ self, retry=retry, retry_policy=retry_policy)
|
|
|
+ return ret
|
|
|
+ return publish_task
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def send_task_message(self):
|
|
|
+ return self._create_task_sender()
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def utc(self):
|
|
|
+ return self.app.conf.CELERY_ENABLE_UTC
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def _event_dispatcher(self):
|
|
|
+ # We call Dispatcher.publish with a custom producer
|
|
|
+ # so don't need the dispatcher to be enabled.
|
|
|
+ return self.app.events.Dispatcher(enabled=False)
|