Browse Source

The dirty work of merging Sean Creeley's consumerset branch with 0.6.0 has
been done. Tests are passing, but there are no consumerset related tests.

Ask Solem 15 years ago
parent
commit
c01a9885bb
5 changed files with 20 additions and 15 deletions
  1. 11 7
      celery/execute.py
  2. 2 2
      celery/messaging.py
  3. 4 1
      celery/task/base.py
  4. 0 2
      celery/tests/test_worker.py
  5. 3 3
      celery/worker/__init__.py

+ 11 - 7
celery/execute.py

@@ -9,10 +9,10 @@ from datetime import datetime, timedelta
 import inspect
 
 
-def apply_async(task, args=None, kwargs=None, routing_key=None,
-        immediate=None, mandatory=None, connection=None,
-        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None,
-        countdown=None, eta=None, **opts):
+def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
+        routing_key=None, exchange=None,
+        immediate=None, mandatory=None, priority=None, connection=None,
+        connect_timeout=AMQP_CONNECTION_TIMEOUT, **opts):
     """Run a task asynchronously by the celery daemon(s).
 
     :param task: The task to run (a callable object, or a :class:`Task`
@@ -34,6 +34,9 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
     :keyword routing_key: The routing key used to route the task to a worker
         server.
 
+    :keyword exchange: The named exchange to send the task to. Defaults to
+        :attr:`celery.task.base.Task.exchange`.
+
     :keyword immediate: Request immediate delivery. Will raise an exception
         if the task cannot be routed to a worker immediately.
         (Do not confuse this parameter with the ``countdown`` and ``eta``
@@ -54,6 +57,7 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
     args = args or []
     kwargs = kwargs or {}
     routing_key = routing_key or getattr(task, "routing_key", None)
+    exchange = exchange or getattr(task, "exchange", None)
     immediate = immediate or getattr(task, "immediate", None)
     mandatory = mandatory or getattr(task, "mandatory", None)
     priority = priority or getattr(task, "priority", None)
@@ -78,9 +82,9 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
         delay_task = curry(publisher.delay_task_in_set, taskset_id)
 
     task_id = delay_task(task.name, args, kwargs,
-                         routing_key=routing_key, mandatory=mandatory,
-                         immediate=immediate, priority=priority,
-                         eta=eta)
+                         routing_key=routing_key, exchange=exchange,
+                         mandatory=mandatory, immediate=immediate,
+                         priority=priority, eta=eta)
 
     if need_to_close_connection:
         publisher.close()

+ 2 - 2
celery/messaging.py

@@ -64,8 +64,8 @@ class TaskPublisher(Publisher):
         return task_id
 
 
-def get_consumer_set(connection, queues=conf.AMQP_CONSUMER_QUEUES):
-    return ConsumerSet(connection, from_dict=queues, decoder=pickle.loads)
+def get_consumer_set(connection, queues=conf.AMQP_CONSUMER_QUEUES, **options):
+    return ConsumerSet(connection, from_dict=queues, **options)
 
 
 class TaskConsumer(Consumer):

+ 4 - 1
celery/task/base.py

@@ -34,6 +34,10 @@ class Task(object):
 
         Override the global default ``routing_key`` for this task.
 
+    .. attribute:: exchange
+
+        Override the global default ``exchange`` for this task.
+
     .. attribute:: mandatory
 
         If set, the message has mandatory routing. By default the message
@@ -77,7 +81,6 @@ class Task(object):
 
         >>> from celery.task import tasks, Task
         >>> class MyTask(Task):
-        ...     name = "mytask"
         ...
         ...     def run(self, some_arg=None, **kwargs):
         ...         logger = self.get_logger(**kwargs)

+ 0 - 2
celery/tests/test_worker.py

@@ -92,7 +92,6 @@ class TestAMQPListener(unittest.TestCase):
         l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
 
         c = l.reset_connection()
-        self.assertTrue(isinstance(c, TaskConsumer))
         self.assertTrue(c is l.task_consumer)
         self.assertTrue(isinstance(l.amqp_connection, AMQPConnection))
 
@@ -101,7 +100,6 @@ class TestAMQPListener(unittest.TestCase):
         self.assertTrue(l.task_consumer is None)
 
         c = l.reset_connection()
-        self.assertTrue(isinstance(c, TaskConsumer))
         self.assertTrue(c is l.task_consumer)
         self.assertTrue(isinstance(l.amqp_connection, AMQPConnection))
 

+ 3 - 3
celery/worker/__init__.py

@@ -9,7 +9,7 @@ from carrot.connection import DjangoAMQPConnection
 from celery.worker.controllers import Mediator, PeriodicWorkController
 from celery.worker.job import TaskWrapper
 from celery.registry import NotRegistered
-from celery.messaging import TaskConsumer
+from celery.messaging import get_consumer_set
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
 from celery.log import setup_logger
 from celery.pool import TaskPool
@@ -101,7 +101,7 @@ class AMQPListener(object):
 
     def reset_connection(self):
         """Reset the AMQP connection, and reinitialize the
-        :class:`celery.messaging.TaskConsumer` instance.
+        :class:`carrot.messaging.ConsumerSet` instance.
 
         Resets the task consumer in :attr:`task_consumer`.
 
@@ -110,7 +110,7 @@ class AMQPListener(object):
                 "AMQPListener: Re-establishing connection to the broker...")
         self.close_connection()
         self.amqp_connection = DjangoAMQPConnection()
-        self.task_consumer = TaskConsumer(connection=self.amqp_connection)
+        self.task_consumer = get_consumer_set(connection=self.amqp_connection)
         self.task_consumer.register_callback(self.receive_message)
         return self.task_consumer