瀏覽代碼

celeryd -Q option: Ability to specifiy list of queues to use, disabling other configured queues. Closes #78

Ask Solem 15 年之前
父節點
當前提交
8df76d0a5d
共有 4 個文件被更改,包括 44 次插入8 次删除
  1. 39 4
      celery/bin/celeryd.py
  2. 2 1
      celery/conf.py
  3. 2 2
      celery/messaging.py
  4. 1 1
      celery/utils/info.py

+ 39 - 4
celery/bin/celeryd.py

@@ -26,6 +26,12 @@
     Also run the ``celerybeat`` periodic task scheduler. Please note that
     there must only be one instance of this service.
 
+.. cmdoption:: -Q, queues
+
+    List of queues to enable for this worker separated by comma.
+    By default all configured queues are enabled.
+    Example: ``-Q video,image``
+
 .. cmdoption:: -s, --schedule
 
     Path to the schedule database if running with the ``-B`` option.
@@ -42,6 +48,19 @@
     **WARNING**: This is unrecoverable, and the tasks will be
     deleted from the messaging server.
 
+.. cmdoption:: --time-limit
+
+    Enables a hard time limit (in seconds) for tasks.
+
+.. cmdoption:: --soft-time-limit
+
+    Enables a soft time limit (in seconds) for tasks.
+
+.. cmdoption:: --maxtasksperchild
+
+    Maximum number of tasks a pool worker can execute before it's 
+    terminated and replaced by a new worker.
+
 """
 import os
 import sys
@@ -113,16 +132,21 @@ OPTION_LIST = (
     optparse.make_option('--time-limit',
             default=conf.CELERYD_TASK_TIME_LIMIT,
             action="store", type="int", dest="task_time_limit",
-            help="Enables a hard time limit (in seconds) for task run times."),
+            help="Enables a hard time limit (in seconds) for tasks."),
     optparse.make_option('--soft-time-limit',
             default=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
             action="store", type="int", dest="task_soft_time_limit",
-            help="Enables a soft time limit for task run times."),
+            help="Enables a soft time limit (in seconds) for tasks."),
     optparse.make_option('--maxtasksperchild',
             default=conf.CELERYD_MAX_TASKS_PER_CHILD,
             action="store", type="int", dest="max_tasks_per_child",
             help="Maximum number of tasks a pool worker can execute"
-                 "before it's replaced with a new one."),
+                 "before it's terminated and replaced by a new worker."),
+    optparse.make_option('--queues', '-Q', default=[],
+            action="store", dest="queues",
+            help="Comma separated list of queues to enable for this worker. "
+                 "By default all configured queues are enabled. "
+                 "Example: -Q video,image"),
 )
 
 
@@ -135,7 +159,7 @@ class Worker(object):
             task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
             task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
             max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD,
-            events=False, **kwargs):
+            queues=None, events=False, **kwargs):
         self.concurrency = concurrency or multiprocessing.cpu_count()
         self.loglevel = loglevel
         self.logfile = logfile
@@ -147,6 +171,11 @@ class Worker(object):
         self.task_time_limit = task_time_limit
         self.task_soft_time_limit = task_soft_time_limit
         self.max_tasks_per_child = max_tasks_per_child
+        self.queues = queues or []
+
+        if isinstance(self.queues, basestring):
+            self.queues = self.queues.split(",")
+
         if not isinstance(self.loglevel, int):
             self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
 
@@ -155,6 +184,7 @@ class Worker(object):
                                               celery.__version__))
 
         self.init_loader()
+        self.init_queues()
 
         if conf.RESULT_BACKEND == "database" \
                 and self.settings.DATABASE_ENGINE == "sqlite3" and \
@@ -183,6 +213,11 @@ class Worker(object):
         signals.worker_ready.send(sender=listener)
         print("celery@%s has started." % self.hostname)
 
+    def init_queues(self):
+        conf.QUEUES = dict((queue, options)
+                                for queue, options in conf.QUEUES.items()
+                                    if queue in self.queues)
+
     def init_loader(self):
         from celery.loaders import current_loader, load_settings
         self.loader = current_loader()

+ 2 - 1
celery/conf.py

@@ -208,4 +208,5 @@ def _init_routing_table(queues):
 
     return dict((queue, _defaults(opts)) for queue, opts in queues.items())
 
-routing_table = _init_routing_table(QUEUES)
+def get_routing_table():
+    return _init_routing_table(QUEUES)

+ 2 - 2
celery/messaging.py

@@ -23,7 +23,7 @@ MSG_OPTIONS = ("mandatory", "priority",
 
 get_msg_options = mitemgetter(*MSG_OPTIONS)
 extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
-default_queue = conf.routing_table[conf.DEFAULT_QUEUE]
+default_queue = conf.get_routing_table()[conf.DEFAULT_QUEUE]
 
 _queues_declared = False
 _exchanges_declared = {}
@@ -242,7 +242,7 @@ def get_consumer_set(connection, queues=None, **options):
     Defaults to the queues in ``CELERY_QUEUES``.
 
     """
-    queues = queues or conf.routing_table
+    queues = queues or conf.get_routing_table()
     cset = ConsumerSet(connection)
     for queue_name, queue_options in queues.items():
         queue_options = dict(queue_options)

+ 1 - 1
celery/utils/info.py

@@ -33,7 +33,7 @@ def textindent(t, indent=0):
 
 def format_routing_table(table=None, indent=0):
     """Format routing table into string for log dumps."""
-    table = table or conf.routing_table
+    table = table or conf.get_routing_table()
     format = lambda **route: ROUTE_FORMAT.strip() % route
     routes = "\n".join(format(name=name, **route)
                             for name, route in table.items())