Browse Source

Adds CELERY_WORKER_DIRECT option which adds a dedicated queue for each worker, so that tasks can be routed to specific workers.

Ask Solem 12 năm trước cách đây
mục cha
commit
eb8c36ca85
5 tập tin đã thay đổi với 47 bổ sung2 xóa
  1. 0 1
      celery/app/amqp.py
  2. 1 0
      celery/app/defaults.py
  3. 3 1
      celery/apps/worker.py
  4. 17 0
      celery/utils/__init__.py
  5. 26 0
      docs/configuration.rst

+ 0 - 1
celery/app/amqp.py

@@ -28,7 +28,6 @@ QUEUE_FORMAT = """
 . %(name)s exchange:%(exchange)s(%(exchange_type)s) binding:%(routing_key)s
 """
 
-
 class Queues(dict):
     """Queue name⇒ declaration mapping.
 

+ 1 - 0
celery/app/defaults.py

@@ -154,6 +154,7 @@ NAMESPACES = {
         'SECURITY_KEY': Option(type='string'),
         'SECURITY_CERTIFICATE': Option(type='string'),
         'SECURITY_CERT_STORE': Option(type='string'),
+        'WORKER_DIRECT': Option(False, type='bool'),
     },
     'CELERYD': {
         'AUTOSCALER': Option('celery.worker.autoscale.Autoscaler'),

+ 3 - 1
celery/apps/worker.py

@@ -27,7 +27,7 @@ from celery.app import app_or_default
 from celery.app.abstract import configurated, from_config
 from celery.exceptions import ImproperlyConfigured, SystemTerminate
 from celery.loaders.app import AppLoader
-from celery.utils import cry, isatty
+from celery.utils import cry, isatty, worker_direct
 from celery.utils.imports import qualname
 from celery.utils.log import get_logger, mlevel, set_in_sighandler
 from celery.utils.text import pluralize
@@ -181,6 +181,8 @@ class Worker(configurated):
             self.app.select_queues(self.use_queues)
         except KeyError, exc:
             raise ImproperlyConfigured(UNKNOWN_QUEUE % (self.use_queues, exc))
+        if self.app.conf.CELERY_WORKER_DIRECT:
+            self.app.amqp.queues.select_add(worker_direct(self.hostname))
 
     def redirect_stdouts_to_logger(self):
         self.app.log.setup(self.loglevel, self.logfile,

+ 17 - 0
celery/utils/__init__.py

@@ -20,6 +20,8 @@ from functools import partial, wraps
 from inspect import getargspec
 from pprint import pprint
 
+from kombu import Exchange, Queue
+
 from celery.exceptions import CPendingDeprecationWarning, CDeprecationWarning
 from .compat import StringIO
 
@@ -42,6 +44,21 @@ DEPRECATION_FMT = """
 #: task to be that of ``App.main``.
 MP_MAIN_FILE = os.environ.get('MP_MAIN_FILE') or None
 
+#: Exchange for worker direct queues.
+WORKER_DIRECT_EXCHANGE = Exchange('C.dq')
+
+#: Format for worker direct queue names.
+WORKER_DIRECT_QUEUE_FORMAT = '%s.dq'
+
+
+def worker_direct(hostname):
+    if isinstance(hostname, Queue):
+        return hostname
+    return Queue(WORKER_DIRECT_QUEUE_FORMAT % hostname,
+                 WORKER_DIRECT_EXCHANGE,
+                 hostname,
+                 auto_delete=True)
+
 
 def warn_deprecated(description=None, deprecation=None, removal=None,
         alternative=None):

+ 26 - 0
docs/configuration.rst

@@ -605,6 +605,32 @@ A list of routers, or a single router used to route tasks to queues.
 When deciding the final destination of a task the routers are consulted
 in order.  See :ref:`routers` for more information.
 
+.. setting:: CELERY_WORKER_DIRECT
+
+CELERY_WORKER_DIRECT
+~~~~~~~~~~~~~~~~~~~~
+
+This option enables so that every worker has a dedicated queue,
+so that tasks can be routed to specific workers.
+
+The queue name for each worker is automatically generated based on
+the worker hostname and a ``.dq`` suffix, using the ``C.dq`` exchange.
+
+For example the queue name for the worker with hostname ``w1.example.com``
+becomes::
+
+    w1.example.com.dq
+
+Then you can route the task to the task by specifying the hostname
+as the routung key and the ``C.dq`` exchange::
+
+    CELERY_ROUTES = {
+        'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1.example.com'}
+    }
+
+This setting is mandatory if you want to use the ``move_to_worker`` features
+of :mod:`celery.contrib.migrate`.
+
 .. setting:: CELERY_CREATE_MISSING_QUEUES
 
 CELERY_CREATE_MISSING_QUEUES