Ask Solem 8 years ago
parent
commit
b2027b9c6c

+ 3 - 0
celery/_state.py

@@ -66,6 +66,8 @@ class _TLS(threading.local):
     #: sets this, so it will always contain the last instantiated app,
     #: and is the default app returned by :func:`app_or_default`.
     current_app = None
+
+
 _tls = _TLS()
 
 _task_stack = LocalStack()
@@ -188,6 +190,7 @@ def disable_trace():
     global app_or_default
     app_or_default = _app_or_default
 
+
 if os.environ.get('CELERY_TRACE_APP'):  # pragma: no cover
     enable_trace()
 else:

+ 1 - 1
celery/app/base.py

@@ -1227,4 +1227,4 @@ class Celery(object):
             return (timezone.get_timezone('UTC') if conf.enable_utc
                     else timezone.local)
         return timezone.get_timezone(conf.timezone)
-App = Celery  # compat
+App = Celery  # noqa: E305 XXX compat

+ 3 - 0
celery/app/defaults.py

@@ -72,6 +72,7 @@ class Option(object):
         return '<Option: type->{0} default->{1!r}>'.format(self.type,
                                                            self.default)
 
+
 NAMESPACES = Namespace(
     accept_content=Option(DEFAULT_ACCEPT_CONTENT, type='list', old=OLD_NS),
     enable_utc=Option(True, type='bool'),
@@ -309,6 +310,8 @@ def flatten(d, root='', keyfilter=_flatten_keys):
             else:
                 for ret in keyfilter(ns, key, opt):
                     yield ret
+
+
 DEFAULTS = {
     key: opt.default for key, opt in flatten(NAMESPACES)
 }

+ 1 - 2
celery/app/task.py

@@ -123,7 +123,6 @@ class Context(object):
             'expires': self.expires,
             'soft_time_limit': limit_soft,
             'time_limit': limit_hard,
-            'reply_to': self.reply_to,
             'headers': self.headers,
             'retries': self.retries,
             'reply_to': self.reply_to,
@@ -997,4 +996,4 @@ class Task(object):
     @property
     def __name__(self):
         return self.__class__.__name__
-BaseTask = Task  # compat alias
+BaseTask = Task  # noqa: E305 XXX compat alias

+ 1 - 1
celery/app/trace.py

@@ -509,7 +509,7 @@ def _trace_task_ret(name, uuid, request, body, content_type,
     R, I, T, Rstr = trace_task(app.tasks[name],
                                uuid, args, kwargs, request, app=app)
     return (1, R, T) if I else (0, Rstr, T)
-trace_task_ret = _trace_task_ret
+trace_task_ret = _trace_task_ret  # noqa: E305
 
 
 def _fast_trace_task(task, uuid, request, body, content_type,

+ 14 - 10
celery/apps/worker.py

@@ -42,16 +42,6 @@ logger = get_logger(__name__)
 is_jython = sys.platform.startswith('java')
 is_pypy = hasattr(sys, 'pypy_version_info')
 
-
-def active_thread_count():
-    from threading import enumerate
-    return sum(1 for t in enumerate()
-               if not t.name.startswith('Dummy-'))
-
-
-def safe_say(msg):
-    print('\n{0}'.format(msg), file=sys.__stderr__)
-
 ARTLINES = [
     ' --------------',
     '---- **** -----',
@@ -89,6 +79,16 @@ EXTRA_INFO_FMT = """
 """
 
 
+def active_thread_count():
+    from threading import enumerate
+    return sum(1 for t in enumerate()
+               if not t.name.startswith('Dummy-'))
+
+
+def safe_say(msg):
+    print('\n{0}'.format(msg), file=sys.__stderr__)
+
+
 class Worker(WorkController):
     """Worker as a program."""
 
@@ -282,6 +282,8 @@ def _shutdown_handler(worker, sig='TERM', how='Warm',
                 raise exc(exitcode)
     _handle_request.__name__ = str('worker_{0}'.format(how))
     platforms.signals[sig] = _handle_request
+
+
 install_worker_term_handler = partial(
     _shutdown_handler, sig='SIGTERM', how='Warm', exc=WorkerShutdown,
 )
@@ -298,6 +300,8 @@ else:  # pragma: no cover
 def on_SIGINT(worker):
     safe_say('worker: Hitting Ctrl+C again will terminate all running tasks!')
     install_worker_term_hard_handler(worker, sig='SIGINT')
+
+
 if not is_jython:  # pragma: no cover
     install_worker_int_handler = partial(
         _shutdown_handler, sig='SIGINT', callback=on_SIGINT,

+ 1 - 1
celery/backends/base.py

@@ -510,7 +510,7 @@ class SyncBackendMixin(object):
 
 class BaseBackend(Backend, SyncBackendMixin):
     """Base (synchronous) result backend."""
-BaseDictBackend = BaseBackend  # XXX compat
+BaseDictBackend = BaseBackend  # noqa: E305 XXX compat
 
 
 class BaseKeyValueStoreBackend(Backend):

+ 1 - 0
celery/bin/amqp.py

@@ -369,5 +369,6 @@ class amqp(Command):
 def main():
     amqp().execute_from_commandline()
 
+
 if __name__ == '__main__':  # pragma: no cover
     main()

+ 1 - 0
celery/bin/beat.py

@@ -127,5 +127,6 @@ class beat(Command):
 def main(app=None):
     beat(app=app).execute_from_commandline()
 
+
 if __name__ == '__main__':      # pragma: no cover
     main()

+ 1 - 0
celery/bin/celeryd_detach.py

@@ -131,5 +131,6 @@ class detached_celeryd(object):
 def main(app=None):
     detached_celeryd(app).execute_from_commandline()
 
+
 if __name__ == '__main__':  # pragma: no cover
     main()

+ 1 - 0
celery/bin/events.py

@@ -173,5 +173,6 @@ def main():
     ev = events()
     ev.execute_from_commandline()
 
+
 if __name__ == '__main__':              # pragma: no cover
     main()

+ 1 - 0
celery/bin/multi.py

@@ -451,5 +451,6 @@ class MultiTool(TermLogger):
     def DOWN(self):
         return str(self.colored.magenta('DOWN'))
 
+
 if __name__ == '__main__':              # pragma: no cover
     main()

+ 2 - 3
celery/canvas.py

@@ -1362,7 +1362,7 @@ def signature(varies, *args, **kwargs):
             return varies.clone()
         return Signature.from_dict(varies, app=app)
     return Signature(varies, *args, **kwargs)
-subtask = signature   # XXX compat
+subtask = signature  # noqa: E305 XXX compat
 
 
 def maybe_signature(d, app=None, clone=False):
@@ -1390,5 +1390,4 @@ def maybe_signature(d, app=None, clone=False):
         if app is not None:
             d._app = app
     return d
-
-maybe_subtask = maybe_signature  # XXX compat
+maybe_subtask = maybe_signature  # noqa: E305 XXX compat

+ 1 - 0
celery/events/dumper.py

@@ -103,5 +103,6 @@ def evdump(app=None, out=sys.stdout):
         except conn.connection_errors + conn.channel_errors:
             dumper.say('-> Connection lost, attempting reconnect')
 
+
 if __name__ == '__main__':  # pragma: no cover
     evdump()

+ 1 - 1
celery/events/state.py

@@ -101,7 +101,7 @@ class CallableDefaultdict(defaultdict):
 
     def __call__(self, *args, **kwargs):
         return self.fun(*args, **kwargs)
-Callable.register(CallableDefaultdict)
+Callable.register(CallableDefaultdict)  # noqa: E305
 
 
 @memoize(maxsize=1000, keyfun=lambda a, _: a[0])

+ 2 - 2
celery/exceptions.py

@@ -159,7 +159,7 @@ class Retry(TaskPredicate):
 
     def __reduce__(self):
         return self.__class__, (self.message, self.excs, self.when)
-RetryTaskError = Retry   # XXX compat
+RetryTaskError = Retry  # noqa: E305 XXX compat
 
 
 class Ignore(TaskPredicate):
@@ -242,7 +242,7 @@ class CDeprecationWarning(DeprecationWarning):
 
 class WorkerTerminate(SystemExit):
     """Signals that the worker should terminate immediately."""
-SystemTerminate = WorkerTerminate  # XXX compat
+SystemTerminate = WorkerTerminate  # noqa: E305 XXX compat
 
 
 class WorkerShutdown(SystemExit):

+ 2 - 0
celery/local.py

@@ -390,6 +390,7 @@ def maybe_evaluate(obj):
 
 # import fails in python 2.5. fallback to reduce in stdlib
 
+
 MODULE_DEPRECATED = """
 The module %s is deprecated and will be removed in a future version.
 """
@@ -420,6 +421,7 @@ def _compat_periodic_task_decorator(*args, **kwargs):
     from celery.task import periodic_task
     return periodic_task(*args, **kwargs)
 
+
 COMPAT_MODULES = {
     'celery': {
         'execute': {

+ 2 - 1
celery/platforms.py

@@ -229,7 +229,7 @@ class Pidfile(object):
                     "Inconsistency: Pidfile content doesn't match at re-read")
         finally:
             rfh.close()
-PIDFile = Pidfile  # compat alias
+PIDFile = Pidfile  # noqa: E305 XXX compat alias
 
 
 def create_pidlock(pidfile):
@@ -682,6 +682,7 @@ class Signals(object):
         for name, handler in items(dict(_d_ or {}, **sigmap)):
             self[name] = handler
 
+
 signals = Signals()
 get_signal = signals.signum                   # compat
 install_signal_handler = signals.__setitem__  # compat

+ 1 - 0
celery/states.py

@@ -124,6 +124,7 @@ class state(str):
     def __le__(self, other):
         return precedence(self) >= precedence(other)
 
+
 #: Task state is unknown (assumed pending since you know the id).
 PENDING = 'PENDING'
 #: Task was received by a worker (only used in events).

+ 3 - 3
celery/utils/collections.py

@@ -229,7 +229,7 @@ class DictAttribute(object):
         def values(self):
             # type: () -> List[Any]
             return list(self._iterate_values())
-MutableMapping.register(DictAttribute)
+MutableMapping.register(DictAttribute)  # noqa: E305
 
 
 class ChainMap(MutableMapping):
@@ -707,7 +707,7 @@ class LimitedSet(object):
         # type: () -> float
         """Compute how much is heap bigger than data [percents]."""
         return len(self._heap) * 100 / max(len(self._data), 1) - 100
-MutableSet.register(LimitedSet)
+MutableSet.register(LimitedSet)  # noqa: E305
 
 
 class Evictable(object):
@@ -809,7 +809,7 @@ class Messagebuffer(Evictable):
     def _evictcount(self):
         # type: () -> int
         return len(self)
-Sequence.register(Messagebuffer)
+Sequence.register(Messagebuffer)  # noqa: E305
 
 
 @python_2_unicode_compatible

+ 2 - 0
celery/utils/dispatch/signal.py

@@ -30,6 +30,8 @@ def _make_id(target):  # pragma: no cover
     if hasattr(target, '__func__'):
         return (id(target.__self__), id(target.__func__))
     return id(target)
+
+
 NONE_ID = _make_id(None)
 
 NO_RECEIVERS = object()

+ 2 - 0
celery/utils/log.py

@@ -105,6 +105,8 @@ def get_logger(name):
     if logging.root not in (l, l.parent) and l is not base_logger:
         l = _using_logger_parent(base_logger, l)
     return l
+
+
 task_logger = get_logger('celery.task')
 worker_logger = get_logger('celery.worker')
 

+ 2 - 0
celery/utils/nodenames.py

@@ -85,6 +85,8 @@ def _fmt_process_index(prefix='', default='0'):
     from .log import current_process_index
     index = current_process_index()
     return '{0}{1}'.format(prefix, index) if index else default
+
+
 _fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')
 
 

+ 2 - 0
celery/utils/text.py

@@ -60,6 +60,8 @@ def ensure_sep(sep, s, n=2):
     # type: (str, str, int) -> str
     """Ensure text s ends in separator sep'."""
     return s + sep * (n - s.count(sep))
+
+
 ensure_newlines = partial(ensure_sep, '\n')
 
 

+ 1 - 0
celery/utils/threads.py

@@ -325,6 +325,7 @@ class _FastLocalStack(threading.local):
     def __len__(self):
         return len(self.stack)
 
+
 if USE_FAST_LOCALS:  # pragma: no cover
     LocalStack = _FastLocalStack
 else:

+ 2 - 0
celery/utils/time.py

@@ -156,6 +156,8 @@ class _Zone(object):
     @cached_property
     def utc(self):
         return self.get_timezone('UTC')
+
+
 timezone = _Zone()
 
 

+ 1 - 1
celery/worker/request.py

@@ -53,7 +53,7 @@ def __optimize__():
     global _does_info
     _does_debug = logger.isEnabledFor(logging.DEBUG)
     _does_info = logger.isEnabledFor(logging.INFO)
-__optimize__()
+__optimize__()  # noqa: E305
 
 # Localize
 tz_or_local = timezone.tz_or_local

+ 1 - 0
t/unit/app/test_app.py

@@ -33,6 +33,7 @@ class ObjectConfig(object):
     FOO = 1
     BAR = 2
 
+
 object_config = ObjectConfig()
 dict_config = dict(FOO=10, BAR=20)
 

+ 1 - 0
t/unit/backends/test_base.py

@@ -32,6 +32,7 @@ class wrapobject(object):
     def __init__(self, *args, **kwargs):
         self.args = args
 
+
 if sys.version_info[0] == 3 or getattr(sys, 'pypy_version_info', None):
     Oldstyle = None
 else:

+ 1 - 0
t/unit/bin/test_base.py

@@ -13,6 +13,7 @@ from celery.bin.base import (
 class MyApp(object):
     user_options = {'preload': None}
 
+
 APP = MyApp()  # <-- Used by test_with_custom_app
 
 

+ 1 - 1
t/unit/bin/test_events.py

@@ -31,7 +31,7 @@ class MockCommand(object):
 
 def proctitle(prog, info=None):
     proctitle.last = (prog, info)
-proctitle.last = ()
+proctitle.last = ()  # noqa: E305
 
 
 class test_events:

+ 2 - 0
t/unit/events/test_snapshot.py

@@ -11,6 +11,8 @@ class MockTimer(object):
     def call_repeatedly(self, secs, fun, *args, **kwargs):
         self.installed.append(fun)
         return Mock(name='TRef')
+
+
 timer = MockTimer()
 
 

+ 2 - 0
t/unit/tasks/test_context.py

@@ -15,6 +15,8 @@ def get_context_as_dict(ctx, getter=getattr):
             continue   # Ignore methods and other non-trivial types
         defaults[attr_name] = attr
     return defaults
+
+
 default_context = get_context_as_dict(Context())
 
 

+ 1 - 0
t/unit/utils/test_dispatcher.py

@@ -38,6 +38,7 @@ class Callable(object):
     def a(self, val, **kwargs):
         return val
 
+
 a_signal = Signal(providing_args=['val'], use_caching=False)