ソースを参照

Merge branch 'master' of github.com:celery/celery

Mher Movsisyan 12 年 前
コミット
51e6b34f7b

+ 2 - 0
celery/app/control.py

@@ -176,6 +176,8 @@ class Control(object):
         :keyword exchange_type: Type of exchange (defaults to 'direct')
             command to, when empty broadcast to all workers.
         :keyword routing_key: Optional routing key.
+        :keyword options: Additional options as supported
+            by :meth:`kombu.entitiy.Queue.from_dict`.
 
         See :meth:`broadcast` for supported keyword arguments.
 

+ 4 - 4
celery/app/task.py

@@ -175,15 +175,15 @@ class Task(object):
     #: If enabled the worker will not store task state and return values
     #: for this task.  Defaults to the :setting:`CELERY_IGNORE_RESULT`
     #: setting.
-    ignore_result = False
+    ignore_result = None
 
     #: When enabled errors will be stored even if the task is otherwise
     #: configured to ignore results.
-    store_errors_even_if_ignored = False
+    store_errors_even_if_ignored = None
 
     #: If enabled an email will be sent to :setting:`ADMINS` whenever a task
     #: of this type fails.
-    send_error_emails = False
+    send_error_emails = None
 
     #: The name of a serializer that are registered with
     #: :mod:`kombu.serialization.registry`.  Default is `'pickle'`.
@@ -214,7 +214,7 @@ class Task(object):
     #:
     #: The application default can be overridden using the
     #: :setting:`CELERY_TRACK_STARTED` setting.
-    track_started = False
+    track_started = None
 
     #: When enabled messages for this task will be acknowledged **after**
     #: the task has been executed, and not *just before* which is the

+ 4 - 0
celery/apps/worker.py

@@ -148,6 +148,10 @@ class Worker(configurated):
     def run(self):
         self.init_queues()
         self.app.loader.init_worker()
+        # this signal can be used to e.g. change queues after
+        # the -Q option has been applied.
+        signals.celeryd_after_setup.send(sender=self.hostname, instance=self,
+                                         conf=self.app.conf)
 
         if getattr(os, 'getuid', None) and os.getuid() == 0:
             warnings.warn(RuntimeWarning(

+ 46 - 15
celery/contrib/migrate.py

@@ -14,7 +14,7 @@ import socket
 from functools import partial
 from itertools import cycle, islice
 
-from kombu import eventloop
+from kombu import eventloop, Queue
 from kombu.exceptions import StdChannelError
 from kombu.utils.encoding import ensure_bytes
 
@@ -98,13 +98,13 @@ def migrate_tasks(source, dest, migrate=migrate_task, app=None,
                         on_declare_queue=on_declare_queue, **kwargs)
 
 
-def move_tasks(conn, predicate, exchange, routing_key, app=None, **kwargs):
+def move(predicate, conn, exchange=None, routing_key=None, app=None, **kwargs):
     """Find tasks by filtering them and move the tasks to a new queue.
 
-    :param conn: Connection to use.
     :param predicate: Filter function with signature ``(body, message)``.
-    :param exchange: Destination exchange.
-    :param routing_key: Destination routing key.
+    :param conn: Connection to use.
+    :keyword exchange: Default destination exchange.
+    :keyword routing_key: Default destination routing key.
 
     Also supports the same keyword arguments as :func:`start_filter`.
 
@@ -117,21 +117,54 @@ def move_tasks(conn, predicate, exchange, routing_key, app=None, **kwargs):
             if body['id'] == wanted_id:
                 return True
 
-        move_tasks(conn, is_wanted_task, exchange, routing_key)
+        move(is_wanted_task, conn, exchange, routing_key)
+
+
+    The predicate may also return a tuple of ``(exchange, routing_key)``
+    to specify the destination to where the task should be moved,
+    or a :class:`~kombu.entitiy.Queue` instance.
+    Any other true value means that the task will be moved to the
+    default exchange/routing_key.
 
     """
     app = app_or_default(app)
     producer = app.amqp.TaskProducer(conn)
 
     def on_task(body, message):
-        if predicate(body, message):
+        ret = predicate(body, message)
+        if ret:
+            if isinstance(ret, Queue):
+                ex, rk = ret.exchange.name, ret.routing_key
+            else:
+                ex, rk = expand_dest(ret, exchange, routing_key)
             republish(producer, message,
-                      exchange=exchange, routing_key=routing_key)
+                      exchange=ex, routing_key=rk)
             message.ack()
 
     return start_filter(app, conn, on_task, **kwargs)
 
 
+def expand_dest(ret, exchange, routing_key):
+    try:
+        ex, rk = ret
+    except (TypeError, ValueError):
+        ex, rk = exchange, routing_key
+    return ex, rk
+
+
+
+# XXX Deprecated (arguments rearranged)
+move_tasks = lambda conn, pred, *a, **kw: move(pred, conn, *a, **kw)
+
+
+def task_id_eq(task_id, body, message):
+    return body['id'] == task_id
+
+
+def task_id_in(ids, body, message):
+    return body['id'] in ids
+
+
 def move_task_by_id(conn, task_id, exchange, routing_key, **kwargs):
     """Find a task by id and move it to another queue.
 
@@ -143,11 +176,8 @@ def move_task_by_id(conn, task_id, exchange, routing_key, **kwargs):
     Also supports the same keyword arguments as :func:`start_filter`.
 
     """
-    def predicate(body, message):
-        if body['id'] == task_id:
-            return True
-
-    return move_tasks(conn, predicate, exchange, routing_key, **kwargs)
+    return move(conn, partial(task_id_eq, task_id),
+                exchange, routing_key, **kwargs)
 
 
 def prepare_queues(queues):
@@ -163,7 +193,8 @@ def prepare_queues(queues):
 
 def start_filter(app, conn, filter, limit=None, timeout=1.0,
         ack_messages=False, migrate=migrate_task, tasks=None, queues=None,
-        callback=None, forever=False, on_declare_queue=None, **kwargs):
+        callback=None, forever=False, on_declare_queue=None,
+        consume_from=None, **kwargs):
     state = State()
     queues = prepare_queues(queues)
     if isinstance(tasks, basestring):
@@ -177,7 +208,7 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
     def ack_message(body, message):
         message.ack()
 
-    consumer = app.amqp.TaskConsumer(conn)
+    consumer = app.amqp.TaskConsumer(conn, queues=consume_from)
 
     if tasks:
         filter = filter_callback(filter, tasks)

+ 1 - 1
celery/loaders/default.py

@@ -75,4 +75,4 @@ class Loader(BaseLoader):
             return self.setup_settings(usercfg)
 
     def wanted_module_item(self, item):
-        return item[0].isupper() and not item.startswith('_')
+        return not item.startswith('_')

+ 2 - 1
celery/signals.py

@@ -24,7 +24,8 @@ task_success = Signal(providing_args=['result'])
 task_failure = Signal(providing_args=[
     'task_id', 'exception', 'args', 'kwargs', 'traceback', 'einfo'])
 task_revoked = Signal(providing_args=['terminated', 'signum', 'expired'])
-celeryd_init = Signal(providing_args=['instance'])
+celeryd_init = Signal(providing_args=['instance', 'conf'])
+celeryd_after_setup = Signal(providing_args=['instance', 'conf'])
 worker_init = Signal(providing_args=[])
 worker_process_init = Signal(providing_args=[])
 worker_ready = Signal(providing_args=[])

+ 8 - 4
celery/worker/consumer.py

@@ -767,14 +767,18 @@ class Consumer(object):
     def add_task_queue(self, queue, exchange=None, exchange_type=None,
             routing_key=None, **options):
         cset = self.task_consumer
-        exchange = queue if exchange is None else exchange
-        routing_key = queue if routing_key is None else routing_key
-        exchange_type = 'direct' if exchange_type is None else exchange_type
-        if not cset.consuming_from(queue):
+        try:
+            q = self.app.amqp.queues[queue]
+        except KeyError:
+            exchange = queue if exchange is None else exchange
+            routing_key = queue if routing_key is None else routing_key
+            exchange_type = 'direct' if exchange_type is None \
+                                     else exchange_type
             q = self.app.amqp.queues.add(queue,
                     exchange=exchange,
                     exchange_type=exchange_type,
                     routing_key=routing_key, **options)
+        if not cset.consuming_from(queue):
             cset.add_queue(q)
             cset.consume()
             logger.info('Started consuming from %r', queue)

+ 40 - 0
docs/userguide/signals.rst

@@ -205,6 +205,43 @@ Provides arguments:
 Worker Signals
 --------------
 
+.. signal:: celeryd_after_setup
+
+celeryd_after_setup
+~~~~~~~~~~~~~~~~~~~
+
+This signal is sent after the worker instance is set up,
+but before it calls run.  This means that any queues from the :option:`-Q`
+option is enabled, logging has been set up and so on.
+
+It can be used to e.g. add custom queues that should always be consumed
+from, disregarding the :option:`-Q` option.  Here's an example
+that sets up a direct queue for each worker, these queues can then be
+used to route a task to any specific worker:
+
+.. code-block:: python
+
+    from celery.signals import celeryd_after_setup
+
+    @celeryd_after_setup.connect
+    def setup_direct_queue(sender, instance, **kwargs):
+        queue_name = '%s.dq' % sender   # sender is the hostname of the worker
+        instance.app.queues.add(queue_name, routing_key=queue_name)
+
+Provides arguments:
+
+* sender
+  Hostname of the worker.
+
+* instance
+    This is the :class:`celery.apps.worker.Worker` instance to be initialized.
+    Note that only the :attr:`app` and :attr:`hostname` attributes have been
+    set so far, and the rest of ``__init__`` has not been executed.
+
+* conf
+    The configuration of the current app.
+
+
 .. signal:: celeryd_init
 
 celeryd_init
@@ -238,6 +275,9 @@ sender when you connect:
 
 Provides arguments:
 
+* sender
+  Hostname of the worker.
+
 * instance
     This is the :class:`celery.apps.worker.Worker` instance to be initialized.
     Note that only the :attr:`app` and :attr:`hostname` attributes have been