Browse Source

TaskPublisher: Ensure entities are declare when retry enabled

Ask Solem 14 years ago
parent
commit
f804855bc1
1 changed files with 38 additions and 35 deletions
  1. 38 35
      celery/app/amqp.py

+ 38 - 35
celery/app/amqp.py

@@ -145,70 +145,73 @@ class TaskPublisher(messaging.Publisher):
             super(TaskPublisher, self).declare()
             _exchanges_declared.add(self.exchange.name)
 
+    def _declare_queue(self, name, retry=False, retry_policy={}):
+        options = self.app.queues[name]
+        queue = messaging.entry_to_queue(name, **options)(self.channel)
+        if retry:
+            self.connection.ensure(queue, queue.declare, **retry_policy)()
+        else:
+            queue.declare()
+        return queue
+
+    def _declare_exchange(self, name, type, retry=False, retry_policy={}):
+        ex = Exchange(name, type=type, durable=self.durable,
+                      auto_delete=self.auto_delete)(self.channel)
+        if retry:
+            return self.connection.ensure(ex, ex.declare, **retry_policy)
+        return ex.declare()
+
     def delay_task(self, task_name, task_args=None, task_kwargs=None,
             countdown=None, eta=None, task_id=None, taskset_id=None,
             expires=None, exchange=None, exchange_type=None,
             event_dispatcher=None, retry=None, retry_policy=None,
-            queue=None, **kwargs):
+            queue=None, now=None, retries=0, **kwargs):
         """Send task message."""
+        connection = self.connection
+        _retry_policy = self.retry_policy
+        if retry_policy:  # merge default and custom policy
+            _retry_policy = dict(_retry_policy, **retry_policy)
 
+        # declare entities
         if queue and queue not in _queues_declared:
-            options = self.app.queues[queue]
-            entity = messaging.entry_to_queue(queue, **options)
-            entity(self.channel).declare()
+            entity = self._declare_queue(queue, retry, _retry_policy)
             _exchanges_declared.add(entity.exchange.name)
             _queues_declared.add(entity.name)
+        if exchange and exchange not in _exchanges_declared:
+            self._declare_exchange(exchange,
+                    exchange_type or self.exchange_type, retry, _retry_policy)
+            _exchanges_declared.add(exchange)
 
         task_id = task_id or gen_unique_id()
         task_args = task_args or []
         task_kwargs = task_kwargs or {}
-        now = None
-        if countdown:                           # Convert countdown to ETA.
-            now = datetime.now()
-            eta = now + timedelta(seconds=countdown)
-
         if not isinstance(task_args, (list, tuple)):
             raise ValueError("task args must be a list or tuple")
         if not isinstance(task_kwargs, dict):
             raise ValueError("task kwargs must be a dictionary")
-
+        if countdown:                           # Convert countdown to ETA.
+            now = now or datetime.now()
+            eta = now + timedelta(seconds=countdown)
         if isinstance(expires, int):
             now = now or datetime.now()
             expires = now + timedelta(seconds=expires)
-
-        retries = kwargs.get("retries", 0)
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
 
-        body = {
-            "task": task_name,
-            "id": task_id,
-            "args": task_args or [],
-            "kwargs": task_kwargs or {},
-            "retries": retries,
-            "eta": eta,
-            "expires": expires,
-        }
-
+        body = {"task": task_name,
+                "id": task_id,
+                "args": task_args or [],
+                "kwargs": task_kwargs or {},
+                "retries": retries or 0,
+                "eta": eta,
+                "expires": expires}
         if taskset_id:
             body["taskset"] = taskset_id
 
-        # custom exchange passed, need to declare it.
-        if exchange and exchange not in _exchanges_declared:
-            exchange_type = exchange_type or self.exchange_type
-            self.backend.exchange_declare(exchange=exchange,
-                                          type=exchange_type,
-                                          durable=self.durable,
-                                          auto_delete=self.auto_delete)
-            _exchanges_declared.add(exchange)
-
         send = self.send
         if retry is None and self.retry or retry:
-            send = self.connection.ensure(self, self.send,
-                            **dict(self.retry_policy, **retry_policy or {}))
-
+            send = connection.ensure(self, self.send, **_retry_policy)
         send(body, exchange=exchange, **extract_msg_options(kwargs))
-
         signals.task_sent.send(sender=task_name, **body)
         if event_dispatcher:
             event_dispatcher.send("task-sent", uuid=task_id,