Explorar o código

WorkController now supports queues= argument which is a list like -Q

Ask Solem %!s(int64=13) %!d(string=hai) anos
pai
achega
a5f0251d60
Modificáronse 3 ficheiros con 20 adicións e 13 borrados
  1. 5 0
      celery/app/base.py
  2. 13 12
      celery/apps/worker.py
  3. 2 1
      celery/worker/__init__.py

+ 5 - 0
celery/app/base.py

@@ -273,6 +273,11 @@ class BaseApp(object):
                                        use_ssl=self.conf.EMAIL_USE_SSL,
                                        use_tls=self.conf.EMAIL_USE_TLS)
 
+    def select_queues(self, queues=None):
+        if queues is not None:
+            return self.amqp.queues.select_subset(queues,
+                  self.conf.CELERY_CREATE_MISSING_QUEUES)
+
     def either(self, default_key, *values):
         """Fallback to the value of a configuration key if none of the
         `*values` are true."""

+ 13 - 12
celery/apps/worker.py

@@ -45,6 +45,14 @@ EXTRA_INFO_FMT = """
 %(tasks)s
 """
 
+UNKNOWN_QUEUE_ERROR = """\
+Trying to select queue subset of %r, but queue %s is not
+defined in the CELERY_QUEUES setting.
+
+If you want to automatically declare unknown queues you can
+enable the CELERY_CREATE_MISSING_QUEUES setting.
+"""
+
 
 def cpu_count():
     if multiprocessing is not None:
@@ -153,18 +161,11 @@ class Worker(object):
         print("celery@%s has started." % self.hostname)
 
     def init_queues(self):
-        if self.use_queues:
-            create_missing = self.app.conf.CELERY_CREATE_MISSING_QUEUES
-            try:
-                self.app.amqp.queues.select_subset(self.use_queues,
-                                                   create_missing)
-            except KeyError, exc:
-                raise ImproperlyConfigured(
-                    "Trying to select queue subset of %r, but queue %s"
-                    "is not defined in CELERY_QUEUES. If you want to "
-                    "automatically declare unknown queues you have to "
-                    "enable CELERY_CREATE_MISSING_QUEUES" % (
-                        self.use_queues, exc))
+        try:
+            self.app.select_queues(self.use_queues)
+        except KeyError, exc:
+            raise ImproperlyConfigured(
+                        UNKNOWN_QUEUE_ERROR % (self.use_queues, exc))
 
     def init_loader(self):
         self.loader = self.app.loader

+ 2 - 1
celery/worker/__init__.py

@@ -120,11 +120,12 @@ class WorkController(object):
             pool_putlocks=None, db=None, prefetch_multiplier=None,
             eta_scheduler_precision=None, disable_rate_limits=None,
             autoscale=None, autoscaler_cls=None, scheduler_cls=None,
-            app=None):
+            queues=None, app=None):
 
         self.app = app_or_default(app)
         conf = self.app.conf
         self._shutdown_complete = threading.Event()
+        self.app.select_queues(queues)  # select queues subset.
 
         # Options
         self.loglevel = loglevel or self.loglevel