Browse Source

[amqrpc] New backend now works

Ask Solem 12 years ago
parent
commit
51d7141c49
5 changed files with 14 additions and 8 deletions
  1. 1 1
      celery/app/amqp.py
  2. 1 1
      celery/app/task.py
  3. 6 2
      celery/backends/amqp.py
  4. 3 4
      celery/backends/amqrpc.py
  5. 3 0
      celery/backends/base.py

+ 1 - 1
celery/app/amqp.py

@@ -199,7 +199,7 @@ class TaskProducer(Producer):
                 'utc': self.utc,
                 'callbacks': callbacks,
                 'errbacks': errbacks,
-                'reply_to': reply_to}
+                'reply_to': reply_to,
                 'timeouts': timeouts}
         group_id = group_id or taskset_id
         if group_id:

+ 1 - 1
celery/app/task.py

@@ -474,7 +474,7 @@ class Task(object):
                 evd = app.events.Dispatcher(channel=P.channel,
                                             buffer_while_offline=False)
 
-            extra_properties = self.backend.on_task_apply(task_id)
+            extra_properties = self.backend.on_task_call(producer, task_id)
             task_id = P.publish_task(self.name, args, kwargs,
                                      task_id=task_id,
                                      event_dispatcher=evd,

+ 6 - 2
celery/backends/amqp.py

@@ -220,9 +220,13 @@ class AMQPBackend(BaseBackend):
 
             def callback(meta, message):
                 if meta['status'] in states.READY_STATES:
-                    results.append(meta)
+                    task_id = meta['task_id']
+                    if task_id in task_ids:
+                        results.append(meta)
+                    else:
+                        self._cache[task_id] = meta
 
-            bindings = self._many_bindings(task_id)
+            bindings = self._many_bindings(task_ids)
             with self.Consumer(channel, bindings, callbacks=[callback],
                     no_ack=True):
                 wait = conn.drain_events

+ 3 - 4
celery/backends/amqrpc.py

@@ -39,10 +39,9 @@ class AMQRPCBackend(amqp.AMQPBackend):
         return self.Exchange('c.amqrpc', type=type, delivery_mode=1,
                 durable=False, auto_delete=False)
 
-    def on_task_apply(self, task_id):
-        with self.app.pool.acquire_channel(block=True) as (conn, channel):
-            maybe_declare(self.binding(channel), retry=True)
-            return {'reply_to': self.oid}
+    def on_task_call(self, producer, task_id):
+        maybe_declare(self.binding(producer.channel), retry=True)
+        return {'reply_to': self.oid}
 
     def _create_binding(self, task_id):
         return self.binding

+ 3 - 0
celery/backends/base.py

@@ -256,6 +256,9 @@ class BaseBackend(object):
         """Cleanup actions to do at the end of a task worker process."""
         pass
 
+    def on_task_call(self, producer, task_id):
+        return {}
+
     def on_chord_part_return(self, task, propagate=False):
         pass