Browse Source

BaseApp/BaseLoader: Set config keys from cmdline on the App, not in the original dict.

Ask Solem 14 years ago
parent
commit
bbdad03c46
2 changed files with 112 additions and 38 deletions
  1. 112 34
      celery/app/base.py
  2. 0 4
      celery/loaders/base.py

+ 112 - 34
celery/app/base.py

@@ -17,6 +17,16 @@ class MultiDictView(AttributeDictMixin):
     * When getting a key, the dicts are searched in order.
     * When setting a key, the key is added to the first dict.
 
+    >>> d1 = {"x": 3"}
+    >>> d2 = {"x": 1, "y": 2, "z": 3}
+    >>> x = MultiDictView([d1, d2])
+
+    >>> x["x"]
+    3
+
+    >>>  x["y"]
+    2
+
     """
     dicts = None
 
@@ -64,29 +74,63 @@ class MultiDictView(AttributeDictMixin):
 
 
 class BaseApp(object):
-    _amqp = None
-    _backend = None
-    _conf = None
-    _control = None
-    _loader = None
-    _log = None
-
-    def __init__(self, loader=None, backend_cls=None):
+    """Base class for apps."""
+
+    def __init__(self, loader=None, backend=None, set_as_current=True):
         self.loader_cls = loader or "app"
-        self.backend_cls = backend_cls
+        self.backend_cls = backend
+        self._amqp = None
+        self._backend = None
+        self._conf = None
+        self._control = None
+        self._loader = None
+        self._log = None
+        self.set_as_current = set_as_current
+        self.on_init()
+
+    def on_init(self):
+        """Called at the end of the constructor."""
+        pass
 
     def config_from_object(self, obj, silent=False):
+        """Read configuration from object, where object is either
+        a real object, or the name of an object to import.
+
+            >>> celery.config_from_object("myapp.celeryconfig")
+
+            >>> from myapp import celeryconfig
+            >>> celery.config_from_object(celeryconfig)
+
+        """
         self._conf = None
         return self.loader.config_from_object(obj, silent=silent)
 
     def config_from_envvar(self, variable_name, silent=False):
+        """Read configuration from environment variable.
+
+        The value of the environment variable must be the name
+        of an object to import.
+
+            >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
+            >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
+
+        """
         self._conf = None
         return self.loader.config_from_envvar(variable_name, silent=silent)
 
     def config_from_cmdline(self, argv, namespace="celery"):
-        return self.loader.config_from_cmdline(argv, namespace)
+        """Read configuration from argv.
+
+        The config
+
+        """
+        config = self.loader.cmdline_config_parser(argv, namespace)
+        for key, value in config.items():
+            self.conf[key] = value
 
     def either(self, default_key, *values):
+        """Fallback to the value of a configuration key if none of the
+        ``*values`` are true."""
         for value in values:
             if value is not None:
                 return value
@@ -101,24 +145,15 @@ class BaseApp(object):
                 b[key] = value
         return b
 
-    def AsyncResult(self, task_id, backend=None):
-        from celery.result import BaseAsyncResult
-        return BaseAsyncResult(task_id, app=self,
-                               backend=backend or self.backend)
-
-    def TaskSetResult(self, taskset_id, results, **kwargs):
-        from celery.result import TaskSetResult
-        return TaskSetResult(taskset_id, results, app=self)
-
     def send_task(self, name, args=None, kwargs=None, countdown=None,
             eta=None, task_id=None, publisher=None, connection=None,
             connect_timeout=None, result_cls=None, expires=None,
             **options):
         """Send task by name.
 
-        Useful if you don't have access to the task class.
-
-        :param name: Name of task to execute.
+        :param name: Name of task to execute (e.g. ``"tasks.add"``).
+        :keyword result_cls: Specify custom result class. Default is
+            using :meth:`AsyncResult`.
 
         Supports the same arguments as
         :meth:`~celery.task.base.BaseTask.apply_async`.
@@ -148,7 +183,22 @@ class BaseApp(object):
     def broker_connection(self, hostname=None, userid=None,
             password=None, virtual_host=None, port=None, ssl=None,
             insist=None, connect_timeout=None, backend_cls=None):
-        """Establish a connection to the message broker."""
+        """Establish a connection to the message broker.
+
+        :keyword hostname: defaults to the ``BROKER_HOST`` setting.
+        :keyword userid: defaults to the ``BROKER_USER`` setting.
+        :keyword password: defaults to the ``BROKER_PASSWORD`` setting.
+        :keyword virtual_host: defaults to the ``BROKER_VHOST`` setting.
+        :keyword port: defaults to the ``BROKER_PORT`` setting.
+        :keyword ssl: defaults to the ``BROKER_USE_SSL`` setting.
+        :keyword insist: defaults to the ``BROKER_INSIST`` setting.
+        :keyword connect_timeout: defaults to the
+            ``BROKER_CONNECTION_TIMEOUT`` setting.
+        :keyword backend_cls: defaults to the ``BROKER_BACKEND`` setting.
+
+        :returns :class:`carrot.connection.BrokerConnection`:
+
+        """
         return self.amqp.BrokerConnection(
                     hostname or self.conf.BROKER_HOST,
                     userid or self.conf.BROKER_USER,
@@ -162,6 +212,14 @@ class BaseApp(object):
                                 "BROKER_CONNECTION_TIMEOUT", connect_timeout))
 
     def with_default_connection(self, fun):
+        """With any function accepting ``connection`` and ``connect_timeout``
+        keyword arguments, establishes a default connection if one is
+        not already passed to it.
+
+        Any automatically established connection will be closed after
+        the function returns.
+
+        """
 
         @wraps(fun)
         def _inner(*args, **kwargs):
@@ -178,6 +236,7 @@ class BaseApp(object):
         return _inner
 
     def pre_config_merge(self, c):
+        """Prepare configuration before it is merged with the defaults."""
         if not c.get("CELERY_RESULT_BACKEND"):
             rbackend = c.get("CELERY_BACKEND")
             if rbackend:
@@ -189,11 +248,14 @@ class BaseApp(object):
         return c
 
     def post_config_merge(self, c):
+        """Prepare configuration after it has been merged with the
+        defaults."""
         if not c.get("CELERY_QUEUES"):
-            c["CELERY_QUEUES"] = {c.CELERY_DEFAULT_QUEUE: {
-                "exchange": c.CELERY_DEFAULT_EXCHANGE,
-                "exchange_type": c.CELERY_DEFAULT_EXCHANGE_TYPE,
-                "binding_key": c.CELERY_DEFAULT_ROUTING_KEY}}
+            c["CELERY_QUEUES"] = {
+                c.CELERY_DEFAULT_QUEUE: {
+                    "exchange": c.CELERY_DEFAULT_EXCHANGE,
+                    "exchange_type": c.CELERY_DEFAULT_EXCHANGE_TYPE,
+                    "binding_key": c.CELERY_DEFAULT_ROUTING_KEY}}
         c["CELERY_ROUTES"] = routes.prepare(c.get("CELERY_ROUTES") or {})
         if c.get("CELERYD_LOG_COLOR") is None:
             c["CELERYD_LOG_COLOR"] = not c.CELERYD_LOG_FILE and \
@@ -220,6 +282,27 @@ class BaseApp(object):
                              self.conf.EMAIL_HOST_PASSWORD)
         mailer.send(message, fail_silently=fail_silently)
 
+    def AsyncResult(self, task_id, backend=None):
+        """Create :class:`celery.result.BaseAsyncResult` instance."""
+        from celery.result import BaseAsyncResult
+        return BaseAsyncResult(task_id, app=self,
+                               backend=backend or self.backend)
+
+    def TaskSetResult(self, taskset_id, results, **kwargs):
+        """Create :class:`celery.result.TaskSetResult` instance."""
+        from celery.result import TaskSetResult
+        return TaskSetResult(taskset_id, results, app=self)
+
+    def _get_backend(self):
+        from celery.backends import get_backend_cls
+        backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
+        backend_cls = get_backend_cls(backend_cls, loader=self.loader)
+        return backend_cls(app=self)
+
+    def _get_config(self):
+        return self.post_config_merge(MultiDictView(
+                    self.pre_config_merge(self.loader.conf), DEFAULTS))
+
     @property
     def amqp(self):
         if self._amqp is None:
@@ -230,10 +313,7 @@ class BaseApp(object):
     @property
     def backend(self):
         if self._backend is None:
-            from celery.backends import get_backend_cls
-            backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
-            backend_cls = get_backend_cls(backend_cls, loader=self.loader)
-            self._backend = backend_cls(app=self)
+            self._backend = self._get_backend()
         return self._backend
 
     @property
@@ -246,9 +326,7 @@ class BaseApp(object):
     @property
     def conf(self):
         if self._conf is None:
-            config = self.pre_config_merge(self.loader.conf)
-            self._conf = self.post_config_merge(
-                            MultiDictView(config, DEFAULTS))
+            self._conf = self._get_config()
         return self._conf
 
     @property

+ 0 - 4
celery/loaders/base.py

@@ -62,10 +62,6 @@ class BaseLoader(object):
             self.worker_initialized = True
             self.on_worker_init()
 
-    def config_from_cmdline(self, args, namespace="celery"):
-        for key, value in self.cmdline_config_parser(args, namespace).items():
-            self.conf[key] = value
-
     def cmdline_config_parser(self, args, namespace="celery",
                 re_type=re.compile(r"\((\w+)\)"),
                 extra_types={"json": anyjson.deserialize},