Ask Solem 8 gadi atpakaļ
vecāks
revīzija
0e93ee0cf7

+ 2 - 0
.landscape.yml

@@ -47,5 +47,7 @@ pylint:
         - unnecessary-lambda
         - too-few-public-methods
         - attribute-defined-outside-init
+        - too-many-ancestors
+        - too-many-return-statements
     options:
         exclude-protected: _reader, _writer, _popen, _sentinel_poll, _job, _is_alive, _write_to, _scheduled_for, _terminated, _accepted, _set_terminated, _payload, _cancel

+ 7 - 6
celery/__init__.py

@@ -29,7 +29,7 @@ __all__ = [
     'Celery', 'bugreport', 'shared_task', 'task',
     'current_app', 'current_task', 'maybe_signature',
     'chain', 'chord', 'chunks', 'group', 'signature',
-    'xmap', 'xstarmap', 'uuid', 'version', '__version__',
+    'xmap', 'xstarmap', 'uuid',
 ]
 
 VERSION_BANNER = '{0} ({1})'.format(__version__, SERIES)
@@ -44,8 +44,8 @@ _temp = re.match(
     r'(\d+)\.(\d+).(\d+)(.+)?', __version__).groups()
 VERSION = version_info = version_info_t(
     int(_temp[0]), int(_temp[1]), int(_temp[2]), _temp[3] or '', '')
-del(_temp)
-del(re)
+del _temp
+del re
 
 if os.environ.get('C_IMPDEBUG'):  # pragma: no cover
     from .five import builtins
@@ -109,14 +109,15 @@ def _patch_eventlet():
 
 
 def _patch_gevent():
-    from gevent import monkey, signal as gsignal, version_info
+    import gevent
+    from gevent import monkey, signal as gevent_signal
 
     monkey.patch_all()
-    if version_info[0] == 0:  # pragma: no cover
+    if gevent.version_info[0] == 0:  # pragma: no cover
         # Signals aren't working in gevent versions <1.0,
         # and aren't monkey patched by patch_all()
         _signal = __import__('signal')
-        _signal.signal = gsignal
+        _signal.signal = gevent_signal
 
 
 def maybe_patch_concurrency(argv=sys.argv,

+ 3 - 3
celery/app/base.py

@@ -99,7 +99,7 @@ def _after_fork_cleanup_app(app):
     # so need to be at module level.
     try:
         app._after_fork()
-    except Exception as exc:
+    except Exception as exc:  # pylint: disable=broad-except
         logger.info('after forker raised exception: %r', exc, exc_info=1)
 
 
@@ -826,14 +826,14 @@ class Celery(object):
         """
         return self.amqp.queues.select(queues)
 
-    def either(self, default_key, *values):
+    def either(self, default_key, *defaults):
         """Get key from configuration or use default values.
 
         Fallback to the value of a configuration key if none of the
         `*values` are true.
         """
         return first(None, [
-            first(None, values), starpromise(self.conf.get, default_key),
+            first(None, defaults), starpromise(self.conf.get, default_key),
         ])
 
     def bugreport(self):

+ 3 - 2
celery/app/builtins.py

@@ -34,6 +34,7 @@ def add_accumulate_task(app):
     def accumulate(self, *args, **kwargs):
         index = kwargs.get('index')
         return args[index] if index is not None else args
+    return accumulate
 
 
 @connect_on_app_finalize
@@ -79,7 +80,7 @@ def add_unlock_chord_task(app):
         try:
             with allow_join_result():
                 ret = j(timeout=3.0, propagate=True)
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             try:
                 culprit = next(deps._failed_join_report())
                 reason = 'Dependency {0.id} raised {1!r}'.format(culprit, exc)
@@ -90,7 +91,7 @@ def add_unlock_chord_task(app):
         else:
             try:
                 callback.delay(ret)
-            except Exception as exc:
+            except Exception as exc:  # pylint: disable=broad-except
                 logger.exception('Chord %r raised: %r', group_id, exc)
                 app.backend.chord_error_from_stack(
                     callback,

+ 1 - 1
celery/app/utils.py

@@ -303,7 +303,7 @@ def bugreport(app):
         driver_v = '{0}:{1}'.format(conn.transport.driver_name,
                                     conn.transport.driver_version())
         transport = conn.transport_cls
-    except Exception:
+    except Exception:  # pylint: disable=broad-except
         transport = driver_v = ''
 
     return BUGREPORT_INFO.format(

+ 1 - 1
celery/apps/multi.py

@@ -48,7 +48,7 @@ def build_nodename(name, prefix, suffix):
 def build_expander(nodename, shortname, hostname):
     return partial(
         node_format,
-        nodename=nodename,
+        name=nodename,
         N=shortname,
         d=hostname,
         h=nodename,

+ 7 - 6
celery/backends/base.py

@@ -190,7 +190,8 @@ class Backend(object):
                                  traceback=traceback, request=request)
 
     def chord_error_from_stack(self, callback, exc=None):
-        from celery import group
+        # need below import for test for some crazy reason
+        from celery import group  # pylint: disable
         app = self.app
         backend = app._tasks[callback.task].backend
         try:
@@ -199,7 +200,7 @@ class Backend(object):
                  for errback in callback.options.get('link_error') or []],
                 app=app,
             ).apply_async((callback.id,))
-        except Exception as eb_exc:
+        except Exception as eb_exc:  # pylint: disable=broad-except
             return backend.fail_from_current_stack(callback.id, exc=eb_exc)
         else:
             return backend.fail_from_current_stack(callback.id, exc=exc)
@@ -212,7 +213,7 @@ class Backend(object):
             self.mark_as_failure(task_id, exc, ei.traceback)
             return ei
         finally:
-            del(tb)
+            del tb
 
     def prepare_exception(self, exc, serializer=None):
         """Prepare exception for serialization."""
@@ -675,7 +676,7 @@ class BaseKeyValueStoreBackend(Backend):
         key = self.get_key_for_chord(gid)
         try:
             deps = GroupResult.restore(gid, backend=self)
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             callback = maybe_signature(request.chord, app=app)
             logger.exception('Chord %r raised: %r', gid, exc)
             return self.chord_error_from_stack(
@@ -703,7 +704,7 @@ class BaseKeyValueStoreBackend(Backend):
             try:
                 with allow_join_result():
                     ret = j(timeout=3.0, propagate=True)
-            except Exception as exc:
+            except Exception as exc:  # pylint: disable=broad-except
                 try:
                     culprit = next(deps._failed_join_report())
                     reason = 'Dependency {0.id} raised {1!r}'.format(
@@ -717,7 +718,7 @@ class BaseKeyValueStoreBackend(Backend):
             else:
                 try:
                     callback.delay(ret)
-                except Exception as exc:
+                except Exception as exc:  # pylint: disable=broad-except
                     logger.exception('Chord %r raised: %r', gid, exc)
                     self.chord_error_from_stack(
                         callback,

+ 2 - 2
celery/backends/redis.py

@@ -269,7 +269,7 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
                         .execute()
                 try:
                     callback.delay([unpack(tup, decode) for tup in resl])
-                except Exception as exc:
+                except Exception as exc:  # pylint: disable=broad-except
                     logger.exception(
                         'Chord callback for %r raised: %r', request.group, exc)
                     return self.chord_error_from_stack(
@@ -279,7 +279,7 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
         except ChordError as exc:
             logger.exception('Chord %r raised: %r', request.group, exc)
             return self.chord_error_from_stack(callback, exc)
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             logger.exception('Chord %r raised: %r', request.group, exc)
             return self.chord_error_from_stack(
                 callback,

+ 5 - 3
celery/beat.py

@@ -218,7 +218,7 @@ class Scheduler(object):
         info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
         try:
             result = self.apply_async(entry, producer=producer, advance=False)
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             error('Message Error: %s\n%s',
                   exc, traceback.format_stack(), exc_info=True)
         else:
@@ -242,6 +242,8 @@ class Scheduler(object):
         Returns:
             float: preferred delay in seconds for next call.
         """
+        # pylint disable=redefined-outer-name
+
         def _when(entry, next_time_to_run):
             return (mktime(entry.schedule.now().timetuple()) +
                     (adjust(next_time_to_run) or 0))
@@ -300,7 +302,7 @@ class Scheduler(object):
                 return self.send_task(entry.task, entry.args, entry.kwargs,
                                       producer=producer,
                                       **entry.options)
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             reraise(SchedulingError, SchedulingError(
                 "Couldn't apply scheduled task {0.name}: {exc}".format(
                     entry, exc=exc)), sys.exc_info()[2])
@@ -428,7 +430,7 @@ class PersistentScheduler(Scheduler):
             # successfully opened but the error will be raised on first key
             # retrieving.
             self._store.keys()
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             self._store = self._destroy_open_corrupted_schedule(exc)
 
         for _ in (1, 2):

+ 2 - 2
celery/bin/celeryd_detach.py

@@ -40,7 +40,7 @@ def detach(path, argv, logfile=None, pidfile=None, uid=None,
             if executable is not None:
                 path = executable
             os.execv(path, [path] + argv)
-        except Exception:
+        except Exception:  # pylint: disable=broad-except
             if app is None:
                 from celery import current_app
                 app = current_app
@@ -154,7 +154,7 @@ class detached_celeryd(object):
                     seen_cargs = 1
                     config.append(arg)
         prog_name = os.path.basename(argv[0])
-        options, values, leftovers = self.parse_options(prog_name, argv[1:])
+        options, _, leftovers = self.parse_options(prog_name, argv[1:])
         sys.exit(detach(
             app=self.app, path=self.execv_path,
             argv=self.execv_argv + leftovers + config,

+ 18 - 16
celery/canvas.py

@@ -200,23 +200,25 @@ class Signature(dict):
         init = dict.__init__
 
         if isinstance(task, dict):
-            return init(self, task)  # works like dict(d)
-
-        # Also supports using task class/instance instead of string name.
-        try:
-            task_name = task.name
-        except AttributeError:
-            task_name = task
+            init(self, task)  # works like dict(d)
         else:
-            self._type = task
-
-        init(self,
-             task=task_name, args=tuple(args or ()),
-             kwargs=kwargs or {},
-             options=dict(options or {}, **ex),
-             subtask_type=subtask_type,
-             immutable=immutable,
-             chord_size=None)
+            # Also supports using task class/instance instead of string name.
+            try:
+                task_name = task.name
+            except AttributeError:
+                task_name = task
+            else:
+                self._type = task
+
+            init(
+                self,
+                task=task_name, args=tuple(args or ()),
+                kwargs=kwargs or {},
+                options=dict(options or {}, **ex),
+                subtask_type=subtask_type,
+                immutable=immutable,
+                chord_size=None,
+            )
 
     def __call__(self, *partial_args, **partial_kwargs):
         """Call the task directly (in the current process)."""

+ 2 - 3
celery/events/cursesmon.py

@@ -41,7 +41,6 @@ class CursesMonitor(object):  # pragma: no cover
 
     keymap = {}
     win = None
-    screen_width = None
     screen_delay = 10
     selected_task = None
     selected_position = 0
@@ -152,7 +151,7 @@ class CursesMonitor(object):  # pragma: no cover
     def handle_keypress(self):
         try:
             key = self.win.getkey().upper()
-        except Exception:
+        except Exception:  # pylint: disable=broad-except
             return
         key = self.keyalias.get(key) or key
         handler = self.keymap.get(key)
@@ -174,7 +173,7 @@ class CursesMonitor(object):  # pragma: no cover
         while 1:
             try:
                 return self.win.getkey().upper()
-            except Exception:
+            except Exception:  # pylint: disable=broad-except
                 pass
 
     def selection_rate_limit(self):

+ 1 - 1
celery/exceptions.py

@@ -66,7 +66,7 @@ class Retry(TaskPredicate):
         else:
             self.exc, self.excs = exc, safe_repr(exc) if exc else None
         self.when = when
-        Exception.__init__(self, exc, when, **kwargs)
+        super(Retry, self).__init__(self, exc, when, **kwargs)
 
     def humanize(self):
         if isinstance(self.when, numbers.Number):

+ 30 - 28
celery/platforms.py

@@ -24,6 +24,7 @@ from kombu.utils.compat import maybe_fileno
 from kombu.utils.encoding import safe_str
 from contextlib import contextmanager
 
+from .exceptions import SecurityError
 from .local import try_import
 from .five import items, reraise, string_t
 
@@ -475,7 +476,7 @@ def setgroups(groups):
     max_groups = None
     try:
         max_groups = os.sysconf('SC_NGROUPS_MAX')
-    except Exception:
+    except Exception:  # pylint: disable=broad-except
         pass
     try:
         return _setgroups_hack(groups[:max_groups])
@@ -528,7 +529,7 @@ def maybe_drop_privileges(uid=None, gid=None):
     if os.geteuid():
         # no point trying to setuid unless we're root.
         if not os.getuid():
-            raise AssertionError('contact support')
+            raise SecurityError('contact support')
     uid = uid and parse_uid(uid)
     gid = gid and parse_gid(gid)
 
@@ -553,17 +554,18 @@ def maybe_drop_privileges(uid=None, gid=None):
         except OSError as exc:
             if exc.errno != errno.EPERM:
                 raise
-            pass  # Good: cannot restore privileges.
+            # we should get here: cannot restore privileges,
+            # everything was fine.
         else:
             raise RuntimeError(
                 'non-root user able to restore privileges after setuid.')
     else:
         gid and setgid(gid)
 
-    if uid and (not os.getuid()) and not (os.geteuid()):
-        raise AssertionError('Still root uid after drop privileges!')
-    if gid and (not os.getgid()) and not (os.getegid()):
-        raise AssertionError('Still root gid after drop privileges!')
+    if uid and not os.getuid() and not os.geteuid():
+        raise SecurityError('Still root uid after drop privileges!')
+    if gid and not os.getgid() and not os.getegid():
+        raise SecurityError('Still root gid after drop privileges!')
 
 
 class Signals(object):
@@ -623,23 +625,23 @@ class Signals(object):
     def reset_alarm(self):
         return _signal.alarm(0)
 
-    def supported(self, signal_name):
-        """Return true value if ``signal_name`` exists on this platform."""
+    def supported(self, name):
+        """Return true value if signal by ``name`` exists on this platform."""
         try:
-            return self.signum(signal_name)
+            return self.signum(name)
         except AttributeError:
             pass
 
-    def signum(self, signal_name):
-        """Get signal number from signal name."""
-        if isinstance(signal_name, numbers.Integral):
-            return signal_name
-        if not isinstance(signal_name, string_t) \
-                or not signal_name.isupper():
+    def signum(self, name):
+        """Get signal number by name."""
+        if isinstance(name, numbers.Integral):
+            return name
+        if not isinstance(name, string_t) \
+                or not name.isupper():
             raise TypeError('signal name must be uppercase string.')
-        if not signal_name.startswith('SIG'):
-            signal_name = 'SIG' + signal_name
-        return getattr(_signal, signal_name)
+        if not name.startswith('SIG'):
+            name = 'SIG' + name
+        return getattr(_signal, name)
 
     def reset(self, *signal_names):
         """Reset signals to the default signal handler.
@@ -649,32 +651,32 @@ class Signals(object):
         """
         self.update((sig, self.default) for sig in signal_names)
 
-    def ignore(self, *signal_names):
+    def ignore(self, *names):
         """Ignore signal using :const:`SIG_IGN`.
 
         Does nothing if the platform has no support for signals,
         or the specified signal in particular.
         """
-        self.update((sig, self.ignored) for sig in signal_names)
+        self.update((sig, self.ignored) for sig in names)
 
-    def __getitem__(self, signal_name):
-        return _signal.getsignal(self.signum(signal_name))
+    def __getitem__(self, name):
+        return _signal.getsignal(self.signum(name))
 
-    def __setitem__(self, signal_name, handler):
+    def __setitem__(self, name, handler):
         """Install signal handler.
 
         Does nothing if the current platform has no support for signals,
         or the specified signal in particular.
         """
         try:
-            _signal.signal(self.signum(signal_name), handler)
+            _signal.signal(self.signum(name), handler)
         except (AttributeError, ValueError):
             pass
 
     def update(self, _d_=None, **sigmap):
         """Set signal handlers from a mapping."""
-        for signal_name, handler in items(dict(_d_ or {}, **sigmap)):
-            self[signal_name] = handler
+        for name, handler in items(dict(_d_ or {}, **sigmap)):
+            self[name] = handler
 
 signals = Signals()
 get_signal = signals.signum                   # compat
@@ -770,7 +772,7 @@ def check_privileges(accept_content):
     if hasattr(os, 'fchown'):
         if not all(hasattr(os, attr)
                    for attr in ['getuid', 'getgid', 'geteuid', 'getegid']):
-            raise AssertionError('suspicious platform, contact support')
+            raise SecurityError('suspicious platform, contact support')
 
     if not uid or not gid or not euid or not egid:
         if ('pickle' in accept_content or

+ 43 - 34
celery/schedules.py

@@ -62,8 +62,48 @@ class ParseException(Exception):
     """Raised by :class:`crontab_parser` when the input can't be parsed."""
 
 
+class BaseSchedule(object):
+
+    def __init__(self, nowfun=None, app=None):
+        self.nowfun = nowfun
+        self._app = app
+
+    def now(self):
+        return (self.nowfun or self.app.now)()
+
+    def remaining_estimate(self, last_run_at):
+        raise NotImplementedError()
+
+    def is_due(self, last_run_at):
+        raise NotImplementedError()
+
+    def maybe_make_aware(self, dt):
+        return maybe_make_aware(dt, self.tz)
+
+    @property
+    def app(self):
+        return self._app or current_app._get_current_object()
+
+    @app.setter  # noqa
+    def app(self, app):
+        self._app = app
+
+    @cached_property
+    def tz(self):
+        return self.app.timezone
+
+    @cached_property
+    def utc_enabled(self):
+        return self.app.conf.enable_utc
+
+    def to_local(self, dt):
+        if not self.utc_enabled:
+            return timezone.to_local_fallback(dt)
+        return dt
+
+
 @python_2_unicode_compatible
-class schedule(object):
+class schedule(BaseSchedule):
     """Schedule for periodic task.
 
     Arguments:
@@ -80,11 +120,7 @@ class schedule(object):
     def __init__(self, run_every=None, relative=False, nowfun=None, app=None):
         self.run_every = maybe_timedelta(run_every)
         self.relative = relative
-        self.nowfun = nowfun
-        self._app = app
-
-    def now(self):
-        return (self.nowfun or self.app.now)()
+        super(schedule, self).__init__(nowfun=nowfun, app=app)
 
     def remaining_estimate(self, last_run_at):
         return remaining(
@@ -129,9 +165,6 @@ class schedule(object):
             return schedstate(is_due=True, next=self.seconds)
         return schedstate(is_due=False, next=remaining_s)
 
-    def maybe_make_aware(self, dt):
-        return maybe_make_aware(dt, self.tz)
-
     def __repr__(self):
         return '<freq: {0.human_seconds}>'.format(self)
 
@@ -154,27 +187,6 @@ class schedule(object):
     def human_seconds(self):
         return humanize_seconds(self.seconds)
 
-    @property
-    def app(self):
-        return self._app or current_app._get_current_object()
-
-    @app.setter  # noqa
-    def app(self, app):
-        self._app = app
-
-    @cached_property
-    def tz(self):
-        return self.app.timezone
-
-    @cached_property
-    def utc_enabled(self):
-        return self.app.conf.enable_utc
-
-    def to_local(self, dt):
-        if not self.utc_enabled:
-            return timezone.to_local_fallback(dt)
-        return dt
-
 
 class crontab_parser(object):
     """Parser for Crontab expressions.
@@ -301,7 +313,7 @@ class crontab_parser(object):
 
 
 @python_2_unicode_compatible
-class crontab(schedule):
+class crontab(BaseSchedule):
     """Crontab schedule.
 
     A Crontab can be used as the ``run_every`` value of a
@@ -510,9 +522,6 @@ class crontab(schedule):
                     second=0,
                     microsecond=0)
 
-    def now(self):
-        return (self.nowfun or self.app.now)()
-
     def __repr__(self):
         return CRON_REPR.format(self)
 

+ 5 - 7
celery/utils/collections.py

@@ -158,7 +158,7 @@ class DictAttribute(object):
         except KeyError:
             return default
 
-    def setdefault(self, key, default):
+    def setdefault(self, key, default=None):
         if key not in self:
             self[key] = default
 
@@ -284,7 +284,7 @@ class ChainMap(MutableMapping):
         return any(self.maps)
     __nonzero__ = __bool__  # Py2
 
-    def setdefault(self, key, default):
+    def setdefault(self, key, default=None):
         key = self._key(key)
         if key not in self:
             self[key] = default
@@ -308,6 +308,8 @@ class ChainMap(MutableMapping):
     def _iter(self, op):
         # defaults must be first in the stream, so values in
         # changes take precedence.
+        # pylint: disable=bad-reversed-sequence
+        #   Someone should teach pylint about properties.
         return chain(*[op(d) for d in reversed(self.maps)])
 
     def _iterate_keys(self):
@@ -482,10 +484,6 @@ class LimitedSet(object):
         self._data = {}
         self._heap = []
 
-        # make shortcuts
-        self.__len__ = self._data.__len__
-        self.__contains__ = self._data.__contains__
-
         if data:
             # import items from data
             self.update(data)
@@ -787,7 +785,7 @@ class BufferMap(OrderedDict, Evictable):
         return self[self._LRUkey()].take(*default)
 
     def _pop_to_evict(self):
-        for i in range(100):
+        for _ in range(100):
             key = self._LRUkey()
             buf = self[key]
             try:

+ 2 - 2
celery/utils/dispatch/signal.py

@@ -168,7 +168,7 @@ class Signal(object):  # pragma: no cover
         for receiver in self._live_receivers(_make_id(sender)):
             try:
                 response = receiver(signal=self, sender=sender, **named)
-            except Exception as exc:
+            except Exception as exc:  # pylint: disable=broad-except
                 logger.exception(
                     'Signal handler %r raised: %r', receiver, exc)
             else:
@@ -184,7 +184,7 @@ class Signal(object):  # pragma: no cover
         none_senderkey = _make_id(None)
         receivers = []
 
-        for (receiverkey, r_senderkey), receiver in self.receivers:
+        for (_, r_senderkey), receiver in self.receivers:
             if r_senderkey == none_senderkey or r_senderkey == senderkey:
                 if isinstance(receiver, WEAKREF_TYPES):
                     # Dereference the weak reference.

+ 8 - 3
celery/utils/functional.py

@@ -128,8 +128,8 @@ def chunks(it, n):
         >>> list(x)
         [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
     """
-    for first in it:
-        yield [first] + list(islice(it, n - 1))
+    for item in it:
+        yield [item] + list(islice(it, n - 1))
 
 
 def padlist(container, size, default=None):
@@ -179,6 +179,9 @@ class _regen(UserList, list):
     # must be subclass of list so that json can encode.
 
     def __init__(self, it):
+        # pylint: disable=super-init-not-called
+        # UserList creates a new list and sets .data, so we don't
+        # want to call init here.
         self.__it = it
         self.__index = 0
         self.__consumed = []
@@ -199,7 +202,7 @@ class _regen(UserList, list):
             return self.__consumed[index]
         except IndexError:
             try:
-                for i in range(self.__index, index + 1):
+                for _ in range(self.__index, index + 1):
                     self.__consumed.append(next(self.__it))
             except StopIteration:
                 raise IndexError(index)
@@ -251,6 +254,8 @@ def head_from_fun(fun, bound=False, debug=False):
     if debug:  # pragma: no cover
         print(definition, file=sys.stderr)
     namespace = {'__name__': fun.__module__}
+    # pylint: disable=use-of-exec
+    # Tasks are rarely, if ever, created at runtime - exec here is fine.
     exec(definition, namespace)
     result = namespace[name]
     result._source = definition

+ 2 - 2
celery/utils/iso8601.py

@@ -45,11 +45,11 @@ __all__ = ['parse_iso8601']
 ISO8601_REGEX = re.compile(
     r'(?P<year>[0-9]{4})(-(?P<month>[0-9]{1,2})(-(?P<day>[0-9]{1,2})'
     r'((?P<separator>.)(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2})'
-    '(:(?P<second>[0-9]{2})(\.(?P<fraction>[0-9]+))?)?'
+    r'(:(?P<second>[0-9]{2})(\.(?P<fraction>[0-9]+))?)?'
     r'(?P<timezone>Z|(([-+])([0-9]{2}):([0-9]{2})))?)?)?)?'
 )
 TIMEZONE_REGEX = re.compile(
-    '(?P<prefix>[+-])(?P<hours>[0-9]{2}).(?P<minutes>[0-9]{2})'
+    r'(?P<prefix>[+-])(?P<hours>[0-9]{2}).(?P<minutes>[0-9]{2})'
 )
 
 

+ 14 - 10
celery/utils/log.py

@@ -51,9 +51,9 @@ def iter_open_logger_fds():
     seen = set()
     loggers = (list(values(logging.Logger.manager.loggerDict)) +
                [logging.getLogger(None)])
-    for logger in loggers:
+    for l in loggers:
         try:
-            for handler in logger.handlers:
+            for handler in l.handlers:
                 try:
                     if handler not in seen:  # pragma: no cover
                         yield handler.stream
@@ -93,12 +93,17 @@ def logger_isa(l, p, max=1000):
     return False
 
 
+def _using_logger_parent(base_logger, logger_):
+    if not logger_isa(logger_, base_logger):
+        logger_.parent = base_logger
+    return logger_
+
+
 def get_logger(name):
     """Get logger by name."""
     l = _get_logger(name)
     if logging.root not in (l, l.parent) and l is not base_logger:
-        if not logger_isa(l, base_logger):  # pragma: no cover
-            l.parent = base_logger
+        l = _using_logger_parent(base_logger, l)
     return l
 task_logger = get_logger('celery.task')
 worker_logger = get_logger('celery.worker')
@@ -108,10 +113,7 @@ def get_task_logger(name):
     """Get logger for task module by name."""
     if name in RESERVED_LOGGER_NAMES:
         raise RuntimeError('Logger name {0!r} is reserved!'.format(name))
-    logger = get_logger(name)
-    if not logger_isa(logger, task_logger):
-        logger.parent = task_logger
-    return logger
+    return _using_logger_parent(task_logger, get_logger(name))
 
 
 def mlevel(level):
@@ -164,7 +166,7 @@ class ColorFormatter(logging.Formatter):
                     return safe_str(color(msg))
                 except UnicodeDecodeError:  # pragma: no cover
                     return safe_str(msg)  # skip colors
-            except Exception as exc:
+            except Exception as exc:  # pylint: disable=broad-exc
                 prev_msg, record.exc_info, record.msg = (
                     record.msg, 1, '<Unrepresentable {0!r}: {1!r}>'.format(
                         type(msg), exc
@@ -193,6 +195,8 @@ class LoggingProxy(object):
     _thread = threading.local()
 
     def __init__(self, logger, loglevel=None):
+        # pylint: disable=redefined-outer-name
+        # Note that the logger global is redefined here, be careful changing.
         self.logger = logger
         self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
         self._safewrap_handlers()
@@ -260,7 +264,7 @@ def get_multiprocessing_logger():
     try:
         from billiard import util
     except ImportError:  # pragma: no cover
-            pass
+        pass
     else:
         return util.get_logger()
 

+ 5 - 5
celery/utils/nodenames.py

@@ -64,9 +64,9 @@ def anon_nodename(hostname=None, prefix='gen'):
                     hostname or gethostname())
 
 
-def nodesplit(nodename):
+def nodesplit(name):
     """Split node name into tuple of name/hostname."""
-    parts = nodename.split(NODENAME_SEP, 1)
+    parts = name.split(NODENAME_SEP, 1)
     if len(parts) == 1:
         return None, parts[0]
     return parts
@@ -78,11 +78,11 @@ def default_nodename(hostname):
     return nodename(name or NODENAME_DEFAULT, host or gethostname())
 
 
-def node_format(s, nodename, **extra):
+def node_format(s, name, **extra):
     """Format worker node name (name@host.com)."""
-    name, host = nodesplit(nodename)
+    shortname, host = nodesplit(name)
     return host_format(
-        s, host, name or NODENAME_DEFAULT, p=nodename, **extra)
+        s, host, shortname or NODENAME_DEFAULT, p=name, **extra)
 
 
 def _fmt_process_index(prefix='', default='0'):

+ 3 - 1
celery/utils/threads.py

@@ -70,7 +70,7 @@ class bgThread(threading.Thread):
             while not shutdown_set():
                 try:
                     body()
-                except Exception as exc:
+                except Exception as exc:  # pylint: disable=broad-except
                     try:
                         self.on_crash('{0!r} crashed: {1!r}', self.name, exc)
                         self._set_stopped()
@@ -209,6 +209,8 @@ class _LocalStack(object):
         """Push a new item to the stack."""
         rv = getattr(self._local, 'stack', None)
         if rv is None:
+            # pylint: disable=assigning-non-slot
+            # This attribute is defined now.
             self._local.stack = rv = []
         rv.append(obj)
         return rv

+ 7 - 5
celery/utils/time.py

@@ -117,6 +117,7 @@ class LocalTimezone(tzinfo):
 class _Zone(object):
 
     def tz_or_local(self, tzinfo=None):
+        # pylint: disable=redefined-outer-name
         if tzinfo is None:
             return self.local
         return self.get_timezone(tzinfo)
@@ -216,13 +217,13 @@ def remaining(start, ends_in, now=None, relative=False):
     return ret
 
 
-def rate(rate):
+def rate(r):
     """Convert rate string (`"100/m"`, `"2/h"` or `"0.5/s"`) to seconds."""
-    if rate:
-        if isinstance(rate, string_t):
-            ops, _, modifier = rate.partition('/')
+    if r:
+        if isinstance(r, string_t):
+            ops, _, modifier = r.partition('/')
             return RATE_MODIFIER_MAP[modifier or 's'](float(ops)) or 0
-        return rate or 0
+        return r or 0
     return 0
 
 
@@ -336,6 +337,7 @@ class ffwd(object):
         self.year = year
         self.month = month
         self.weeks = weeks
+        # pylint: disable=redefined-outer-name
         self.weekday = weekday
         self.day = day
         self.hour = hour

+ 4 - 4
celery/worker/consumer/consumer.py

@@ -226,7 +226,7 @@ class Consumer(object):
             while self._pending_operations:
                 try:
                     self._pending_operations.pop()()
-                except Exception as exc:
+                except Exception as exc:  # pylint: disable=broad-except
                     logger.exception('Pending callback raised: %r', exc)
 
     def bucket_for_task(self, type):
@@ -336,7 +336,7 @@ class Consumer(object):
         warn(CONNECTION_RETRY, exc_info=True)
         try:
             self.connection.collect()
-        except Exception:
+        except Exception:  # pylint: disable=broad-except
             pass
 
     def register_with_event_loop(self, hub):
@@ -534,7 +534,7 @@ class Consumer(object):
             except KeyError:
                 try:
                     payload = message.decode()
-                except Exception as exc:
+                except Exception as exc:  # pylint: disable=broad-except
                     return self.on_decode_error(message, exc)
                 try:
                     type_, payload = payload['task'], payload  # protocol v1
@@ -556,7 +556,7 @@ class Consumer(object):
                     return on_invalid_task(payload, message, exc)
                 except MemoryError:
                     raise
-                except Exception as exc:
+                except Exception as exc:  # pylint: disable=broad-except
                     # XXX handle as internal error?
                     return on_invalid_task(payload, message, exc)
 

+ 1 - 0
celery/worker/consumer/control.py

@@ -29,6 +29,7 @@ class Control(bootsteps.StartStopStep):
         self.start = self.box.start
         self.stop = self.box.stop
         self.shutdown = self.box.shutdown
+        super(Control, self).__init__(c, **kwargs)
 
     def include_if(self, c):
         return (c.app.conf.worker_enable_remote_control and

+ 1 - 0
celery/worker/consumer/events.py

@@ -27,6 +27,7 @@ class Events(bootsteps.StartStopStep):
             not without_heartbeat
         )
         c.event_dispatcher = None
+        super(Events, self).__init__(c, **kwargs)
 
     def start(self, c):
         # flush events sent while connection was down.

+ 1 - 0
celery/worker/consumer/heart.py

@@ -26,6 +26,7 @@ class Heart(bootsteps.StartStopStep):
         self.enabled = not without_heartbeat
         self.heartbeat_interval = heartbeat_interval
         c.heart = None
+        super(Heart, self).__init__(c, **kwargs)
 
     def start(self, c):
         c.heart = heartbeat.Heart(

+ 1 - 0
celery/worker/consumer/tasks.py

@@ -20,6 +20,7 @@ class Tasks(bootsteps.StartStopStep):
 
     def __init__(self, c, **kwargs):
         c.task_consumer = c.qos = None
+        super(Tasks, self).__init__(c, **kwargs)
 
     def start(self, c):
         """Start task consumer."""

+ 14 - 5
celery/worker/control.py

@@ -147,6 +147,9 @@ def revoke(state, task_id, terminate=False, signal=None, **kwargs):
         terminate (bool): Also terminate the process if the task is active.
         signal (str): Name of signal to use for terminate (e.g., ``KILL``).
     """
+    # pylint: disable=redefined-outer-name
+    # XXX Note that this redefines `terminate`:
+    #     Outside of this scope that is a function.
     # supports list argument since 3.1
     task_ids, task_id = set(maybe_list(task_id) or []), None
     size = len(task_ids)
@@ -196,6 +199,9 @@ def rate_limit(state, task_name, rate_limit, **kwargs):
         task_name (str): Type of task to set rate limit for.
         rate_limit (int, str): New rate limit.
     """
+    # pylint: disable=redefined-outer-name
+    # XXX Note that this redefines `terminate`:
+    #     Outside of this scope that is a function.
     try:
         rate(rate_limit)
     except ValueError as exc:
@@ -303,6 +309,9 @@ def heartbeat(state):
 @inspect_command(visible=False)
 def hello(state, from_node, revoked=None, **kwargs):
     """Request mingle sync-data."""
+    # pylint: disable=redefined-outer-name
+    # XXX Note that this redefines `revoked`:
+    #     Outside of this scope that is a function.
     if from_node != state.hostname:
         logger.info('sync with %s', from_node)
         if revoked:
@@ -419,14 +428,14 @@ def objgraph(state, num=200, max_depth=10, type='Request'):  # pragma: no cover
         type (str): Name of object to graph.  Default is ``"Request"``.
     """
     try:
-        import objgraph
+        import objgraph as _objgraph
     except ImportError:
         raise ImportError('Requires the objgraph library')
     logger.info('Dumping graph for type %r', type)
     with tempfile.NamedTemporaryFile(prefix='cobjg',
                                      suffix='.png', delete=False) as fh:
-        objects = objgraph.by_type(type)[:num]
-        objgraph.show_backrefs(
+        objects = _objgraph.by_type(type)[:num]
+        _objgraph.show_backrefs(
             objects,
             max_depth=max_depth, highlight=lambda v: v in objects,
             filename=fh.name,
@@ -447,9 +456,9 @@ def memsample(state, **kwargs):
 )
 def memdump(state, samples=10, **kwargs):  # pragma: no cover
     """Dump statistics of previous memsample requests."""
-    from celery.utils.debug import memdump
+    from celery.utils import debug
     out = io.StringIO()
-    memdump(file=out)
+    debug.memdump(file=out)
     return out.getvalue()
 
 # -- Pool

+ 2 - 1
examples/celery_http_gateway/settings.py

@@ -67,7 +67,8 @@ MEDIA_URL = ''
 ADMIN_MEDIA_PREFIX = '/media/'
 
 # Make this unique, and don't share it with anybody.
-SECRET_KEY = '#1i=edpk55k3781$z-p%b#dbn&n+-rtt83pgz2o9o)v8g7(owq'
+# XXX TODO FIXME Set this secret key to anything you want, just change it!
+SECRET_KEY = 'This is not a secret, be sure to change this.'
 
 # List of callables that know how to import templates from various sources.
 TEMPLATE_LOADERS = (

+ 2 - 1
examples/django/proj/settings.py

@@ -97,7 +97,8 @@ STATICFILES_FINDERS = (
 )
 
 # Make this unique, and don't share it with anybody.
-SECRET_KEY = 'x2$s&amp;0z2xehpnt_99i8q3)4)t*5q@+n(+6jrqz4@rt%a8fdf+!'
+# XXX TODO FIXME Set this to any random value!
+SECRET_KEY = 'This is not a secret, please change me!'
 
 # List of callables that know how to import templates from various sources.
 TEMPLATE_LOADERS = (

+ 1 - 1
examples/eventlet/webcrawler.py

@@ -54,7 +54,7 @@ def crawl(url, seen=None):
     with Timeout(5, False):
         try:
             response = requests.get(url)
-        except Exception:
+        except requests.exception.RequestError:
             return
 
     location = domain(url)

+ 4 - 3
t/unit/utils/test_platforms.py

@@ -11,6 +11,7 @@ from case import Mock, call, mock, patch, skip
 
 from celery import _find_option_with_arg
 from celery import platforms
+from celery.exceptions import SecurityError
 from celery.five import WhateverIO
 from celery.platforms import (
     get_fdmax,
@@ -256,7 +257,7 @@ class test_maybe_drop_privileges:
             mock.side_effect = on_first_call
         to_root_on_second_call(geteuid, 10)
         to_root_on_second_call(getuid, 10)
-        with pytest.raises(AssertionError):
+        with pytest.raises(SecurityError):
             maybe_drop_privileges(uid='user')
 
         getuid.return_value = getuid.side_effect = None
@@ -264,7 +265,7 @@ class test_maybe_drop_privileges:
         getegid.return_value = 0
         getgid.return_value = 0
         setuid.side_effect = raise_on_second_call
-        with pytest.raises(AssertionError):
+        with pytest.raises(SecurityError):
             maybe_drop_privileges(gid='group')
 
         getuid.reset_mock()
@@ -824,7 +825,7 @@ def test_check_privileges():
         fchown = 13
     prev, platforms.os = platforms.os, Obj()
     try:
-        with pytest.raises(AssertionError):
+        with pytest.raises(SecurityError):
             check_privileges({'pickle'})
     finally:
         platforms.os = prev