
apps.worker and WorkController refactored for consistency.

They now automatically get their defaults from the config as well,
using ``celery.app.abstract.configurated``.  This means we don't have
to painfully list out every configuration option, and objects
can inherit the config key lists from other objects too.
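
In short, instead of threading every option through ``__init__`` by hand, a
``configurated`` subclass just declares its options.  A condensed sketch of
the pattern from the diffs below (this ``Worker`` is abbreviated; see
celery/apps/worker.py for the real class):

    from celery.app import app_or_default
    from celery.app.abstract import configurated, from_config

    class Worker(configurated):
        concurrency = from_config()           # resolved from CELERYD_CONCURRENCY
        loglevel = from_config("log_level")   # resolved from CELERYD_LOG_LEVEL

        def __init__(self, app=None, **kwargs):
            self.app = app_or_default(app)
            # explicit kwargs win; None/missing values fall back to app config
            self.setup_defaults(kwargs, namespace="celeryd")
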
Ask Solem 13 years ago
parent
commit
4970d64c76
6 changed files with 144 additions and 136 deletions
  1. celery/app/abstract.py     + 52 - 0
  2. celery/app/base.py         + 11 - 2
  3. celery/app/defaults.py     + 24 - 1
  4. celery/apps/worker.py      + 22 - 47
  5. celery/bin/celeryd.py      + 7 - 7
  6. celery/worker/__init__.py  + 28 - 79

+ 52 - 0
celery/app/abstract.py

@@ -0,0 +1,52 @@
+from __future__ import absolute_import
+
+
+class from_config(object):
+
+    def __init__(self, key=None):
+        self.key = key
+
+    def get_key(self, attr):
+        return attr if self.key is None else self.key
+
+
+
+class _configurated(type):
+
+    def __new__(cls, name, bases, attrs):
+        C = attrs["__confopts__"] = dict((attr, spec.get_key(attr))
+                                          for attr, spec in attrs.iteritems()
+                                              if isinstance(spec, from_config))
+        inherit_from = attrs.get("inherit_confopts", ())
+        for subcls in inherit_from:
+            attrs["__confopts__"].update(subcls.__confopts__)
+        attrs = dict((k, v if not isinstance(v, from_config) else None)
+                        for k, v in attrs.iteritems())
+        return super(_configurated, cls).__new__(cls, name, bases, attrs)
+
+
+class configurated(object):
+    __metaclass__ = _configurated
+
+
+    def setup_defaults(self, kwargs, namespace="celery"):
+        confopts = self.__confopts__
+        app, find = self.app, self.app.conf.find_value_for_key
+
+        for attr, keyname in confopts.iteritems():
+            try:
+                value = kwargs[attr]
+            except KeyError:
+                value = find(keyname, namespace)
+            else:
+                if value is None:
+                    value = find(keyname, namespace)
+            setattr(self, attr, value)
+
+        for attr_name, attr_value in kwargs.iteritems():
+            if attr_name not in confopts and attr_value is not None:
+                setattr(self, attr_name, attr_value)
+
+    def confopts_as_dict(self):
+        return dict((key, getattr(self, key))
+                        for key in self.__confopts__.iterkeys())
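
A self-contained toy run of the module above, showing the key map the
metaclass builds and the kwarg-over-config precedence in ``setup_defaults``.
``FakeConf``, ``FakeApp`` and ``Thing`` are invented stand-ins here; only
``configurated`` and ``from_config`` come from the new module:

    from celery.app.abstract import configurated, from_config

    class FakeConf(dict):
        def find_value_for_key(self, name, namespace="celery"):
            return self["%s_%s" % (namespace.upper(), name.upper())]

    class FakeApp(object):
        conf = FakeConf(CELERYD_CONCURRENCY=4, CELERYD_LOG_FILE="w.log")

    class Thing(configurated):
        concurrency = from_config()          # key derived from attribute name
        logfile = from_config("log_file")    # explicit config key

        def __init__(self, **kwargs):
            self.app = FakeApp()
            self.setup_defaults(kwargs, namespace="celeryd")

    # The metaclass collected the attribute -> config-key map:
    assert Thing.__confopts__ == {"concurrency": "concurrency",
                                  "logfile": "log_file"}

    t = Thing(concurrency=10)
    assert t.concurrency == 10     # explicit kwarg wins
    assert t.logfile == "w.log"    # missing kwarg falls back to config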

+ 11 - 2
celery/app/base.py

@@ -25,7 +25,7 @@ from .. import datastructures
 from .. import platforms
 from ..utils import cached_property, instantiate, lpmerge
 
-from .defaults import DEFAULTS, find_deprecated_settings
+from .defaults import DEFAULTS, find_deprecated_settings, find
 
 import kombu
 if kombu.VERSION < (1, 1, 0):
@@ -60,11 +60,20 @@ class Settings(datastructures.ConfigurationView):
 
     @property
     def BROKER_HOST(self):
-
         return (os.environ.get("CELERY_BROKER_URL") or
                 self.get("BROKER_URL") or
                 self.get("BROKER_HOST"))
 
+    def find_option(self, name, namespace="celery"):
+        return find(name, namespace)
+
+    def get_by_parts(self, *parts):
+        return self["_".join(filter(None, parts))]
+
+    def find_value_for_key(self, name, namespace="celery"):
+        ns, key, _ = self.find_option(name, namespace=namespace)
+        return self.get_by_parts(ns, key)
+
 
 class BaseApp(object):
     """Base class for apps."""

+ 24 - 1
celery/app/defaults.py

@@ -16,6 +16,8 @@ import sys
 from collections import deque
 from datetime import timedelta
 
+from ..utils.functional import memoize
+
 is_jython = sys.platform.startswith("java")
 is_pypy = hasattr(sys, "pypy_version_info")
 
@@ -209,6 +211,7 @@ def flatten(d, ns=""):
                 stack.append((name + key + '_', value))
             else:
                 yield name + key, value
+DEFAULTS = dict((key, value.default) for key, value in flatten(NAMESPACES))
 
 
 def find_deprecated_settings(source):
@@ -221,4 +224,24 @@ def find_deprecated_settings(source):
                             alternative=opt.alt)
 
 
-DEFAULTS = dict((key, value.default) for key, value in flatten(NAMESPACES))
+@memoize(maxsize=None)
+def find(name, namespace="celery"):
+    # - Try specified namespace first.
+    namespace = namespace.upper()
+    for key, value in NAMESPACES[namespace].iteritems():
+        if key.lower() == name.lower():
+            return namespace, key, value
+
+    # - Try all the other namespaces.
+    for ns, keys in NAMESPACES.iteritems():
+        if isinstance(keys, dict):
+            if ns != namespace:
+                for key, value in keys.iteritems():
+                    if key.lower() == name.lower():
+                        return ns, key, value
+        else:
+            if ns.lower() == name.lower():
+                return None, ns, keys
+
+    # - See if name is a qualname last.
+    return None, name.upper(), DEFAULTS[name.upper()]
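
The lookup order in ``find()``, illustrated (option names assume the default
NAMESPACES layout; exact contents may differ):

    from celery.app.defaults import find

    print find("log_level", namespace="celeryd")
    # -> ("CELERYD", "LOG_LEVEL", <Option>)         hit in given namespace

    print find("send_events", namespace="celeryd")
    # -> ("CELERY", "SEND_EVENTS", <Option>)        found in another namespace

    print find("celerybeat_scheduler")
    # -> (None, "CELERYBEAT_SCHEDULER", <default>)  qualified-name fallback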

+ 22 - 47
celery/apps/worker.py

@@ -14,6 +14,7 @@ import warnings
 
 from .. import __version__, platforms, signals
 from ..app import app_or_default
+from ..app.abstract import configurated, from_config
 from ..exceptions import ImproperlyConfigured, SystemTerminate
 from ..utils import isatty, LOG_LEVELS, cry, qualname
 from ..worker import WorkController
@@ -68,44 +69,28 @@ def get_process_name():
         return multiprocessing.current_process().name
 
 
-class Worker(object):
+class Worker(configurated):
     WorkController = WorkController
 
-    def __init__(self, concurrency=None, loglevel=None, logfile=None,
-            hostname=None, discard=False, run_clockservice=False,
-            schedule=None, task_time_limit=None, task_soft_time_limit=None,
-            max_tasks_per_child=None, queues=None, events=None, db=None,
-            include=None, app=None, pidfile=None,
-            redirect_stdouts=None, redirect_stdouts_level=None,
-            autoscale=None, scheduler_cls=None, pool=None, **kwargs):
+    inherit_confopts = (WorkController, )
+    loglevel = from_config("log_level")
+    redirect_stdouts = from_config()
+    redirect_stdouts_level = from_config()
+
+    def __init__(self, hostname=None, discard=False, embed_clockservice=False,
+            queues=None, include=None, app=None, pidfile=None,
+            autoscale=None, **kwargs):
         self.app = app = app_or_default(app)
         conf = app.conf
-        self.concurrency = (concurrency or
-                            conf.CELERYD_CONCURRENCY or cpu_count())
-        self.loglevel = loglevel or conf.CELERYD_LOG_LEVEL
-        self.logfile = logfile or conf.CELERYD_LOG_FILE
-
+        self.setup_defaults(kwargs, namespace="celeryd")
+        if not self.concurrency:
+            self.concurrency = cpu_count()
         self.hostname = hostname or socket.gethostname()
         self.discard = discard
-        self.run_clockservice = run_clockservice
-        if self.app.IS_WINDOWS and self.run_clockservice:
+        self.embed_clockservice = embed_clockservice
+        if self.app.IS_WINDOWS and self.embed_clockservice:
             self.die("-B option does not work on Windows.  "
                      "Please run celerybeat as a separate service.")
-        self.schedule = schedule or conf.CELERYBEAT_SCHEDULE_FILENAME
-        self.scheduler_cls = scheduler_cls or conf.CELERYBEAT_SCHEDULER
-        self.events = events if events is not None else conf.CELERY_SEND_EVENTS
-        self.task_time_limit = (task_time_limit or
-                                conf.CELERYD_TASK_TIME_LIMIT)
-        self.task_soft_time_limit = (task_soft_time_limit or
-                                     conf.CELERYD_TASK_SOFT_TIME_LIMIT)
-        self.max_tasks_per_child = (max_tasks_per_child or
-                                    conf.CELERYD_MAX_TASKS_PER_CHILD)
-        self.redirect_stdouts = (redirect_stdouts or
-                                 conf.CELERY_REDIRECT_STDOUTS)
-        self.redirect_stdouts_level = (redirect_stdouts_level or
-                                       conf.CELERY_REDIRECT_STDOUTS_LEVEL)
-        self.pool = pool or conf.CELERYD_POOL
-        self.db = db
         self.use_queues = [] if queues is None else queues
         self.queues = None
         self.include = [] if include is None else include
@@ -220,8 +205,8 @@ class Worker(object):
             "concurrency": concurrency,
             "loglevel": LOG_LEVELS[self.loglevel],
             "logfile": self.logfile or "[stderr]",
-            "celerybeat": "ON" if self.run_clockservice else "OFF",
-            "events": "ON" if self.events else "OFF",
+            "celerybeat": "ON" if self.embed_clockservice else "OFF",
+            "events": "ON" if self.send_events else "OFF",
             "loader": qualname(self.loader),
             "queues": app.amqp.queues.format(indent=18, indent_first=False),
         }
@@ -231,21 +216,11 @@ class Worker(object):
             pidlock = platforms.create_pidlock(self.pidfile).acquire()
             atexit.register(pidlock.release)
         worker = self.WorkController(app=self.app,
-                                concurrency=self.concurrency,
-                                loglevel=self.loglevel,
-                                logfile=self.logfile,
-                                hostname=self.hostname,
-                                ready_callback=self.on_consumer_ready,
-                                embed_clockservice=self.run_clockservice,
-                                schedule_filename=self.schedule,
-                                scheduler_cls=self.scheduler_cls,
-                                send_events=self.events,
-                                db=self.db,
-                                max_tasks_per_child=self.max_tasks_per_child,
-                                task_time_limit=self.task_time_limit,
-                                task_soft_time_limit=self.task_soft_time_limit,
-                                autoscale=self.autoscale,
-                                pool_cls=self.pool)
+                                    hostname=self.hostname,
+                                    ready_callback=self.on_consumer_ready,
+                                    embed_clockservice=self.embed_clockservice,
+                                    autoscale=self.autoscale,
+                                    **self.confopts_as_dict())
         self.install_platform_tweaks(worker)
         worker.start()
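
Because ``Worker`` lists ``WorkController`` in ``inherit_confopts``, its
``__confopts__`` is the union of both key maps, so the single
``**self.confopts_as_dict()`` above replaces the old option-by-option
plumbing.  A rough check (assumes an importable, configured default app):

    from celery.apps.worker import Worker

    w = Worker()
    opts = w.confopts_as_dict()
    # Contains Worker's own keys (loglevel, redirect_stdouts, ...) plus
    # those inherited from WorkController (concurrency, logfile, pool_cls,
    # task_time_limit, ...).
    print sorted(opts)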
 

+ 7 - 7
celery/bin/celeryd.py

@@ -106,7 +106,7 @@ class WorkerCommand(Command):
                 help="Number of worker threads/processes"),
             Option('-P', '--pool',
                 default=conf.CELERYD_POOL,
-                action="store", dest="pool", type="str",
+                action="store", dest="pool_cls", type="str",
                 help="Pool implementation: "
                      "processes (default), eventlet, gevent, "
                      "solo or threads."),
@@ -125,13 +125,13 @@ class WorkerCommand(Command):
                 action="store", dest="hostname",
                 help="Set custom host name. E.g. 'foo.example.com'."),
             Option('-B', '--beat', default=False,
-                action="store_true", dest="run_clockservice",
+                action="store_true", dest="embed_clockservice",
                 help="Also run the celerybeat periodic task scheduler. "
                      "NOTE: Only one instance of celerybeat must be"
                      "running at any one time."),
             Option('-s', '--schedule',
                 default=conf.CELERYBEAT_SCHEDULE_FILENAME,
-                action="store", dest="schedule",
+                action="store", dest="schedule_filename",
                 help="Path to the schedule database if running with the -B "
                      "option. The extension '.db' will be appended to the "
                     "filename. Default: %s" % (
@@ -142,12 +142,12 @@ class WorkerCommand(Command):
                 help="Scheduler class. Default is "
                      "celery.beat:PersistentScheduler"),
             Option('-S', '--statedb', default=conf.CELERYD_STATE_DB,
-                action="store", dest="db",
+                action="store", dest="state_db",
                 help="Path to the state database. The extension '.db' will "
                      "be appended to the filename. Default: %s" % (
                         conf.CELERYD_STATE_DB, )),
             Option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
-                action="store_true", dest="events",
+                action="store_true", dest="send_events",
                 help="Send events so the worker can be monitored by "
                      "celeryev, celerymon and other monitors.."),
             Option('--time-limit',
@@ -172,11 +172,11 @@ class WorkerCommand(Command):
                 action="store", dest="include",
                 help="Comma separated list of additional modules to import. "
                  "Example: -I foo.tasks,bar.tasks"),
-            Option('--pidfile', default=None,
+            Option('--pidfile', dest="pidfile", default=None,
                 help="Optional file used to store the workers pid. "
                      "The worker will not start if this file already exists "
                      "and the pid is still alive."),
-            Option('--autoscale', default=None,
+            Option('--autoscale', dest="autoscale", default=None,
                 help="Enable autoscaling by providing "
                      "max_concurrency,min_concurrency. Example: "
                      "--autoscale=10,3 (always keep 3 processes, "

+ 28 - 79
celery/worker/__init__.py

@@ -24,6 +24,7 @@ from .. import beat
 from .. import concurrency as _concurrency
 from .. import registry, platforms, signals
 from ..app import app_or_default
+from ..app.abstract import configurated, from_config
 from ..exceptions import SystemTerminate
 from ..log import SilenceRepeated
 from ..utils import noop, instantiate
@@ -67,111 +68,59 @@ def process_initializer(app, hostname):
     signals.worker_process_init.send(sender=None)
 
 
-class WorkController(object):
+class WorkController(configurated):
     """Unmanaged worker instance."""
     RUN = RUN
     CLOSE = CLOSE
     TERMINATE = TERMINATE
 
-    #: The number of simultaneous processes doing work (default:
-    #: :setting:`CELERYD_CONCURRENCY`)
-    concurrency = None
-
-    #: The loglevel used (default: :const:`logging.INFO`)
+    concurrency = from_config()
     loglevel = logging.ERROR
-
-    #: The logfile used, if no logfile is specified it uses `stderr`
-    #: (default: :setting:`CELERYD_LOG_FILE`).
-    logfile = None
-
-    #: If :const:`True`, celerybeat is embedded, running in the main worker
-    #: process as a thread.
-    embed_clockservice = None
-
-    #: Enable the sending of monitoring events, these events can be captured
-    #: by monitors (celerymon).
-    send_events = False
-
-    #: The :class:`logging.Logger` instance used for logging.
-    logger = None
-
-    #: The pool instance used.
-    pool = None
-
-    #: The internal queue object that holds tasks ready for immediate
-    #: processing.
-    ready_queue = None
-
-    #: Instance of :class:`celery.worker.mediator.Mediator`.
-    mediator = None
-
-    #: Consumer instance.
-    consumer = None
+    logfile = from_config("log_file")
+    send_events = from_config()
+    pool_cls = from_config("pool")
+    consumer_cls = from_config("consumer")
+    mediator_cls = from_config("mediator")
+    eta_scheduler_cls = from_config("eta_scheduler")
+    eta_scheduler_precision = from_config()
+    autoscaler_cls = from_config("autoscaler")
+    schedule_filename = from_config()
+    scheduler_cls = from_config("celerybeat_scheduler")
+    task_time_limit = from_config()
+    task_soft_time_limit = from_config()
+    max_tasks_per_child = from_config()
+    pool_putlocks = from_config()
+    prefetch_multiplier = from_config()
+    state_db = from_config()
+    disable_rate_limits = from_config()
 
     _state = None
     _running = 0
 
-    def __init__(self, concurrency=None, logfile=None, loglevel=None,
-            send_events=None, hostname=None, ready_callback=noop,
-            embed_clockservice=False, pool_cls=None, consumer_cls=None,
-            mediator_cls=None, eta_scheduler_cls=None,
-            schedule_filename=None, task_time_limit=None,
-            task_soft_time_limit=None, max_tasks_per_child=None,
-            pool_putlocks=None, db=None, prefetch_multiplier=None,
-            eta_scheduler_precision=None, disable_rate_limits=None,
-            autoscale=None, autoscaler_cls=None, scheduler_cls=None,
-            queues=None, app=None):
-
+    def __init__(self, loglevel=None, hostname=None, logger=None,
+            ready_callback=noop, embed_clockservice=False, autoscale=None,
+            queues=None, app=None, **kwargs):
         self.app = app_or_default(app)
         conf = self.app.conf
         self._shutdown_complete = threading.Event()
+        self.setup_defaults(kwargs, namespace="celeryd")
         self.app.select_queues(queues)  # select queues subset.
 
         # Options
+        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
+        self.autoscale = autoscale
         self.loglevel = loglevel or self.loglevel
-        self.concurrency = concurrency or conf.CELERYD_CONCURRENCY
-        self.logfile = logfile or conf.CELERYD_LOG_FILE
         self.logger = self.app.log.get_default_logger()
-        if send_events is None:
-            send_events = conf.CELERY_SEND_EVENTS
-        self.send_events = send_events
-        self.pool_cls = _concurrency.get_implementation(
-                            pool_cls or conf.CELERYD_POOL)
-        self.consumer_cls = consumer_cls or conf.CELERYD_CONSUMER
-        self.mediator_cls = mediator_cls or conf.CELERYD_MEDIATOR
-        self.eta_scheduler_cls = eta_scheduler_cls or \
-                                    conf.CELERYD_ETA_SCHEDULER
-
-        self.autoscaler_cls = autoscaler_cls or \
-                                    conf.CELERYD_AUTOSCALER
-        self.schedule_filename = schedule_filename or \
-                                    conf.CELERYBEAT_SCHEDULE_FILENAME
-        self.scheduler_cls = scheduler_cls or conf.CELERYBEAT_SCHEDULER
         self.hostname = hostname or socket.gethostname()
         self.embed_clockservice = embed_clockservice
         self.ready_callback = ready_callback
-        self.task_time_limit = task_time_limit or \
-                                conf.CELERYD_TASK_TIME_LIMIT
-        self.task_soft_time_limit = task_soft_time_limit or \
-                                conf.CELERYD_TASK_SOFT_TIME_LIMIT
-        self.max_tasks_per_child = max_tasks_per_child or \
-                                conf.CELERYD_MAX_TASKS_PER_CHILD
-        self.pool_putlocks = pool_putlocks or \
-                                conf.CELERYD_POOL_PUTLOCKS
-        self.eta_scheduler_precision = eta_scheduler_precision or \
-                                conf.CELERYD_ETA_SCHEDULER_PRECISION
-        self.prefetch_multiplier = prefetch_multiplier or \
-                                conf.CELERYD_PREFETCH_MULTIPLIER
         self.timer_debug = SilenceRepeated(self.logger.debug,
                                            max_iterations=10)
-        self.db = db or conf.CELERYD_STATE_DB
-        self.disable_rate_limits = disable_rate_limits or \
-                                conf.CELERY_DISABLE_RATE_LIMITS
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self._finalize_db = None
 
-        if self.db:
-            self._persistence = state.Persistent(self.db)
+        if self.state_db:
+            self._persistence = state.Persistent(self.state_db)
             atexit.register(self._persistence.save)
 
         # Queues
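
Note that ``pool_cls`` arrives from config as a short alias (per the
``--pool`` help above) and is upgraded to the actual pool class during
``__init__``; a small sketch:

    from celery import concurrency as _concurrency

    # Aliases per the --pool help text: processes (default), eventlet,
    # gevent, solo or threads.
    pool_cls = _concurrency.get_implementation("processes")
    print pool_cls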