|
@@ -15,12 +15,12 @@ from datetime import timedelta
|
|
|
from weakref import WeakValueDictionary
|
|
|
|
|
|
from kombu import BrokerConnection, Consumer, Exchange, Producer, Queue
|
|
|
-from kombu.common import entry_to_queue, maybe_declare
|
|
|
+from kombu.common import entry_to_queue
|
|
|
from kombu.pools import ProducerPool
|
|
|
|
|
|
from celery import signals
|
|
|
-from celery.utils import cached_property, lpmerge, uuid
|
|
|
-from celery.utils import text
|
|
|
+from celery.utils import cached_property, uuid
|
|
|
+from celery.utils.text import indent as textindent
|
|
|
|
|
|
from . import routes as _routes
|
|
|
|
|
@@ -109,8 +109,8 @@ class Queues(dict):
|
|
|
"routing_key": q.routing_key}
|
|
|
for name, q in sorted(active.iteritems())]
|
|
|
if indent_first:
|
|
|
- return text.indent("\n".join(info), indent)
|
|
|
- return info[0] + "\n" + text.indent("\n".join(info[1:]), indent)
|
|
|
+ return textindent("\n".join(info), indent)
|
|
|
+ return info[0] + "\n" + textindent("\n".join(info[1:]), indent)
|
|
|
|
|
|
def select_subset(self, wanted):
|
|
|
"""Sets :attr:`consume_from` by selecting a subset of the
|
|
@@ -156,11 +156,9 @@ class TaskProducer(Producer):
|
|
|
routing_key=None, serializer=None, delivery_mode=None,
|
|
|
compression=None, **kwargs):
|
|
|
"""Send task message."""
|
|
|
-
|
|
|
- connection = self.connection
|
|
|
- _retry_policy = self.retry_policy
|
|
|
- if retry_policy: # merge default and custom policy
|
|
|
- _retry_policy = dict(_retry_policy, **retry_policy)
|
|
|
+ # merge default and custom policy
|
|
|
+ _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
|
|
|
+ else self.retry_policy)
|
|
|
task_id = task_id or uuid()
|
|
|
task_args = task_args or []
|
|
|
task_kwargs = task_kwargs or {}
|
|
@@ -168,7 +166,7 @@ class TaskProducer(Producer):
|
|
|
raise ValueError("task args must be a list or tuple")
|
|
|
if not isinstance(task_kwargs, dict):
|
|
|
raise ValueError("task kwargs must be a dictionary")
|
|
|
- if countdown: # Convert countdown to ETA.
|
|
|
+ if countdown: # Convert countdown to ETA.
|
|
|
now = now or self.app.now()
|
|
|
eta = now + timedelta(seconds=countdown)
|
|
|
if isinstance(expires, (int, float)):
|
|
@@ -179,8 +177,8 @@ class TaskProducer(Producer):
|
|
|
|
|
|
body = {"task": task_name,
|
|
|
"id": task_id,
|
|
|
- "args": task_args or [],
|
|
|
- "kwargs": task_kwargs or {},
|
|
|
+ "args": task_args,
|
|
|
+ "kwargs": task_kwargs,
|
|
|
"retries": retries or 0,
|
|
|
"eta": eta,
|
|
|
"expires": expires,
|
|
@@ -195,9 +193,8 @@ class TaskProducer(Producer):
|
|
|
self.publish(body, exchange=exchange, mandatory=mandatory,
|
|
|
immediate=immediate, routing_key=routing_key,
|
|
|
serializer=serializer or self.serializer,
|
|
|
- delivery_mode=delivery_mode,
|
|
|
compression=compression or self.compression,
|
|
|
- retry=retry, retry_policy=retry_policy,
|
|
|
+ retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
|
|
|
declare=[self.app.amqp.queues[queue]] if queue else [])
|
|
|
|
|
|
signals.task_sent.send(sender=task_name, **body)
|
|
@@ -298,8 +295,8 @@ class AMQP(object):
|
|
|
"""Queue name⇒ declaration mapping."""
|
|
|
return self.Queues(self.app.conf.CELERY_QUEUES)
|
|
|
|
|
|
- @queues.setter
|
|
|
- def queues(self, queues): # noqa
|
|
|
+ @queues.setter # noqa
|
|
|
+ def queues(self, queues):
|
|
|
return self.Queues(queues)
|
|
|
|
|
|
@property
|