Browse Source

Cosmetics

Ask Solem 8 years ago
parent
commit
c259cca413

+ 58 - 34
celery/app/base.py

@@ -58,19 +58,31 @@ __all__ = ['Celery']
 
 logger = get_logger(__name__)
 
-USING_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
 BUILTIN_FIXUPS = {
     'celery.fixups.django:fixup',
 }
+USING_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
 
-ERR_ENVVAR_NOT_SET = """\
+ERR_ENVVAR_NOT_SET = """
 The environment variable {0!r} is not set,
 and as such the configuration could not be loaded.
-Please set this variable and make it point to
-a configuration module."""
+
+Please set this variable and make sure it points to
+a valid configuration module.
+
+Example:
+    {0}="proj.celeryconfig"
+"""
 
 
 def app_has_custom(app, attr):
+    """Returns true if app has customized method `attr`.
+
+    Note:
+        This is used for optimizations in cases where we know
+        how the default behavior works, but need to account
+        for someone using inheritance to override a method/property.
+    """
     return mro_lookup(app.__class__, attr, stop={Celery, object},
                       monkey_patched=[__name__])
 
@@ -82,6 +94,8 @@ def _unpickle_appattr(reverse_name, args):
 
 
 def _after_fork_cleanup_app(app):
+    # This is used with multiprocessing.register_after_fork,
+    # so need to be at module level.
     try:
         app._after_fork()
     except Exception as exc:
@@ -115,27 +129,29 @@ class Celery(object):
         main (str): Name of the main module if running as `__main__`.
             This is used as the prefix for auto-generated task names.
 
+    Keyword Arguments:
         broker (str): URL of the default broker used.
-        loader (str, type): The loader class, or the name of the loader
-            class to use.  Default is :class:`celery.loaders.app.AppLoader`.
-        backend (str, type): The result store backend class, or the name of the
-            backend class to use.  Default is the value of the
-            :setting:`result_backend` setting.
-        amqp (str, type): AMQP object or class name.
-        events (str, type): Events object or class name.
-        log (str, type): Log object or class name.
-        control (str, type): Control object or class name.
+        backend (Union[str, type]): The result store backend class,
+            or the name of the backend class to use.
+
+            Default is the value of the :setting:`result_backend` setting.
+        autofinalize (bool): If set to False a :exc:`RuntimeError`
+            will be raised if the task registry or tasks are used before
+            the app is finalized.
         set_as_current (bool):  Make this the global current app.
-        tasks (str, type): A task registry or the name of a registry class.
         include (List[str]): List of modules every worker should import.
+
+        amqp (Union[str, type]): AMQP object or class name.
+        events (Union[str, type]): Events object or class name.
+        log (Union[str, type]): Log object or class name.
+        control (Union[str, type]): Control object or class name.
+        tasks (Union[str, type]): A task registry, or the name of
+            a registry class.
         fixups (List[str]): List of fix-up plug-ins (e.g., see
             :mod:`celery.fixups.django`).
-        autofinalize (bool): If set to False a :exc:`RuntimeError`
-            will be raised if the task registry or tasks are used before
-            the app is finalized.
-        config_source (str, type): receives a class with class level attributes
-            that allows configurating Celery from a single object.
-            All attributes described in the documentation can be defined.
+        config_source (Union[str, type]): Take configuration from a class,
+            or object.  Attributes may include any setings described in
+            the documentation.
     """
     #: This is deprecated, use :meth:`reduce_keys` instead
     Pickler = AppPickler
@@ -278,12 +294,6 @@ class Celery(object):
             if register_after_fork is not None:
                 register_after_fork(self, _after_fork_cleanup_app)
 
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *exc_info):
-        self.close()
-
     def close(self):
         """Clean up after the application.
 
@@ -511,7 +521,7 @@ class Celery(object):
             if silent:
                 return False
             raise ImproperlyConfigured(
-                ERR_ENVVAR_NOT_SET.format(variable_name))
+                ERR_ENVVAR_NOT_SET.strip().format(variable_name))
         return self.config_from_object(module_name, silent=silent, force=force)
 
     def config_from_cmdline(self, argv, namespace='celery'):
@@ -555,7 +565,7 @@ class Celery(object):
 
         If the name is empty, this will be delegated to fix-ups (e.g., Django).
 
-        For example if you have an directory layout like this:
+        For example if you have a directory layout like this:
 
         .. code-block:: text
 
@@ -821,6 +831,18 @@ class Celery(object):
             self.loader)
         return backend(app=self, url=url)
 
+    def _get_from_conf_and_finalize(self, key):
+        """Get value for configuration key and finalize
+        loading the configuration.
+
+        Note:
+            This is used by PendingConfiguration:
+                as soon as you access a key the configuration
+                is read.
+        """
+        conf = self._conf = self._load_config()
+        return conf[key]
+
     def _load_config(self):
         if isinstance(self.on_configure, Signal):
             self.on_configure.send(sender=self)
@@ -866,7 +888,7 @@ class Celery(object):
     def signature(self, *args, **kwargs):
         """Return a new :class:`~celery.Signature` bound to this app."""
         kwargs['app'] = self
-        return self.canvas.signature(*args, **kwargs)
+        return self._canvas.signature(*args, **kwargs)
 
     def add_periodic_task(self, schedule, sig,
                           args=(), kwargs=(), name=None, **opts):
@@ -941,6 +963,12 @@ class Celery(object):
     def _rgetattr(self, path):
         return attrgetter(path)(self)
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc_info):
+        self.close()
+
     def __repr__(self):
         return '<{0} {1}>'.format(type(self).__name__, appstr(self))
 
@@ -1093,10 +1121,6 @@ class Celery(object):
             self._conf = self._load_config()
         return self._conf
 
-    def _get_from_conf_and_finalize(self, key):
-        conf = self._conf = self._load_config()
-        return conf[key]
-
     @conf.setter
     def conf(self, d):  # noqa
         self._conf = d
@@ -1122,7 +1146,7 @@ class Celery(object):
         return instantiate(self.log_cls, app=self)
 
     @cached_property
-    def canvas(self):
+    def _canvas(self):
         from celery import canvas
         return canvas
 

+ 2 - 2
celery/app/builtins.py

@@ -86,13 +86,13 @@ def add_unlock_chord_task(app):
                 reason = 'Dependency {0.id} raised {1!r}'.format(culprit, exc)
             except StopIteration:
                 reason = repr(exc)
-            logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
+            logger.exception('Chord %r raised: %r', group_id, exc)
             app.backend.chord_error_from_stack(callback, ChordError(reason))
         else:
             try:
                 callback.delay(ret)
             except Exception as exc:
-                logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
+                logger.exception('Chord %r raised: %r', group_id, exc)
                 app.backend.chord_error_from_stack(
                     callback,
                     exc=ChordError('Callback error: {0!r}'.format(exc)),

+ 4 - 5
celery/backends/base.py

@@ -677,7 +677,7 @@ class BaseKeyValueStoreBackend(Backend):
             deps = GroupResult.restore(gid, backend=self)
         except Exception as exc:
             callback = maybe_signature(request.chord, app=app)
-            logger.error('Chord %r raised: %r', gid, exc, exc_info=1)
+            logger.exception('Chord %r raised: %r', gid, exc)
             return self.chord_error_from_stack(
                 callback,
                 ChordError('Cannot restore group: {0!r}'.format(exc)),
@@ -687,8 +687,7 @@ class BaseKeyValueStoreBackend(Backend):
                 raise ValueError(gid)
             except ValueError as exc:
                 callback = maybe_signature(request.chord, app=app)
-                logger.error('Chord callback %r raised: %r', gid, exc,
-                             exc_info=1)
+                logger.exception('Chord callback %r raised: %r', gid, exc)
                 return self.chord_error_from_stack(
                     callback,
                     ChordError('GroupResult {0} no longer exists'.format(gid)),
@@ -713,13 +712,13 @@ class BaseKeyValueStoreBackend(Backend):
                 except StopIteration:
                     reason = repr(exc)
 
-                logger.error('Chord %r raised: %r', gid, reason, exc_info=1)
+                logger.exception('Chord %r raised: %r', gid, reason)
                 self.chord_error_from_stack(callback, ChordError(reason))
             else:
                 try:
                     callback.delay(ret)
                 except Exception as exc:
-                    logger.error('Chord %r raised: %r', gid, exc, exc_info=1)
+                    logger.exception('Chord %r raised: %r', gid, exc)
                     self.chord_error_from_stack(
                         callback,
                         ChordError('Callback error: {0!r}'.format(exc)),

+ 12 - 13
celery/backends/redis.py

@@ -32,16 +32,14 @@ except ImportError:                 # pragma: no cover
 
 __all__ = ['RedisBackend']
 
-REDIS_MISSING = """\
+E_REDIS_MISSING = """
 You need to install the redis library in order to use \
-the Redis result store backend."""
-
-E_LOST = """\
-Connection to Redis lost: Retry (%s/%s) %s.\
+the Redis result store backend.
 """
 
+E_LOST = 'Connection to Redis lost: Retry (%s/%s) %s.'
+
 logger = get_logger(__name__)
-error = logger.error
 
 
 class ResultConsumer(async.BaseResultConsumer):
@@ -112,7 +110,7 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
         super(RedisBackend, self).__init__(expires_type=int, **kwargs)
         _get = self.app.conf.get
         if self.redis is None:
-            raise ImproperlyConfigured(REDIS_MISSING)
+            raise ImproperlyConfigured(E_REDIS_MISSING.strip())
 
         if host and '://' in host:
             url, host = host, None
@@ -193,8 +191,9 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
 
     def on_connection_error(self, max_retries, exc, intervals, retries):
         tts = next(intervals)
-        error(E_LOST, retries, max_retries or 'Inf',
-              humanize_seconds(tts, 'in '))
+        logger.error(
+            E_LOST.strip(),
+            retries, max_retries or 'Inf', humanize_seconds(tts, 'in '))
         return tts
 
     def set(self, key, value, **retry_policy):
@@ -272,17 +271,17 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
                 try:
                     callback.delay([unpack(tup, decode) for tup in resl])
                 except Exception as exc:
-                    error('Chord callback for %r raised: %r',
-                          request.group, exc, exc_info=1)
+                    logger.exception(
+                        'Chord callback for %r raised: %r', request.group, exc)
                     return self.chord_error_from_stack(
                         callback,
                         ChordError('Callback error: {0!r}'.format(exc)),
                     )
         except ChordError as exc:
-            error('Chord %r raised: %r', request.group, exc, exc_info=1)
+            logger.exception('Chord %r raised: %r', request.group, exc)
             return self.chord_error_from_stack(callback, exc)
         except Exception as exc:
-            error('Chord %r raised: %r', request.group, exc, exc_info=1)
+            logger.exception('Chord %r raised: %r', request.group, exc)
             return self.chord_error_from_stack(
                 callback,
                 ChordError('Join error: {0!r}'.format(exc)),

+ 2 - 4
celery/bootsteps.py

@@ -153,10 +153,8 @@ class Blueprint(object):
                     except Exception as exc:
                         if propagate:
                             raise
-                        logger.error(
-                            'Error on %s %s: %r',
-                            description, step.alias, exc, exc_info=1,
-                        )
+                        logger.exception(
+                            'Error on %s %s: %r', description, step.alias, exc)
 
     def stop(self, parent, close=True, terminate=False):
         what = 'terminating' if terminate else 'stopping'

+ 2 - 2
celery/concurrency/asynpool.py

@@ -818,8 +818,8 @@ class AsynPool(_pool.Pool):
         self._quick_put = send_job
 
         def on_not_recovering(proc, fd, job, exc):
-            error('Process inqueue damaged: %r %r: %r',
-                  proc, proc.exitcode, exc, exc_info=1)
+            logger.exception(
+                'Process inqueue damaged: %r %r: %r', proc, proc.exitcode, exc)
             if proc._is_alive():
                 proc.terminate()
             hub.remove(fd)

+ 2 - 2
celery/utils/dispatch/signal.py

@@ -169,8 +169,8 @@ class Signal(object):  # pragma: no cover
             try:
                 response = receiver(signal=self, sender=sender, **named)
             except Exception as exc:
-                logger.error('Signal handler %r raised: %r',
-                             receiver, exc, exc_info=1)
+                logger.exception(
+                    'Signal handler %r raised: %r', receiver, exc)
             else:
                 responses.append((receiver, response))
         return responses

+ 1 - 1
celery/worker/consumer/consumer.py

@@ -223,7 +223,7 @@ class Consumer(object):
                 try:
                     self._pending_operations.pop()()
                 except Exception as exc:
-                    error('Pending callback raised: %r', exc, exc_info=1)
+                    logger.exception('Pending callback raised: %r', exc)
 
     def bucket_for_task(self, type):
         limit = rate(getattr(type, 'rate_limit', None))

+ 6 - 6
celery/worker/consumer/gossip.py

@@ -18,7 +18,7 @@ from .mingle import Mingle
 
 __all__ = ['Gossip']
 logger = get_logger(__name__)
-debug, info, error = logger.debug, logger.info, logger.error
+debug, info = logger.debug, logger.info
 
 
 class Gossip(bootsteps.ConsumerStep):
@@ -84,14 +84,14 @@ class Gossip(bootsteps.ConsumerStep):
         try:
             self.app.signature(task).apply_async()
         except Exception as exc:
-            error('Could not call task: %r', exc, exc_info=1)
+            logger.exception('Could not call task: %r', exc)
 
     def on_elect(self, event):
         try:
             (id_, clock, hostname, pid,
              topic, action, _) = self._cons_stamp_fields(event)
         except KeyError as exc:
-            return error('election request missing field %s', exc, exc_info=1)
+            return logger.exception('election request missing field %s', exc)
         heappush(
             self.consensus_requests[id_],
             (clock, '%s.%s' % (hostname, pid), topic, action),
@@ -120,7 +120,7 @@ class Gossip(bootsteps.ConsumerStep):
                 try:
                     handler = self.election_handlers[topic]
                 except KeyError:
-                    error('Unknown election topic %r', topic, exc_info=1)
+                    logger.exception('Unknown election topic %r', topic)
                 else:
                     handler(action)
             else:
@@ -145,8 +145,8 @@ class Gossip(bootsteps.ConsumerStep):
             try:
                 handler(*args, **kwargs)
             except Exception as exc:
-                error('Ignored error from handler %r: %r',
-                      handler, exc, exc_info=1)
+                logger.exception(
+                    'Ignored error from handler %r: %r', handler, exc)
 
     def register_timer(self):
         if self._tref is not None:

+ 2 - 4
celery/worker/loops.py

@@ -13,7 +13,6 @@ from . import state
 __all__ = ['asynloop', 'synloop']
 
 logger = get_logger(__name__)
-error = logger.error
 
 
 def _quick_drain(connection, timeout=0.1):
@@ -92,9 +91,8 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
         try:
             hub.reset()
         except Exception as exc:
-            error(
-                'Error cleaning up after event loop: %r', exc, exc_info=1,
-            )
+            logger.exception(
+                'Error cleaning up after event loop: %r', exc)
 
 
 def synloop(obj, connection, consumer, blueprint, hub, qos,

+ 0 - 1
t/unit/app/test_annotations.py

@@ -1,5 +1,4 @@
 from __future__ import absolute_import, unicode_literals
-
 from celery.app.annotations import MapAnnotation, prepare
 from celery.utils.imports import qualname
 

+ 1 - 1
t/unit/app/test_app.py

@@ -775,7 +775,7 @@ class test_App:
             _appbase.register_after_fork = prev
 
     def test_canvas(self):
-        assert self.app.canvas.Signature
+        assert self.app._canvas.Signature
 
     def test_signature(self):
         sig = self.app.signature('foo', (1, 2))

+ 0 - 2
t/unit/app/test_registry.py

@@ -1,7 +1,5 @@
 from __future__ import absolute_import, unicode_literals
-
 import pytest
-
 from celery.app.registry import _unpickle_task, _unpickle_task_v2
 
 

+ 0 - 3
t/unit/app/test_utils.py

@@ -1,9 +1,6 @@
 from __future__ import absolute_import, unicode_literals
-
 from collections import Mapping, MutableMapping
-
 from case import Mock
-
 from celery.app.utils import Settings, filter_hidden_settings, bugreport
 
 

+ 6 - 5
t/unit/backends/test_redis.py

@@ -198,16 +198,17 @@ class test_RedisBackend:
         })
         self.Backend(app=self.app)
 
-    @patch('celery.backends.redis.error')
-    def test_on_connection_error(self, error):
+    @patch('celery.backends.redis.logger')
+    def test_on_connection_error(self, logger):
         intervals = iter([10, 20, 30])
         exc = KeyError()
         assert self.b.on_connection_error(None, exc, intervals, 1) == 10
-        error.assert_called_with(self.E_LOST, 1, 'Inf', 'in 10.00 seconds')
+        logger.error.assert_called_with(
+            self.E_LOST, 1, 'Inf', 'in 10.00 seconds')
         assert self.b.on_connection_error(10, exc, intervals, 2) == 20
-        error.assert_called_with(self.E_LOST, 2, 10, 'in 20.00 seconds')
+        logger.error.assert_called_with(self.E_LOST, 2, 10, 'in 20.00 seconds')
         assert self.b.on_connection_error(10, exc, intervals, 3) == 30
-        error.assert_called_with(self.E_LOST, 3, 10, 'in 30.00 seconds')
+        logger.error.assert_called_with(self.E_LOST, 3, 10, 'in 30.00 seconds')
 
     def test_incr(self):
         self.b.client = Mock(name='client')

+ 6 - 6
t/unit/worker/test_consumer.py

@@ -376,9 +376,9 @@ class test_Gossip:
         signature.return_value.apply_async.assert_called_with()
 
         signature.return_value.apply_async.side_effect = MemoryError()
-        with patch('celery.worker.consumer.gossip.error') as error:
+        with patch('celery.worker.consumer.gossip.logger') as logger:
             g.call_task(task)
-            error.assert_called()
+            logger.exception.assert_called()
 
     def Event(self, id='id', clock=312,
               hostname='foo@example.com', pid=4312,
@@ -406,9 +406,9 @@ class test_Gossip:
         g.dispatcher.send.assert_called_with('worker-elect-ack', id='id1')
 
         event.pop('clock')
-        with patch('celery.worker.consumer.gossip.error') as error:
+        with patch('celery.worker.consumer.gossip.logger') as logger:
             g.on_elect(event)
-            error.assert_called()
+            logger.exception.assert_called()
 
     def Consumer(self, hostname='foo@x.com', pid=4312):
         c = Mock()
@@ -466,9 +466,9 @@ class test_Gossip:
         c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         g.election_handlers = {}
-        with patch('celery.worker.consumer.gossip.error') as error:
+        with patch('celery.worker.consumer.gossip.logger') as logger:
             self.setup_election(g, c)
-            error.assert_called()
+            logger.exception.assert_called()
 
     def test_on_node_join(self):
         c = self.Consumer()