Browse Source

Removes configurated/from_config

Ask Solem 11 years ago
parent
commit
2a1b6b92f3

+ 0 - 65
celery/app/abstract.py

@@ -1,65 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-    celery.app.abstract
-    ~~~~~~~~~~~~~~~~~~~
-
-    Abstract class that takes default attribute values
-    from the configuration.
-
-"""
-from __future__ import absolute_import
-
-from celery.five import items, with_metaclass
-
-
-class from_config(object):
-
-    def __init__(self, key=None):
-        self.key = key
-
-    def get_key(self, attr):
-        return attr if self.key is None else self.key
-
-
-class _configurated(type):
-
-    def __new__(cls, name, bases, attrs):
-        attrs['__confopts__'] = dict((attr, spec.get_key(attr))
-                                     for attr, spec in items(attrs)
-                                     if isinstance(spec, from_config))
-        inherit_from = attrs.get('inherit_confopts', ())
-        for subcls in bases:
-            try:
-                attrs['__confopts__'].update(subcls.__confopts__)
-            except AttributeError:
-                pass
-        for subcls in inherit_from:
-            attrs['__confopts__'].update(subcls.__confopts__)
-        attrs = dict((k, v if not isinstance(v, from_config) else None)
-                     for k, v in items(attrs))
-        return super(_configurated, cls).__new__(cls, name, bases, attrs)
-
-
-@with_metaclass(_configurated)
-class configurated(object):
-
-    def setup_defaults(self, kwargs, namespace='celery'):
-        confopts = self.__confopts__
-        app, find = self.app, self.app.conf.find_value_for_key
-
-        for attr, keyname in items(confopts):
-            try:
-                value = kwargs[attr]
-            except KeyError:
-                value = find(keyname, namespace)
-            else:
-                if value is None:
-                    value = find(keyname, namespace)
-            setattr(self, attr, value)
-
-        for attr_name, attr_value in items(kwargs):
-            if attr_name not in confopts and attr_value is not None:
-                setattr(self, attr_name, attr_value)
-
-    def confopts_as_dict(self):
-        return dict((key, getattr(self, key)) for key in self.__confopts__)

+ 20 - 11
celery/apps/beat.py

@@ -17,7 +17,6 @@ import sys
 
 from celery import VERSION_BANNER, platforms, beat
 from celery.app import app_or_default
-from celery.app.abstract import configurated, from_config
 from celery.utils.imports import qualname
 from celery.utils.log import LOG_LEVELS, get_logger
 from celery.utils.timeutils import humanize_seconds
@@ -35,22 +34,27 @@ Configuration ->
 logger = get_logger('celery.beat')
 
 
-class Beat(configurated):
+class Beat(object):
     Service = beat.Service
-
     app = None
-    loglevel = from_config('log_level')
-    logfile = from_config('log_file')
-    schedule = from_config('schedule_filename')
-    scheduler_cls = from_config('scheduler')
-    redirect_stdouts = from_config()
-    redirect_stdouts_level = from_config()
 
     def __init__(self, max_interval=None, app=None,
-                 socket_timeout=30, pidfile=None, no_color=None, **kwargs):
+                 socket_timeout=30, pidfile=None, no_color=None,
+                 loglevel=None, logfile=None, schedule=None,
+                 scheduler_cls=None, redirect_stdouts=None,
+                 redirect_stdouts_level=None, **kwargs):
         """Starts the beat task scheduler."""
         self.app = app = app_or_default(app or self.app)
-        self.setup_defaults(kwargs, namespace='celerybeat')
+        self.loglevel = self._getopt('log_level', loglevel)
+        self.logfile = self._getopt('log_file', logfile)
+        self.schedule = self._getopt('schedule_filename', schedule)
+        self.scheduler_cls = self._getopt('scheduler', scheduler_cls)
+        self.redirect_stdouts = self._getopt(
+            'redirect_stdouts', redirect_stdouts,
+        )
+        self.redirect_stdouts_level = self._getopt(
+            'redirect_stdouts_level', redirect_stdouts_level,
+        )
 
         self.max_interval = max_interval
         self.socket_timeout = socket_timeout
@@ -64,6 +68,11 @@ class Beat(configurated):
         if not isinstance(self.loglevel, int):
             self.loglevel = LOG_LEVELS[self.loglevel.upper()]
 
+    def _getopt(self, key, value):
+        if value is not None:
+            return value
+        return self.app.conf.find_value_for_key(key, namespace='celerybeat')
+
     def run(self):
         print(str(self.colored.cyan(
             'celery beat v{0} is starting.'.format(VERSION_BANNER))))

+ 14 - 7
celery/apps/worker.py

@@ -24,7 +24,6 @@ from billiard import current_process
 from kombu.utils.encoding import safe_str
 
 from celery import VERSION_BANNER, platforms, signals
-from celery.app.abstract import from_config
 from celery.exceptions import SystemTerminate
 from celery.five import string, string_t
 from celery.loaders.app import AppLoader
@@ -86,19 +85,26 @@ EXTRA_INFO_FMT = """
 
 
 class Worker(WorkController):
-    redirect_stdouts = from_config()
-    redirect_stdouts_level = from_config()
 
-    def on_before_init(self, purge=False, no_color=None, **kwargs):
-        # apply task execution optimizations
+    def on_before_init(self, **kwargs):
         trace.setup_worker_optimizations(self.app)
 
         # this signal can be used to set up configuration for
         # workers by name.
-        conf = self.app.conf
         signals.celeryd_init.send(
-            sender=self.hostname, instance=self, conf=conf,
+            sender=self.hostname, instance=self, conf=self.app.conf,
+        )
+
+    def on_after_init(self, purge=False, no_color=None,
+                      redirect_stdouts=None, redirect_stdouts_level=None,
+                      **kwargs):
+        self.redirect_stdouts = self._getopt(
+            'redirect_stdouts', redirect_stdouts,
+        )
+        self.redirect_stdouts_level = self._getopt(
+            'redirect_stdouts_level', redirect_stdouts_level,
         )
+        super(Worker, self).setup_defaults(**kwargs)
         self.purge = purge
         self.no_color = no_color
         self._isatty = isatty(sys.stdout)
@@ -110,6 +116,7 @@ class Worker(WorkController):
     def on_init_namespace(self):
         self._custom_logging = self.setup_logging()
         # apply task execution optimizations
+        # -- This will finalize the app!
         trace.setup_worker_optimizations(self.app)
 
     def on_start(self):

+ 3 - 2
celery/concurrency/processes.py

@@ -654,8 +654,9 @@ class TaskPool(BasePool):
                     try:
                         proc = fileno_to_inq[ready_fd]
                     except KeyError:
-                        # write was scheduled for this fd but the process has since
-                        # exited and the message must be sent to another process.
+                        # write was scheduled for this fd but the process
+                        # has since exited and the message must be sent to
+                        # another process.
                         return put_message(job)
                     cor = _write_job(proc, ready_fd, job)
                     job._writer = ref(cor)

+ 1 - 1
celery/utils/debug.py

@@ -13,7 +13,7 @@ import os
 from contextlib import contextmanager
 from functools import partial
 
-from celery.five import format_d, range
+from celery.five import range
 from celery.platforms import signals
 
 try:

+ 60 - 25
celery/worker/__init__.py

@@ -26,7 +26,6 @@ from celery import concurrency as _concurrency
 from celery import platforms
 from celery import signals
 from celery.app import app_or_default
-from celery.app.abstract import configurated, from_config
 from celery.exceptions import (
     ImproperlyConfigured, SystemTerminate, TaskRevokedError,
 )
@@ -51,31 +50,9 @@ def default_nodename(hostname):
     return nodename(name or 'celery', host or socket.gethostname())
 
 
-class WorkController(configurated):
+class WorkController(object):
     """Unmanaged worker instance."""
     app = None
-    concurrency = from_config()
-    loglevel = from_config('log_level')
-    logfile = from_config('log_file')
-    send_events = from_config()
-    pool_cls = from_config('pool')
-    consumer_cls = from_config('consumer')
-    timer_cls = from_config('timer')
-    timer_precision = from_config('timer_precision')
-    autoscaler_cls = from_config('autoscaler')
-    autoreloader_cls = from_config('autoreloader')
-    schedule_filename = from_config()
-    scheduler_cls = from_config('celerybeat_scheduler')
-    task_time_limit = from_config()
-    task_soft_time_limit = from_config()
-    max_tasks_per_child = from_config()
-    pool_putlocks = from_config()
-    pool_restarts = from_config()
-    force_execv = from_config()
-    prefetch_multiplier = from_config()
-    state_db = from_config()
-    disable_rate_limits = from_config()
-    worker_lost_wait = from_config()
 
     pidlock = None
     namespace = None
@@ -103,6 +80,8 @@ class WorkController(configurated):
         self.hostname = default_nodename(hostname)
         self.app.loader.init_worker()
         self.on_before_init(**kwargs)
+        self.setup_defaults(**kwargs)
+        self.on_after_init(**kwargs)
 
         self._finalize = [
             Finalize(self, self.stop, exitpriority=1),
@@ -113,7 +92,6 @@ class WorkController(configurated):
     def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
                        include=None, use_eventloop=None, **kwargs):
         self.pidfile = pidfile
-        self.setup_defaults(kwargs, namespace='celeryd')
         self.setup_queues(queues)
         self.setup_includes(include)
 
@@ -154,6 +132,9 @@ class WorkController(configurated):
     def on_before_init(self, **kwargs):
         pass
 
+    def on_after_init(self, **kwargs):
+        pass
+
     def on_start(self):
         if self.pidfile:
             self.pidlock = platforms.create_pidlock(self.pidfile)
@@ -318,3 +299,57 @@ class WorkController(configurated):
     @property
     def state(self):
         return state
+
+    def setup_defaults(self, concurrency=None, loglevel=None, logfile=None,
+                       send_events=None, pool_cls=None, consumer_cls=None,
+                       timer_cls=None, timer_precision=None,
+                       autoscaler_cls=None, autoreloader_cls=None,
+                       pool_putlocks=None, pool_restarts=None,
+                       force_execv=None, state_db=None,
+                       schedule_filename=None, scheduler_cls=None,
+                       task_time_limit=None, task_soft_time_limit=None,
+                       max_tasks_per_child=None, prefetch_multiplier=None,
+                       disable_rate_limits=None, worker_lost_wait=None, **_kw):
+        self.concurrency = self._getopt('concurrency', concurrency)
+        self.loglevel = self._getopt('log_level', loglevel)
+        self.logfile = self._getopt('log_file', logfile)
+        self.send_events = self._getopt('send_events', send_events)
+        self.pool_cls = self._getopt('pool', pool_cls)
+        self.consumer_cls = self._getopt('consumer', consumer_cls)
+        self.timer_cls = self._getopt('timer', timer_cls)
+        self.timer_precision = self._getopt('timer_precision', timer_precision)
+        self.autoscaler_cls = self._getopt('autoscaler', autoscaler_cls)
+        self.autoreloader_cls = self._getopt('autoreloader', autoreloader_cls)
+        self.pool_putlocks = self._getopt('pool_putlocks', pool_putlocks)
+        self.pool_restarts = self._getopt('pool_restarts', pool_restarts)
+        self.force_execv = self._getopt('force_execv', force_execv)
+        self.state_db = self._getopt('state_db', state_db)
+        self.schedule_filename = self._getopt(
+            'schedule_filename', schedule_filename,
+        )
+        self.scheduler_cls = self._getopt(
+            'celerybeat_scheduler', scheduler_cls,
+        )
+        self.task_time_limit = self._getopt(
+            'task_time_limit', task_time_limit,
+        )
+        self.task_soft_time_limit = self._getopt(
+            'task_soft_time_limit', task_soft_time_limit,
+        )
+        self.max_tasks_per_child = self._getopt(
+            'max_tasks_per_child', max_tasks_per_child,
+        )
+        self.prefetch_multiplier = self._getopt(
+            'prefetch_multiplier', prefetch_multiplier,
+        )
+        self.disable_rate_limits = self._getopt(
+            'disable_rate_limits', disable_rate_limits,
+        )
+        self.worker_lost_wait = self._getopt(
+            'worker_lost_wait', worker_lost_wait,
+        )
+
+    def _getopt(self, key, value):
+        if value is not None:
+            return value
+        return self.app.conf.find_value_for_key(key, namespace='celeryd')

+ 0 - 11
docs/internals/reference/celery.app.abstract.rst

@@ -1,11 +0,0 @@
-==========================================
- celery.app.abstract
-==========================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.app.abstract
-
-.. automodule:: celery.app.abstract
-    :members:
-    :undoc-members:

+ 0 - 1
docs/internals/reference/index.rst

@@ -41,7 +41,6 @@
     celery.backends.redis
     celery.backends.cassandra
     celery.task.trace
-    celery.app.abstract
     celery.app.annotations
     celery.app.routes
     celery.security.certificate