Browse Source

Use task.get_publisher with apply_async so the exchange is configured properly.

Ask Solem 15 years ago
parent
commit
b9e9fb4257
2 changed files with 6 additions and 3 deletions
  1. 2 1
      celery/execute/__init__.py
  2. 4 2
      celery/task/base.py

+ 2 - 1
celery/execute/__init__.py

@@ -70,12 +70,13 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     if conf.ALWAYS_EAGER:
         return apply(task, args, kwargs)
 
+    task = tasks[task.name] # Get instance.
     options = dict(extract_exec_options(task), **options)
 
     if countdown: # Convert countdown to ETA.
         eta = datetime.now() + timedelta(seconds=countdown)
 
-    publish = publisher or TaskPublisher(connection)
+    publish = publisher or task.get_publisher(connection)
     try:
         task_id = publish.delay_task(task.name, args or [], kwargs or {},
                                      task_id=task_id,

+ 4 - 2
celery/task/base.py

@@ -208,7 +208,7 @@ class Task(object):
         """Establish a connection to the message broker."""
         return _establish_connection(connect_timeout)
 
-    def get_publisher(self, connection=None,
+    def get_publisher(self, connection=None, exchange=None,
             connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
         """Get a celery task message publisher.
 
@@ -222,9 +222,11 @@ class Task(object):
             >>> publisher.connection.close()
 
         """
+        if exchange is None:
+            exchange = self.exchange
         connection = connection or self.establish_connection(connect_timeout)
         return TaskPublisher(connection=connection,
-                             exchange=self.exchange,
+                             exchange=exchange,
                              routing_key=self.routing_key)
 
     def get_consumer(self, connection=None,