Bladeren bron

Merge branch '3.0'

Conflicts:
	celery/app/amqp.py
	celery/worker/consumer.py
	docs/userguide/canvas.rst
	docs/userguide/signals.rst
Ask Solem 12 jaren geleden
bovenliggende
commit
5807229c84
4 gewijzigde bestanden met toevoegingen van 16 en 7 verwijderingen
  1. 10 2
      celery/app/amqp.py
  2. 3 4
      celery/app/routes.py
  3. 2 0
      celery/utils/timeutils.py
  4. 1 1
      docs/configuration.rst

+ 10 - 2
celery/app/amqp.py

@@ -181,9 +181,17 @@ class TaskProducer(Producer):
             errbacks=None, mandatory=None, priority=None, immediate=None,
             routing_key=None, serializer=None, delivery_mode=None,
             compression=None, reply_to=None, timeout=None, soft_timeout=None,
-            timeouts=None, **kwargs):
+            timeouts=None, declare=None, **kwargs):
         """Send task message."""
         retry = self.retry if retry is None else retry
+
+        declare = declare or []
+        if queue is not None:
+            if isinstance(queue, basestring):
+                queue = self.queues[queue]
+            exchange = exchange or queue.exchange.name
+            routing_key = routing_key or queue.routing_key
+
         # merge default and custom policy
         retry = self.retry if retry is None else retry
         _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
@@ -227,7 +235,7 @@ class TaskProducer(Producer):
              serializer=serializer or self.serializer,
              compression=compression or self.compression,
              retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
-             priority=priority, declare=[self.queues[queue]] if queue else [],
+             priority=priority, declare=declare,
              **kwargs)
 
         signals.task_sent.send(sender=task_name, **body)

+ 3 - 4
celery/app/routes.py

@@ -61,7 +61,7 @@ class Router(object):
 
         if queue:  # expand config from configured queue.
             try:
-                dest = self.queues[queue].as_dict()
+                _Q = self.queues[queue]  # noqa
             except KeyError:
                 if not self.create_missing:
                     raise QueueNotFound(
@@ -69,10 +69,9 @@ class Router(object):
                 for key in 'exchange', 'routing_key':
                     if route.get(key) is None:
                         route[key] = queue
-                dest = self.app.amqp.queues.add(queue, **route).as_dict()
+                self.app.amqp.queues.add(queue, **route)
             # needs to be declared by publisher
-            dest['queue'] = queue
-            return lpmerge(dest, route)
+            route['queue'] = queue
         return route
 
     def lookup_route(self, task, args=None, kwargs=None):

+ 2 - 0
celery/utils/timeutils.py

@@ -286,6 +286,8 @@ def localize(dt, tz):
     else:
         try:
             return _normalize(dt, is_dst=None)
+        except TypeError:
+            return _normalize(dt)
         except AmbiguousTimeError:
             return min(_normalize(dt, is_dst=True),
                        _normalize(dt, is_dst=False))

+ 1 - 1
docs/configuration.rst

@@ -1,4 +1,4 @@
-.. _configuration:
+.' _configuration:
 
 ============================
  Configuration and defaults