Ask Solem 13 lat temu
rodzic
commit
77ad54d669

+ 15 - 15
celery/app/amqp.py

@@ -19,7 +19,7 @@ from kombu.pools import ProducerPool
 
 from .. import routes as _routes
 from .. import signals
-from ..utils import cached_property, uuid
+from ..utils import cached_property, lpmerge, uuid
 from ..utils import text
 
 #: List of known options to a Kombu producers send method.
@@ -113,17 +113,18 @@ class Queues(dict):
                                  in `wanted` will raise :exc:`KeyError`.
 
         """
-        acc = {}
-        for queue in wanted:
-            try:
-                options = self[queue]
-            except KeyError:
-                if not create_missing:
-                    raise
-                options = self.options(queue, queue)
-            acc[queue] = options
-        self._consume_from = acc
-        self.update(acc)
+        if wanted:
+            acc = {}
+            for queue in wanted:
+                try:
+                    options = self[queue]
+                except KeyError:
+                    if not create_missing:
+                        raise
+                    options = self.options(queue, queue)
+                acc[queue] = options
+            self._consume_from = acc
+            self.update(acc)
 
     @property
     def consume_from(self):
@@ -309,8 +310,7 @@ class AMQP(object):
         default_queue_name, default_queue = self.get_default_queue()
         defaults = dict({"queue": default_queue_name}, **default_queue)
         defaults["routing_key"] = defaults.pop("binding_key", None)
-        return self.Consumer(*args,
-                             **self.app.merge(defaults, kwargs))
+        return self.Consumer(*args, **lpmerge(defaults, kwargs))
 
     def TaskPublisher(self, *args, **kwargs):
         """Returns publisher used to send tasks.
@@ -329,7 +329,7 @@ class AMQP(object):
                     "retry_policy": conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
                     "enable_utc": conf.CELERY_ENABLE_UTC,
                     "app": self.app}
-        return TaskPublisher(*args, **self.app.merge(defaults, kwargs))
+        return TaskPublisher(*args, **lpmerge(defaults, kwargs))
 
     def get_task_consumer(self, connection, queues=None, **kwargs):
         """Return consumer configured to consume from all known task

+ 47 - 76
celery/app/base.py

@@ -12,25 +12,31 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
+import celery
+import kombu
 import os
-import warnings
 import platform as _platform
+import warnings
 
 from collections import deque
 from contextlib import contextmanager
 from copy import deepcopy
 from functools import wraps
-from pprint import pformat
 
 from kombu.clocks import LamportClock
 
 from .. import datastructures
 from .. import platforms
+from ..backends import get_backend_cls
 from ..exceptions import AlwaysEagerIgnored
+from ..loaders import get_loader_cls
 from ..local import maybe_evaluate
-from ..utils import cached_property, lpmerge
+from ..utils import cached_property, lpmerge, register_after_fork
+from ..utils.functional import first
 from ..utils.imports import instantiate, qualname
+from ..utils.text import pretty
 
+from .builtins import load_builtin_tasks
 from .defaults import DEFAULTS, find_deprecated_settings, find
 
 import kombu
@@ -40,8 +46,8 @@ if kombu.VERSION < (2, 0):
 SETTINGS_INFO = """%s %s"""
 
 BUGREPORT_INFO = """
-platform -> system:%(system)s arch:%(arch)s imp:%(py_i)s
 software -> celery:%(celery_v)s kombu:%(kombu_v)s py:%(py_v)s
+platform -> system:%(system)s arch:%(arch)s imp:%(py_i)s
 loader   -> %(loader)s
 settings -> transport:%(transport)s results:%(results)s
 
@@ -55,15 +61,14 @@ class Settings(datastructures.ConfigurationView):
     @property
     def CELERY_RESULT_BACKEND(self):
         """Resolves deprecated alias ``CELERY_BACKEND``."""
-        return self.get("CELERY_RESULT_BACKEND") or self.get("CELERY_BACKEND")
+        return self.first("CELERY_RESULT_BACKEND", "CELERY_BACKEND")
 
     @property
     def BROKER_TRANSPORT(self):
         """Resolves compat aliases :setting:`BROKER_BACKEND`
         and :setting:`CARROT_BACKEND`."""
-        return (self.get("BROKER_TRANSPORT") or
-                self.get("BROKER_BACKEND") or
-                self.get("CARROT_BACKEND"))
+        return self.first("BROKER_TRANSPORT",
+                          "BROKER_BACKEND", "CARROT_BACKEND")
 
     @property
     def BROKER_BACKEND(self):
@@ -73,8 +78,14 @@ class Settings(datastructures.ConfigurationView):
     @property
     def BROKER_HOST(self):
         return (os.environ.get("CELERY_BROKER_URL") or
-                self.get("BROKER_URL") or
-                self.get("BROKER_HOST"))
+                self.first("BROKER_URL", "BROKER_HOST"))
+
+    def without_defaults(self):
+        # the last stash is the default settings, so just skip that
+        return Settings({}, self._order[:-1])
+
+    def find_value_for_key(self, name, namespace="celery"):
+        return self.get_by_parts(*self.find_option(name, namespace)[:-1])
 
     def find_option(self, name, namespace="celery"):
         return find(name, namespace)
@@ -82,16 +93,16 @@ class Settings(datastructures.ConfigurationView):
     def get_by_parts(self, *parts):
         return self["_".join(filter(None, parts))]
 
-    def find_value_for_key(self, name, namespace="celery"):
-        ns, key, _ = self.find_option(name, namespace=namespace)
-        return self.get_by_parts(ns, key)
+    def humanize(self):
+        return "\n".join(SETTINGS_INFO % (key + ':', pretty(value, width=50))
+                    for key, value in self.without_defaults().iteritems())
+
 
 
 class BaseApp(object):
     """Base class for apps."""
     SYSTEM = platforms.SYSTEM
-    IS_OSX = platforms.IS_OSX
-    IS_WINDOWS = platforms.IS_WINDOWS
+    IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
 
     amqp_cls = "celery.app.amqp:AMQP"
     backend_cls = None
@@ -100,13 +111,13 @@ class BaseApp(object):
     log_cls = "celery.app.log:Logging"
     control_cls = "celery.app.control:Control"
     registry_cls = "celery.app.registry:TaskRegistry"
-
     _pool = None
 
     def __init__(self, main=None, loader=None, backend=None,
             amqp=None, events=None, log=None, control=None,
             set_as_current=True, accept_magic_kwargs=False,
             tasks=None, **kwargs):
+        self.clock = LamportClock()
         self.main = main
         self.amqp_cls = amqp or self.amqp_cls
         self.backend_cls = backend or self.backend_cls
@@ -115,13 +126,12 @@ class BaseApp(object):
         self.log_cls = log or self.log_cls
         self.control_cls = control or self.control_cls
         self.set_as_current = set_as_current
-        self.accept_magic_kwargs = accept_magic_kwargs
-        self.clock = LamportClock()
         self.registry_cls = self.registry_cls if tasks is None else tasks
-        self._tasks = instantiate(self.registry_cls)
+        self.accept_magic_kwargs = accept_magic_kwargs
 
-        self._pending = deque()
         self.finalized = False
+        self._pending = deque()
+        self._tasks = instantiate(self.registry_cls)
 
         self.on_init()
 
@@ -131,6 +141,8 @@ class BaseApp(object):
 
     def finalize(self):
         if not self.finalized:
+            load_builtin_tasks(self)
+
             pending = self._pending
             while pending:
                 maybe_evaluate(pending.pop())
@@ -163,11 +175,7 @@ class BaseApp(object):
         return self.loader.config_from_envvar(variable_name, silent=silent)
 
     def config_from_cmdline(self, argv, namespace="celery"):
-        """Read configuration from argv.
-
-        The config
-
-        """
+        """Read configuration from argv."""
         self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
 
     def send_task(self, name, args=None, kwargs=None, countdown=None,
@@ -212,9 +220,9 @@ class BaseApp(object):
 
     def AsyncResult(self, task_id, backend=None, task_name=None):
         """Create :class:`celery.result.BaseAsyncResult` instance."""
-        from ..result import BaseAsyncResult
-        return BaseAsyncResult(task_id, app=self, task_name=task_name,
-                               backend=backend or self.backend)
+        from ..result import AsyncResult
+        return AsyncResult(task_id, app=self, task_name=task_name,
+                           backend=backend or self.backend)
 
     def TaskSetResult(self, taskset_id, results, **kwargs):
         """Create :class:`celery.result.TaskSetResult` instance."""
@@ -295,8 +303,7 @@ class BaseApp(object):
 
     def prepare_config(self, c):
         """Prepare configuration before it is merged with the defaults."""
-        find_deprecated_settings(c)
-        return c
+        return find_deprecated_settings(c)
 
     def now(self):
         return self.loader.now()
@@ -316,25 +323,15 @@ class BaseApp(object):
                                        use_tls=self.conf.EMAIL_USE_TLS)
 
     def select_queues(self, queues=None):
-        if queues:
-            return self.amqp.queues.select_subset(queues,
-                                    self.conf.CELERY_CREATE_MISSING_QUEUES)
+        return self.amqp.queues.select_subset(queues,
+                                self.conf.CELERY_CREATE_MISSING_QUEUES)
 
     def either(self, default_key, *values):
         """Fallback to the value of a configuration key if none of the
         `*values` are true."""
-        for value in values:
-            if value is not None:
-                return value
-        return self.conf.get(default_key)
-
-    def merge(self, l, r):
-        """Like `dict(a, **b)` except it will keep values from `a`
-        if the value in `b` is :const:`None`."""
-        return lpmerge(l, r)
+        return first(None, values) or self.conf.get(default_key)
 
     def _get_backend(self):
-        from ..backends import get_backend_cls
         return get_backend_cls(
                     self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
                     loader=self.loader)(app=self)
@@ -349,8 +346,6 @@ class BaseApp(object):
             self._pool = None
 
     def bugreport(self):
-        import celery
-        import kombu
         return BUGREPORT_INFO % {"system": _platform.system(),
                                  "arch": _platform.architecture(),
                                  "py_i": platforms.pyimplementation(),
@@ -359,39 +354,13 @@ class BaseApp(object):
                                  "py_v": _platform.python_version(),
                                  "transport": self.conf.BROKER_TRANSPORT,
                                  "results": self.conf.CELERY_RESULT_BACKEND,
-                                 "human_settings": self.human_settings(),
+                                 "human_settings": self.conf.humanize(),
                                  "loader": qualname(self.loader.__class__)}
 
-    def _pformat(self, value, width=80, nl_width=80, **kw):
-
-        if isinstance(value, dict):
-            return "{\n %s" % (pformat(value, 4, nl_width)[1:])
-        elif isinstance(value, tuple):
-            return "\n%s%s" % (' ' * 4,
-                                pformat(value, width=nl_width, **kw))
-        else:
-            return pformat(value, width=width, **kw)
-
-    def human_settings(self):
-        return "\n".join(SETTINGS_INFO % (key + ':',
-                                          self._pformat(value, width=50))
-                    for key, value in self.filter_user_settings().iteritems())
-
-    def filter_user_settings(self):
-        user_settings = {}
-        # the last stash is the default settings, so just skip that
-        for stash in self.conf._order[:-1]:
-            user_settings.update(stash)
-        return user_settings
-
     @property
     def pool(self):
         if self._pool is None:
-            try:
-                from multiprocessing.util import register_after_fork
-                register_after_fork(self, self._after_fork)
-            except ImportError:
-                pass
+            register_after_fork(self, self._after_fork)
             self._pool = self.broker_connection().Pool(
                             limit=self.conf.BROKER_POOL_LIMIT)
         return self._pool
@@ -426,7 +395,6 @@ class BaseApp(object):
     @cached_property
     def loader(self):
         """Current loader."""
-        from ..loaders import get_loader_cls
         return get_loader_cls(self.loader_cls)(app=self)
 
     @cached_property
@@ -436,7 +404,10 @@ class BaseApp(object):
 
     @cached_property
     def tasks(self):
-        from .task.builtins import load_builtins
-        load_builtins(self)
+        """Registry of available tasks.
+
+        Accessing this attribute will also finalize the app.
+
+        """
         self.finalize()
         return self._tasks

+ 9 - 15
celery/app/task/builtins.py → celery/app/builtins.py

@@ -1,11 +1,11 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
-from ...utils import uuid
+from ..utils import uuid
 
 #: global list of functions defining a built-in task.
 #: these are called for every app instance to setup built-in task.
-_builtins = []
+_builtin_tasks = []
 
 
 def builtin_task(constructor):
@@ -15,14 +15,13 @@ def builtin_task(constructor):
     The function will then be called for every new app instance created
     (lazily, so more exactly when the task registry for that app is needed).
     """
-    _builtins.append(constructor)
+    _builtin_tasks.append(constructor)
     return constructor
 
 
-def load_builtins(app):
+def load_builtin_tasks(app):
     """Loads the built-in tasks for an app instance."""
-    for constructor in _builtins:
-        constructor(app)
+    [constructor(app) for constructor in _builtin_tasks]
 
 
 @builtin_task
@@ -39,11 +38,7 @@ def add_backend_cleanup_task(app):
     may even clean up in realtime so that a periodic cleanup is not necessary.
 
     """
-    @app.task(name="celery.backend_cleanup")
-    def backend_cleanup():
-        app.backend.cleanup()
-
-    return backend_cleanup
+    return app.task(name="celery.backend_cleanup")(app.backend.cleanup)
 
 
 @builtin_task
@@ -54,13 +49,12 @@ def add_unlock_chord_task(app):
     It creates a task chain polling the header for completion.
 
     """
+    from ..result import AsyncResult, TaskSetResult
+    from ..task.sets import subtask
 
     @app.task(name="celery.chord_unlock", max_retries=None)
     def unlock_chord(setid, callback, interval=1, propagate=False,
             max_retries=None, result=None):
-        from ...result import AsyncResult, TaskSetResult
-        from ...task.sets import subtask
-
         result = TaskSetResult(setid, map(AsyncResult, result))
         j = result.join_native if result.supports_native_join else result.join
         if result.ready():
@@ -76,11 +70,11 @@ def add_chord_task(app):
     """Every chord is executed in a dedicated task, so that the chord
     can be used as a subtask, and this generates the task
     responsible for that."""
+    from ..task.sets import TaskSet
 
     @app.task(name="celery.chord", accept_magic_kwargs=False)
     def chord(set, body, interval=1, max_retries=None,
             propagate=False, **kwargs):
-        from ...task.sets import TaskSet
 
         if not isinstance(set, TaskSet):
             set = TaskSet(set)

+ 1 - 0
celery/app/defaults.py

@@ -233,6 +233,7 @@ def find_deprecated_settings(source):
                             deprecation=opt.deprecate_by,
                             removal=opt.remove_by,
                             alternative=opt.alt)
+    return source
 
 
 @memoize(maxsize=None)

+ 4 - 1
celery/datastructures.py

@@ -21,7 +21,7 @@ from itertools import chain
 
 from kombu.utils.limits import TokenBucket  # noqa
 
-from .utils.functional import LRUCache, uniq  # noqa
+from .utils.functional import LRUCache, first, uniq  # noqa
 
 
 class CycleError(Exception):
@@ -305,6 +305,9 @@ class ConfigurationView(AttributeDictMixin):
     def __setitem__(self, key, value):
         self.changes[key] = value
 
+    def first(self, *keys):
+        return first(None, (self.get(key) for key in keys))
+
     def get(self, key, default=None):
         try:
             return self[key]

+ 2 - 2
celery/local.py

@@ -14,14 +14,14 @@
 from __future__ import absolute_import
 
 
-def try_import(module):
+def try_import(module, default=None):
     """Try to import and return module, or return
     None if the module does not exist."""
     from importlib import import_module
     try:
         return import_module(module)
     except ImportError:
-        pass
+        return default
 
 
 class Proxy(object):

+ 7 - 5
celery/utils/__init__.py

@@ -25,7 +25,11 @@ from pprint import pprint
 from ..exceptions import CPendingDeprecationWarning, CDeprecationWarning
 from .compat import StringIO
 
-from .imports import qualname
+from .imports import symbol_by_name, qualname
+from .functional import noop
+
+register_after_fork = symbol_by_name(
+    "multiprocessing.util.register_after_fork", default=noop)
 
 PENDING_DEPRECATION_FMT = """
     %(description)s is scheduled for deprecation in \
@@ -165,10 +169,8 @@ def maybe_reraise():
 # - XXX Compat
 from .log import LOG_LEVELS     # noqa
 from .imports import (          # noqa
-        qualname as get_full_cls_name,
-        symbol_by_name as gete_cls_by_name,
-        instantiate,
-        import_from_cwd
+        qualname as get_full_cls_name, symbol_by_name as get_cls_by_name,
+        instantiate, import_from_cwd
 )
 from .functional import chunks, noop            # noqa
 from kombu.utils import cached_property, uuid   # noqa

+ 5 - 2
celery/utils/functional.py

@@ -12,7 +12,9 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
-from functools import wraps
+import operator
+
+from functools import partial, wraps
 from itertools import islice
 from threading import Lock, RLock
 
@@ -27,6 +29,7 @@ from kombu.utils.functional import promise, maybe_promise
 from .compat import UserDict, OrderedDict
 
 KEYWORD_MARK = object()
+is_not_None = partial(operator.is_not, None)
 
 
 class LRUCache(UserDict):
@@ -169,7 +172,7 @@ def noop(*args, **kwargs):
 def first(predicate, iterable):
     """Returns the first element in `iterable` that `predicate` returns a
     :const:`True` value for."""
-    # XXX This function is not used anymore
+    predicate = predicate or is_not_None
     for item in iterable:
         if predicate(item):
             return item

+ 11 - 6
celery/utils/imports.py

@@ -30,7 +30,7 @@ else:
 
 
 def symbol_by_name(name, aliases={}, imp=None, package=None,
-        sep='.', **kwargs):
+        sep='.', default=None, **kwargs):
     """Get symbol by qualified name.
 
     The name should be the full dot-separated path to the class::
@@ -76,11 +76,16 @@ def symbol_by_name(name, aliases={}, imp=None, package=None,
     if not module_name and package:
         module_name = package
     try:
-        module = imp(module_name, package=package, **kwargs)
-    except ValueError, exc:
-        raise ValueError, ValueError(
-                "Couldn't import %r: %s" % (name, exc)), sys.exc_info()[2]
-    return getattr(module, cls_name)
+        try:
+            module = imp(module_name, package=package, **kwargs)
+        except ValueError, exc:
+            raise ValueError, ValueError(
+                    "Couldn't import %r: %s" % (name, exc)), sys.exc_info()[2]
+        return getattr(module, cls_name)
+    except (ImportError, AttributeError):
+        if default is None:
+            raise
+    return default
 
 
 def instantiate(name, *args, **kwargs):

+ 11 - 0
celery/utils/text.py

@@ -1,5 +1,7 @@
 from __future__ import absolute_import
 
+from pprint import pformat
+
 
 def abbr(S, max, ellipsis="..."):
     if S is None:
@@ -35,3 +37,12 @@ def pluralize(n, text, suffix='s'):
     if n > 1:
         return text + suffix
     return text
+
+
+def pretty(value, width=80, nl_width=80, **kw):
+    if isinstance(value, dict):
+        return "{\n %s" % (pformat(value, 4, nl_width)[1:])
+    elif isinstance(value, tuple):
+        return "\n%s%s" % (' ' * 4, pformat(value, width=nl_width, **kw))
+    else:
+        return pformat(value, width=width, **kw)

+ 0 - 0
docs/reference/celery.app.task.builtins.rst → docs/reference/celery.app.builtins.rst


+ 1 - 1
docs/reference/index.rst

@@ -10,11 +10,11 @@
 
     celery.app
     celery.app.task
-    celery.app.task.builtins
     celery.app.amqp
     celery.app.defaults
     celery.app.control
     celery.app.registry
+    celery.app.builtins
     celery.task
     celery.task.base
     celery.task.sets