Ask Solem 9 years ago
parent
commit
d4e72df2a7
44 changed files with 269 additions and 168 deletions
  1. 4 3
      celery/__init__.py
  2. 2 1
      celery/app/base.py
  3. 2 1
      celery/apps/worker.py
  4. 0 2
      celery/backends/__init__.py
  5. 2 2
      celery/backends/database/__init__.py
  6. 10 10
      celery/backends/mongodb.py
  7. 3 3
      celery/bin/beat.py
  8. 2 1
      celery/bin/celery.py
  9. 3 3
      celery/bin/events.py
  10. 3 3
      celery/concurrency/eventlet.py
  11. 2 1
      celery/concurrency/prefork.py
  12. 2 2
      celery/events/cursesmon.py
  13. 15 16
      celery/five.py
  14. 5 5
      celery/fixups/django.py
  15. 136 46
      celery/local.py
  16. 3 3
      celery/platforms.py
  17. 1 1
      celery/task/trace.py
  18. 8 2
      celery/tests/app/test_app.py
  19. 2 1
      celery/tests/app/test_loaders.py
  20. 2 2
      celery/tests/case.py
  21. 4 4
      celery/tests/compat_modules/test_compat.py
  22. 2 1
      celery/tests/events/test_state.py
  23. 3 1
      celery/tests/tasks/test_chord.py
  24. 3 1
      celery/tests/utils/test_platforms.py
  25. 3 1
      celery/tests/utils/test_threads.py
  26. 2 1
      celery/tests/worker/test_hub.py
  27. 5 2
      celery/utils/term.py
  28. 2 2
      celery/worker/job.py
  29. 0 2
      docs/_ext/applyxrefs.py
  30. 6 14
      docs/conf.py
  31. 3 2
      examples/app/myapp.py
  32. 1 1
      examples/django/proj/__init__.py
  33. 1 1
      examples/django/proj/celery.py
  34. 1 1
      examples/django/proj/wsgi.py
  35. 1 1
      examples/eventlet/celeryconfig.py
  36. 1 1
      examples/gevent/celeryconfig.py
  37. 1 1
      examples/next-steps/setup.py
  38. 4 2
      extra/release/bump_version.py
  39. 7 7
      funtests/benchmarks/bench_worker.py
  40. 0 1
      funtests/stress/stress/fbi.py
  41. 1 0
      funtests/suite/__init__.py
  42. 3 7
      funtests/suite/test_basic.py
  43. 1 3
      funtests/suite/test_leak.py
  44. 7 3
      setup.py

+ 4 - 3
celery/__init__.py

@@ -7,6 +7,9 @@
 
 
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
+import os
+import sys
+
 from collections import namedtuple
 from collections import namedtuple
 
 
 version_info_t = namedtuple(
 version_info_t = namedtuple(
@@ -30,8 +33,6 @@ VERSION_BANNER = '{0} ({1})'.format(__version__, SERIES)
 
 
 # -eof meta-
 # -eof meta-
 
 
-import os
-import sys
 if os.environ.get('C_IMPDEBUG'):  # pragma: no cover
 if os.environ.get('C_IMPDEBUG'):  # pragma: no cover
     from .five import builtins
     from .five import builtins
     real_import = builtins.__import__
     real_import = builtins.__import__
@@ -127,7 +128,7 @@ def maybe_patch_concurrency(argv=sys.argv,
         concurrency.get_implementation(pool)
         concurrency.get_implementation(pool)
 
 
 # Lazy loading
 # Lazy loading
-from celery import five
+from celery import five  # noqa
 
 
 old_module, new_module = five.recreate_module(  # pragma: no cover
 old_module, new_module = five.recreate_module(  # pragma: no cover
     __name__,
     __name__,

+ 2 - 1
celery/app/base.py

@@ -221,7 +221,8 @@ class Celery(object):
 
 
             def _create_task_cls(fun):
             def _create_task_cls(fun):
                 if shared:
                 if shared:
-                    cons = lambda app: app._task_from_fun(fun, **opts)
+                    def cons(app):
+                        return app._task_from_fun(fun, **opts)
                     cons.__name__ = fun.__name__
                     cons.__name__ = fun.__name__
                     connect_on_app_finalize(cons)
                     connect_on_app_finalize(cons)
                 if self.accept_magic_kwargs:  # compat mode
                 if self.accept_magic_kwargs:  # compat mode

+ 2 - 1
celery/apps/worker.py

@@ -313,7 +313,8 @@ if not is_jython:  # pragma: no cover
         _shutdown_handler, sig='SIGINT', callback=on_SIGINT
         _shutdown_handler, sig='SIGINT', callback=on_SIGINT
     )
     )
 else:  # pragma: no cover
 else:  # pragma: no cover
-    install_worker_int_handler = lambda *a, **kw: None
+    def install_worker_int_handler(*a, **kw):
+        pass
 
 
 
 
 def _reload_current_worker():
 def _reload_current_worker():

+ 0 - 2
celery/backends/__init__.py

@@ -10,8 +10,6 @@ from __future__ import absolute_import
 
 
 import sys
 import sys
 
 
-from kombu.utils.url import urlparse
-
 from celery.local import Proxy
 from celery.local import Proxy
 from celery._state import current_app
 from celery._state import current_app
 from celery.five import reraise
 from celery.five import reraise

+ 2 - 2
celery/backends/database/__init__.py

@@ -37,8 +37,8 @@ def _sqlalchemy_installed():
     return sqlalchemy
     return sqlalchemy
 _sqlalchemy_installed()
 _sqlalchemy_installed()
 
 
-from sqlalchemy.exc import DatabaseError, InvalidRequestError
-from sqlalchemy.orm.exc import StaleDataError
+from sqlalchemy.exc import DatabaseError, InvalidRequestError  # noqa
+from sqlalchemy.orm.exc import StaleDataError  # noqa
 
 
 
 
 @contextmanager
 @contextmanager

+ 10 - 10
celery/backends/mongodb.py

@@ -10,6 +10,16 @@ from __future__ import absolute_import
 
 
 from datetime import datetime
 from datetime import datetime
 
 
+from kombu.syn import detect_environment
+from kombu.utils import cached_property
+
+from celery import states
+from celery.exceptions import ImproperlyConfigured
+from celery.five import items, string_t
+from celery.utils.timeutils import maybe_timedelta
+
+from .base import BaseBackend
+
 try:
 try:
     import pymongo
     import pymongo
 except ImportError:  # pragma: no cover
 except ImportError:  # pragma: no cover
@@ -23,16 +33,6 @@ if pymongo:
 else:                                       # pragma: no cover
 else:                                       # pragma: no cover
     Binary = None                           # noqa
     Binary = None                           # noqa
 
 
-from kombu.syn import detect_environment
-from kombu.utils import cached_property
-
-from celery import states
-from celery.exceptions import ImproperlyConfigured
-from celery.five import items, string_t
-from celery.utils.timeutils import maybe_timedelta
-
-from .base import BaseBackend
-
 __all__ = ['MongoBackend']
 __all__ = ['MongoBackend']
 
 
 
 

+ 3 - 3
celery/bin/beat.py

@@ -87,9 +87,9 @@ class beat(Command):
                     default=c.CELERYBEAT_SCHEDULE_FILENAME),
                     default=c.CELERYBEAT_SCHEDULE_FILENAME),
              Option('--max-interval', type='float'),
              Option('--max-interval', type='float'),
              Option('-S', '--scheduler', dest='scheduler_cls'),
              Option('-S', '--scheduler', dest='scheduler_cls'),
-             Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL))
-            + daemon_options(default_pidfile='celerybeat.pid')
-            + tuple(self.app.user_options['beat'])
+             Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL)) +
+            daemon_options(default_pidfile='celerybeat.pid') +
+            tuple(self.app.user_options['beat'])
         )
         )
 
 
 
 

+ 2 - 1
celery/bin/celery.py

@@ -115,7 +115,8 @@ class list_(Command):
         except NotImplementedError:
         except NotImplementedError:
             raise self.Error('Your transport cannot list bindings.')
             raise self.Error('Your transport cannot list bindings.')
 
 
-        fmt = lambda q, e, r: self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
+        def fmt(q, e, r):
+            return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
         fmt('Queue', 'Exchange', 'Routing Key')
         fmt('Queue', 'Exchange', 'Routing Key')
         fmt('-' * 16, '-' * 16, '-' * 16)
         fmt('-' * 16, '-' * 16, '-' * 16)
         for b in bindings:
         for b in bindings:

+ 3 - 3
celery/bin/events.py

@@ -125,9 +125,9 @@ class events(Command):
              Option('-F', '--frequency', '--freq',
              Option('-F', '--frequency', '--freq',
                     type='float', default=1.0),
                     type='float', default=1.0),
              Option('-r', '--maxrate'),
              Option('-r', '--maxrate'),
-             Option('-l', '--loglevel', default='INFO'))
-            + daemon_options(default_pidfile='celeryev.pid')
-            + tuple(self.app.user_options['events'])
+             Option('-l', '--loglevel', default='INFO')) +
+            daemon_options(default_pidfile='celeryev.pid') +
+            tuple(self.app.user_options['events'])
         )
         )
 
 
 
 

+ 3 - 3
celery/concurrency/eventlet.py

@@ -29,10 +29,10 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
             warnings.warn(RuntimeWarning(W_RACE % side))
             warnings.warn(RuntimeWarning(W_RACE % side))
 
 
 
 
-from celery import signals
-from celery.utils import timer2
+from celery import signals  # noqa
+from celery.utils import timer2  # noqa
 
 
-from . import base
+from . import base  # noqa
 
 
 
 
 def apply_target(target, args=(), kwargs={}, callback=None,
 def apply_target(target, args=(), kwargs={}, callback=None,

+ 2 - 1
celery/concurrency/prefork.py

@@ -159,7 +159,8 @@ class TaskPool(BasePool):
         try:
         try:
             write_stats = self._pool.human_write_stats
             write_stats = self._pool.human_write_stats
         except AttributeError:
         except AttributeError:
-            write_stats = lambda: 'N/A'  # only supported by asynpool
+            def write_stats():
+                return 'N/A'  # only supported by asynpool
         return {
         return {
             'max-concurrency': self.limit,
             'max-concurrency': self.limit,
             'processes': [p.pid for p in self._pool._pool],
             'processes': [p.pid for p in self._pool._pool],

+ 2 - 2
celery/events/cursesmon.py

@@ -318,8 +318,8 @@ class CursesMonitor(object):  # pragma: no cover
         def alert_callback(my, mx, xs):
         def alert_callback(my, mx, xs):
             y = count(xs)
             y = count(xs)
             task = self.state.tasks[self.selected_task]
             task = self.state.tasks[self.selected_task]
-            result = (getattr(task, 'result', None)
-                      or getattr(task, 'exception', None))
+            result = (getattr(task, 'result', None) or
+                      getattr(task, 'exception', None))
             for line in wrap(result, mx - 2):
             for line in wrap(result, mx - 2):
                 self.win.addstr(next(y), 3, line)
                 self.win.addstr(next(y), 3, line)
 
 

+ 15 - 16
celery/five.py

@@ -10,15 +10,14 @@
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
-__all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
-           'zip_longest', 'map', 'string', 'string_t',
-           'long_t', 'text_t', 'range', 'int_types', 'items', 'keys', 'values',
-           'nextfun', 'reraise', 'WhateverIO', 'with_metaclass',
-           'OrderedDict', 'THREAD_TIMEOUT_MAX', 'format_d',
-           'class_property', 'reclassmethod', 'create_module',
-           'recreate_module', 'monotonic']
-
 import io
 import io
+import operator
+import sys
+
+from importlib import import_module
+from types import ModuleType
+
+from kombu.five import monotonic
 
 
 try:
 try:
     from collections import Counter
     from collections import Counter
@@ -28,8 +27,15 @@ except ImportError:  # pragma: no cover
     def Counter():  # noqa
     def Counter():  # noqa
         return defaultdict(int)
         return defaultdict(int)
 
 
+__all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
+           'zip_longest', 'map', 'string', 'string_t',
+           'long_t', 'text_t', 'range', 'int_types', 'items', 'keys', 'values',
+           'nextfun', 'reraise', 'WhateverIO', 'with_metaclass',
+           'OrderedDict', 'THREAD_TIMEOUT_MAX', 'format_d',
+           'class_property', 'reclassmethod', 'create_module',
+           'recreate_module', 'monotonic']
+
 # ############# py3k #########################################################
 # ############# py3k #########################################################
-import sys
 PY3 = sys.version_info[0] == 3
 PY3 = sys.version_info[0] == 3
 
 
 try:
 try:
@@ -48,8 +54,6 @@ except ImportError:                         # pragma: no cover
     from collections import UserDict        # noqa
     from collections import UserDict        # noqa
 
 
 
 
-from kombu.five import monotonic
-
 if PY3:  # pragma: no cover
 if PY3:  # pragma: no cover
     import builtins
     import builtins
 
 
@@ -175,8 +179,6 @@ else:  # pragma: no cover
 # recreate modules, either for lazy loading or
 # recreate modules, either for lazy loading or
 # to create old modules at runtime instead of
 # to create old modules at runtime instead of
 # having them litter the source tree.
 # having them litter the source tree.
-import operator
-import sys
 
 
 # import fails in python 2.5. fallback to reduce in stdlib
 # import fails in python 2.5. fallback to reduce in stdlib
 try:
 try:
@@ -184,9 +186,6 @@ try:
 except ImportError:
 except ImportError:
     pass
     pass
 
 
-from importlib import import_module
-from types import ModuleType
-
 MODULE_DEPRECATED = """
 MODULE_DEPRECATED = """
 The module %s is deprecated and will be removed in a future version.
 The module %s is deprecated and will be removed in a future version.
 """
 """

+ 5 - 5
celery/fixups/django.py

@@ -4,11 +4,6 @@ import os
 import sys
 import sys
 import warnings
 import warnings
 
 
-if sys.version_info[0] < 3 and not hasattr(sys, 'pypy_version_info'):
-    from StringIO import StringIO
-else:
-    from io import StringIO
-
 from kombu.utils import cached_property, symbol_by_name
 from kombu.utils import cached_property, symbol_by_name
 
 
 from datetime import datetime
 from datetime import datetime
@@ -17,6 +12,11 @@ from importlib import import_module
 from celery import signals
 from celery import signals
 from celery.exceptions import FixupWarning
 from celery.exceptions import FixupWarning
 
 
+if sys.version_info[0] < 3 and not hasattr(sys, 'pypy_version_info'):
+    from StringIO import StringIO
+else:
+    from io import StringIO
+
 __all__ = ['DjangoFixup', 'fixup']
 __all__ = ['DjangoFixup', 'fixup']
 
 
 ERR_NOT_INSTALLED = """\
 ERR_NOT_INSTALLED = """\

+ 136 - 46
celery/local.py

@@ -154,54 +154,144 @@ class Proxy(object):
     def __delslice__(self, i, j):
     def __delslice__(self, i, j):
         del self._get_current_object()[i:j]
         del self._get_current_object()[i:j]
 
 
-    __setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v)
-    __delattr__ = lambda x, n: delattr(x._get_current_object(), n)
-    __str__ = lambda x: str(x._get_current_object())
-    __lt__ = lambda x, o: x._get_current_object() < o
-    __le__ = lambda x, o: x._get_current_object() <= o
-    __eq__ = lambda x, o: x._get_current_object() == o
-    __ne__ = lambda x, o: x._get_current_object() != o
-    __gt__ = lambda x, o: x._get_current_object() > o
-    __ge__ = lambda x, o: x._get_current_object() >= o
-    __hash__ = lambda x: hash(x._get_current_object())
-    __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
-    __len__ = lambda x: len(x._get_current_object())
-    __getitem__ = lambda x, i: x._get_current_object()[i]
-    __iter__ = lambda x: iter(x._get_current_object())
-    __contains__ = lambda x, i: i in x._get_current_object()
-    __getslice__ = lambda x, i, j: x._get_current_object()[i:j]
-    __add__ = lambda x, o: x._get_current_object() + o
-    __sub__ = lambda x, o: x._get_current_object() - o
-    __mul__ = lambda x, o: x._get_current_object() * o
-    __floordiv__ = lambda x, o: x._get_current_object() // o
-    __mod__ = lambda x, o: x._get_current_object() % o
-    __divmod__ = lambda x, o: x._get_current_object().__divmod__(o)
-    __pow__ = lambda x, o: x._get_current_object() ** o
-    __lshift__ = lambda x, o: x._get_current_object() << o
-    __rshift__ = lambda x, o: x._get_current_object() >> o
-    __and__ = lambda x, o: x._get_current_object() & o
-    __xor__ = lambda x, o: x._get_current_object() ^ o
-    __or__ = lambda x, o: x._get_current_object() | o
-    __div__ = lambda x, o: x._get_current_object().__div__(o)
-    __truediv__ = lambda x, o: x._get_current_object().__truediv__(o)
-    __neg__ = lambda x: -(x._get_current_object())
-    __pos__ = lambda x: +(x._get_current_object())
-    __abs__ = lambda x: abs(x._get_current_object())
-    __invert__ = lambda x: ~(x._get_current_object())
-    __complex__ = lambda x: complex(x._get_current_object())
-    __int__ = lambda x: int(x._get_current_object())
-    __float__ = lambda x: float(x._get_current_object())
-    __oct__ = lambda x: oct(x._get_current_object())
-    __hex__ = lambda x: hex(x._get_current_object())
-    __index__ = lambda x: x._get_current_object().__index__()
-    __coerce__ = lambda x, o: x._get_current_object().__coerce__(o)
-    __enter__ = lambda x: x._get_current_object().__enter__()
-    __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)
-    __reduce__ = lambda x: x._get_current_object().__reduce__()
+    def __setattr__(self, name, value):
+        setattr(self._get_current_object(), name, value)
+
+    def __delattr__(self, name):
+        delattr(self._get_current_object(), name)
+
+    def __str__(self):
+        return str(self._get_current_object())
+
+    def __lt__(self, other):
+        return self._get_current_object() < other
+
+    def __le__(self, other):
+        return self._get_current_object() <= other
+
+    def __eq__(self, other):
+        return self._get_current_object() == other
+
+    def __ne__(self, other):
+        return self._get_current_object() != other
+
+    def __gt__(self, other):
+        return self._get_current_object() > other
+
+    def __ge__(self, other):
+        return self._get_current_object() >= other
+
+    def __hash__(self):
+        return hash(self._get_current_object())
+
+    def __call__(self, *a, **kw):
+        return self._get_current_object()(*a, **kw)
+
+    def __len__(self):
+        return len(self._get_current_object())
+
+    def __getitem__(self, i):
+        return self._get_current_object()[i]
+
+    def __iter__(self):
+        return iter(self._get_current_object())
+
+    def __contains__(self, i):
+        return i in self._get_current_object()
+
+    def __getslice__(self, i, j):
+        return self._get_current_object()[i:j]
+
+    def __add__(self, other):
+        return self._get_current_object() + other
+
+    def __sub__(self, other):
+        return self._get_current_object() - other
+
+    def __mul__(self, other):
+        return self._get_current_object() * other
+
+    def __floordiv__(self, other):
+        return self._get_current_object() // other
+
+    def __mod__(self, other):
+        return self._get_current_object() % other
+
+    def __divmod__(self, other):
+        return self._get_current_object().__divmod__(other)
+
+    def __pow__(self, other):
+        return self._get_current_object() ** other
+
+    def __lshift__(self, other):
+        return self._get_current_object() << other
+
+    def __rshift__(self, other):
+        return self._get_current_object() >> other
+
+    def __and__(self, other):
+        return self._get_current_object() & other
+
+    def __xor__(self, other):
+        return self._get_current_object() ^ other
+
+    def __or__(self, other):
+        return self._get_current_object() | other
+
+    def __div__(self, other):
+        return self._get_current_object().__div__(other)
+
+    def __truediv__(self, other):
+        return self._get_current_object().__truediv__(other)
+
+    def __neg__(self):
+        return -(self._get_current_object())
+
+    def __pos__(self):
+        return +(self._get_current_object())
+
+    def __abs__(self):
+        return abs(self._get_current_object())
+
+    def __invert__(self):
+        return ~(self._get_current_object())
+
+    def __complex__(self):
+        return complex(self._get_current_object())
+
+    def __int__(self):
+        return int(self._get_current_object())
+
+    def __float__(self):
+        return float(self._get_current_object())
+
+    def __oct__(self):
+        return oct(self._get_current_object())
+
+    def __hex__(self):
+        return hex(self._get_current_object())
+
+    def __index__(self):
+        return self._get_current_object().__index__()
+
+    def __coerce__(self, other):
+        return self._get_current_object().__coerce__(other)
+
+    def __enter__(self):
+        return self._get_current_object().__enter__()
+
+    def __exit__(self, *a, **kw):
+        return self._get_current_object().__exit__(*a, **kw)
+
+    def __reduce__(self):
+        return self._get_current_object().__reduce__()
 
 
     if not PY3:
     if not PY3:
-        __cmp__ = lambda x, o: cmp(x._get_current_object(), o)  # noqa
-        __long__ = lambda x: long(x._get_current_object())      # noqa
+        def __cmp__(self, other):
+            return cmp(self._get_current_object(), other)  # noqa
+
+        def __long__(self):
+            return long(self._get_current_object())  # noqa
 
 
 
 
 class PromiseProxy(Proxy):
 class PromiseProxy(Proxy):

+ 3 - 3
celery/platforms.py

@@ -265,9 +265,9 @@ def _create_pidlock(pidfile):
     pidlock.acquire()
     pidlock.acquire()
     return pidlock
     return pidlock
 
 
+
 def fd_by_path(paths):
 def fd_by_path(paths):
-    """
-    Return a list of fds.
+    """Return a list of fds.
 
 
     This method returns list of fds corresponding to
     This method returns list of fds corresponding to
     file paths passed in paths variable.
     file paths passed in paths variable.
@@ -300,7 +300,7 @@ def fd_by_path(paths):
         except OSError:
         except OSError:
             return False
             return False
 
 
-    return [fd for fd in range(get_fdmax(2048)) if fd_in_stats(fd)]
+    return [_fd for _fd in range(get_fdmax(2048)) if fd_in_stats(_fd)]
 
 
 
 
 if hasattr(os, 'closerange'):
 if hasattr(os, 'closerange'):

+ 1 - 1
celery/task/trace.py

@@ -3,10 +3,10 @@ from __future__ import absolute_import
 
 
 import sys
 import sys
 
 
+from celery.app import trace
 from celery.utils import warn_deprecated
 from celery.utils import warn_deprecated
 
 
 warn_deprecated('celery.task.trace', removal='3.2',
 warn_deprecated('celery.task.trace', removal='3.2',
                 alternative='Please use celery.app.trace instead.')
                 alternative='Please use celery.app.trace instead.')
 
 
-from celery.app import trace
 sys.modules[__name__] = trace
 sys.modules[__name__] = trace

+ 8 - 2
celery/tests/app/test_app.py

@@ -144,7 +144,10 @@ class test_App(AppCase):
     def test_add_defaults(self):
     def test_add_defaults(self):
         self.assertFalse(self.app.configured)
         self.assertFalse(self.app.configured)
         _conf = {'FOO': 300}
         _conf = {'FOO': 300}
-        conf = lambda: _conf
+
+        def conf():
+            return _conf
+
         self.app.add_defaults(conf)
         self.app.add_defaults(conf)
         self.assertIn(conf, self.app._pending_defaults)
         self.assertIn(conf, self.app._pending_defaults)
         self.assertFalse(self.app.configured)
         self.assertFalse(self.app.configured)
@@ -208,7 +211,10 @@ class test_App(AppCase):
 
 
     def test_autodiscover_tasks_lazy(self):
     def test_autodiscover_tasks_lazy(self):
         with patch('celery.signals.import_modules') as import_modules:
         with patch('celery.signals.import_modules') as import_modules:
-            packages = lambda: [1, 2, 3]
+
+            def packages():
+                return [1, 2, 3]
+
             self.app.autodiscover_tasks(packages)
             self.app.autodiscover_tasks(packages)
             self.assertTrue(import_modules.connect.called)
             self.assertTrue(import_modules.connect.called)
             prom = import_modules.connect.call_args[0][0]
             prom = import_modules.connect.call_args[0][0]

+ 2 - 1
celery/tests/app/test_loaders.py

@@ -99,7 +99,8 @@ class test_LoaderBase(AppCase):
         self.assertEqual(self.loader.conf['foo'], 'bar')
         self.assertEqual(self.loader.conf['foo'], 'bar')
 
 
     def test_import_default_modules(self):
     def test_import_default_modules(self):
-        modnames = lambda l: [m.__name__ for m in l]
+        def modnames(l):
+            return [m.__name__ for m in l]
         self.app.conf.CELERY_IMPORTS = ('os', 'sys')
         self.app.conf.CELERY_IMPORTS = ('os', 'sys')
         self.assertEqual(
         self.assertEqual(
             sorted(modnames(self.loader.import_default_modules())),
             sorted(modnames(self.loader.import_default_modules())),

+ 2 - 2
celery/tests/case.py

@@ -102,8 +102,8 @@ CELERY_TEST_CONFIG = {
         'host': os.environ.get('MONGO_HOST') or 'localhost',
         'host': os.environ.get('MONGO_HOST') or 'localhost',
         'port': os.environ.get('MONGO_PORT') or 27017,
         'port': os.environ.get('MONGO_PORT') or 27017,
         'database': os.environ.get('MONGO_DB') or 'celery_unittests',
         'database': os.environ.get('MONGO_DB') or 'celery_unittests',
-        'taskmeta_collection': (os.environ.get('MONGO_TASKMETA_COLLECTION')
-                                or 'taskmeta_collection'),
+        'taskmeta_collection': (os.environ.get('MONGO_TASKMETA_COLLECTION') or
+                                'taskmeta_collection'),
         'user': os.environ.get('MONGO_USER'),
         'user': os.environ.get('MONGO_USER'),
         'password': os.environ.get('MONGO_PASSWORD'),
         'password': os.environ.get('MONGO_PASSWORD'),
     }
     }

+ 4 - 4
celery/tests/compat_modules/test_compat.py

@@ -5,14 +5,14 @@ from datetime import timedelta
 import sys
 import sys
 sys.modules.pop('celery.task', None)
 sys.modules.pop('celery.task', None)
 
 
-from celery.schedules import schedule
-from celery.task import (
+from celery.schedules import schedule  # noqa
+from celery.task import (  # noqa
     periodic_task,
     periodic_task,
     PeriodicTask
     PeriodicTask
 )
 )
-from celery.utils.timeutils import timedelta_seconds
+from celery.utils.timeutils import timedelta_seconds  # noqa
 
 
-from celery.tests.case import AppCase, depends_on_current_app
+from celery.tests.case import AppCase, depends_on_current_app  # noqa
 
 
 
 
 class test_Task(AppCase):
 class test_Task(AppCase):

+ 2 - 1
celery/tests/events/test_state.py

@@ -26,7 +26,8 @@ except TypeError:  # pragma: no cover
     # Py2.6: Must first convert float to str
     # Py2.6: Must first convert float to str
     _float_to_decimal = str
     _float_to_decimal = str
 else:
 else:
-    _float_to_decimal = lambda f: f  # noqa
+    def _float_to_decimal(f):  # noqa
+        return f
 
 
 
 
 class replay(object):
 class replay(object):

+ 3 - 1
celery/tests/tasks/test_chord.py

@@ -10,7 +10,9 @@ from celery.five import range
 from celery.result import AsyncResult, GroupResult, EagerResult
 from celery.result import AsyncResult, GroupResult, EagerResult
 from celery.tests.case import AppCase, Mock
 from celery.tests.case import AppCase, Mock
 
 
-passthru = lambda x: x
+
+def passthru(x):
+    return x
 
 
 
 
 class ChordCase(AppCase):
 class ChordCase(AppCase):

+ 3 - 1
celery/tests/utils/test_platforms.py

@@ -56,6 +56,7 @@ class test_find_option_with_arg(Case):
             'bar'
             'bar'
         )
         )
 
 
+
 class test_fd_by_path(Case):
 class test_fd_by_path(Case):
 
 
     def test_finds(self):
     def test_finds(self):
@@ -147,7 +148,8 @@ class test_Signals(Case):
 
 
     @patch('signal.signal')
     @patch('signal.signal')
     def test_setitem(self, set):
     def test_setitem(self, set):
-        handle = lambda *a: a
+        def handle(*a):
+            return a
         signals['INT'] = handle
         signals['INT'] = handle
         set.assert_called_with(signal.SIGINT, handle)
         set.assert_called_with(signal.SIGINT, handle)
 
 

+ 3 - 1
celery/tests/utils/test_threads.py

@@ -90,7 +90,9 @@ class test_LocalManager(Case):
         self.assertListEqual(x.locals, [])
         self.assertListEqual(x.locals, [])
         self.assertTrue(x.ident_func)
         self.assertTrue(x.ident_func)
 
 
-        ident = lambda: 1
+        def ident():
+            return 1
+
         loc = Local()
         loc = Local()
         x = LocalManager([loc], ident_func=ident)
         x = LocalManager([loc], ident_func=ident)
         self.assertListEqual(x.locals, [loc])
         self.assertListEqual(x.locals, [loc])

+ 2 - 1
celery/tests/worker/test_hub.py

@@ -162,7 +162,8 @@ class test_Hub(Case):
         e1, e2, e3 = Mock(), Mock(), Mock()
         e1, e2, e3 = Mock(), Mock(), Mock()
         entries = [e1, e2, e3]
         entries = [e1, e2, e3]
 
 
-        reset = lambda: [m.reset() for m in [e1, e2, e3]]
+        def reset():
+            return [m.reset() for m in [e1, e2, e3]]
 
 
         def se():
         def se():
             while 1:
             while 1:

+ 5 - 2
celery/utils/term.py

@@ -17,13 +17,16 @@ from celery.five import string
 
 
 __all__ = ['colored']
 __all__ = ['colored']
 
 
+IS_WINDOWS = platform.system() == 'Windows'
+
 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
 OP_SEQ = '\033[%dm'
 OP_SEQ = '\033[%dm'
 RESET_SEQ = '\033[0m'
 RESET_SEQ = '\033[0m'
 COLOR_SEQ = '\033[1;%dm'
 COLOR_SEQ = '\033[1;%dm'
-fg = lambda s: COLOR_SEQ % s
 
 
-IS_WINDOWS = platform.system() == 'Windows'
+
+def fg(s):
+    return COLOR_SEQ % s
 
 
 
 
 class colored(object):
 class colored(object):

+ 2 - 2
celery/worker/job.py

@@ -563,8 +563,8 @@ class Request(object):
 
 
     @property
     @property
     def store_errors(self):
     def store_errors(self):
-        return (not self.task.ignore_result
-                or self.task.store_errors_even_if_ignored)
+        return (not self.task.ignore_result or
+                self.task.store_errors_even_if_ignored)
 
 
     @property
     @property
     def task_id(self):
     def task_id(self):

+ 0 - 2
docs/_ext/applyxrefs.py

@@ -49,7 +49,6 @@ def has_target(fn):
         if not readok:
         if not readok:
             return (True, None)
             return (True, None)
 
 
-    #print fn, len(lines)
     if len(lines) < 1:
     if len(lines) < 1:
         print("Not touching empty file %s." % fn)
         print("Not touching empty file %s." % fn)
         return (True, None)
         return (True, None)
@@ -71,7 +70,6 @@ def main(argv=None):
             files.extend([(dirpath, f) for f in filenames])
             files.extend([(dirpath, f) for f in filenames])
     files.sort()
     files.sort()
     files = [os.path.join(p, fn) for p, fn in files if fn.endswith('.txt')]
     files = [os.path.join(p, fn) for p, fn in files if fn.endswith('.txt')]
-    #print files
 
 
     for fn in files:
     for fn in files:
         if fn in DONT_TOUCH:
         if fn in DONT_TOUCH:

+ 6 - 14
docs/conf.py

@@ -10,7 +10,7 @@ this = os.path.dirname(os.path.abspath(__file__))
 # absolute, like shown here.
 # absolute, like shown here.
 sys.path.insert(0, os.path.join(this, os.pardir))
 sys.path.insert(0, os.path.join(this, os.pardir))
 sys.path.append(os.path.join(this, '_ext'))
 sys.path.append(os.path.join(this, '_ext'))
-import celery
+import celery  # noqa
 
 
 # General configuration
 # General configuration
 # ---------------------
 # ---------------------
@@ -67,10 +67,10 @@ release = celery.__version__
 
 
 exclude_trees = ['.build']
 exclude_trees = ['.build']
 
 
-#unused_docs = [
-#    'xreftest.rst',
-#    'tutorials/otherqueues',
-#]
+unused_docs = [
+    'xreftest.rst',
+    'tutorials/otherqueues',
+]
 
 
 # If true, '()' will be appended to :func: etc. cross-reference text.
 # If true, '()' will be appended to :func: etc. cross-reference text.
 add_function_parentheses = True
 add_function_parentheses = True
@@ -115,7 +115,7 @@ html_sidebars = {
            'sourcelink.html', 'searchbox.html'],
            'sourcelink.html', 'searchbox.html'],
 }
 }
 
 
-### Issuetracker
+# ### Issuetracker
 
 
 github_project = 'celery/celery'
 github_project = 'celery/celery'
 
 
@@ -141,14 +141,6 @@ epub_identifier = 'celeryproject.org'
 # A unique identification for the text.
 # A unique identification for the text.
 epub_uid = 'Celery Manual, Version {0}'.format(version)
 epub_uid = 'Celery Manual, Version {0}'.format(version)
 
 
-# HTML files that should be inserted before the pages created by sphinx.
-# The format is a list of tuples containing the path and title.
-#epub_pre_files = []
-
-# HTML files shat should be inserted after the pages created by sphinx.
-# The format is a list of tuples containing the path and title.
-#epub_post_files = []
-
 # A list of files that should not be packed into the epub file.
 # A list of files that should not be packed into the epub file.
 epub_exclude_files = ['search.html']
 epub_exclude_files = ['search.html']
 
 

+ 3 - 2
examples/app/myapp.py

@@ -27,10 +27,11 @@ from celery import Celery
 app = Celery(
 app = Celery(
     'myapp',
     'myapp',
     broker='amqp://guest@localhost//',
     broker='amqp://guest@localhost//',
-    # add result backend here if needed.
-    #backend='rpc'
+    # ### add result backend here if needed.
+    # backend='rpc'
 )
 )
 
 
+
 @app.task()
 @app.task()
 def add(x, y):
 def add(x, y):
     return x + y
     return x + y

+ 1 - 1
examples/django/proj/__init__.py

@@ -2,4 +2,4 @@ from __future__ import absolute_import
 
 
 # This will make sure the app is always imported when
 # This will make sure the app is always imported when
 # Django starts so that shared_task will use this app.
 # Django starts so that shared_task will use this app.
-from .celery import app as celery_app
+from .celery import app as celery_app  # noqa

+ 1 - 1
examples/django/proj/celery.py

@@ -7,7 +7,7 @@ from celery import Celery
 # set the default Django settings module for the 'celery' program.
 # set the default Django settings module for the 'celery' program.
 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
 
 
-from django.conf import settings
+from django.conf import settings  # noqa
 
 
 app = Celery('proj')
 app = Celery('proj')
 
 

+ 1 - 1
examples/django/proj/wsgi.py

@@ -20,7 +20,7 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")
 # This application object is used by any WSGI server configured to use this
 # This application object is used by any WSGI server configured to use this
 # file. This includes Django's development server, if the WSGI_APPLICATION
 # file. This includes Django's development server, if the WSGI_APPLICATION
 # setting points here.
 # setting points here.
-from django.core.wsgi import get_wsgi_application
+from django.core.wsgi import get_wsgi_application  # noqa
 application = get_wsgi_application()
 application = get_wsgi_application()
 
 
 # Apply WSGI middleware here.
 # Apply WSGI middleware here.

+ 1 - 1
examples/eventlet/celeryconfig.py

@@ -2,7 +2,7 @@ import os
 import sys
 import sys
 sys.path.insert(0, os.getcwd())
 sys.path.insert(0, os.getcwd())
 
 
-## Start worker with -P eventlet
+# ### Start worker with -P eventlet
 # Never use the CELERYD_POOL setting as that will patch
 # Never use the CELERYD_POOL setting as that will patch
 # the worker too late.
 # the worker too late.
 
 

+ 1 - 1
examples/gevent/celeryconfig.py

@@ -2,7 +2,7 @@ import os
 import sys
 import sys
 sys.path.insert(0, os.getcwd())
 sys.path.insert(0, os.getcwd())
 
 
-### Note: Start worker with -P gevent,
+# ## Note: Start worker with -P gevent,
 # do not use the CELERYD_POOL option.
 # do not use the CELERYD_POOL option.
 
 
 BROKER_URL = 'amqp://guest:guest@localhost:5672//'
 BROKER_URL = 'amqp://guest:guest@localhost:5672//'

+ 1 - 1
examples/next-steps/setup.py

@@ -15,6 +15,6 @@ setup(
     zip_safe=False,
     zip_safe=False,
     install_requires=[
     install_requires=[
         'celery>=3.0',
         'celery>=3.0',
-        #'requests',
+        # 'requests',
     ],
     ],
 )
 )

+ 4 - 2
extra/release/bump_version.py

@@ -12,11 +12,13 @@ import sys
 from contextlib import contextmanager
 from contextlib import contextmanager
 from tempfile import NamedTemporaryFile
 from tempfile import NamedTemporaryFile
 
 
-rq = lambda s: s.strip("\"'")
-
 str_t = str if sys.version_info[0] >= 3 else basestring
 str_t = str if sys.version_info[0] >= 3 else basestring
 
 
 
 
+def rq(s):
+    return s.strip("\"'")
+
+
 def cmd(*args):
 def cmd(*args):
     return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0]
     return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0]
 
 

+ 7 - 7
funtests/benchmarks/bench_worker.py

@@ -8,16 +8,16 @@ os.environ.update(
     USE_FAST_LOCALS='yes',
     USE_FAST_LOCALS='yes',
 )
 )
 
 
-import anyjson
+import anyjson  # noqa
 JSONIMP = os.environ.get('JSONIMP')
 JSONIMP = os.environ.get('JSONIMP')
 if JSONIMP:
 if JSONIMP:
     anyjson.force_implementation(JSONIMP)
     anyjson.force_implementation(JSONIMP)
 
 
 print('anyjson implementation: {0!r}'.format(anyjson.implementation.name))
 print('anyjson implementation: {0!r}'.format(anyjson.implementation.name))
 
 
-from celery import Celery, group
-from celery.five import range
-from kombu.five import monotonic
+from celery import Celery, group  # noqa
+from celery.five import range  # noqa
+from kombu.five import monotonic  # noqa
 
 
 DEFAULT_ITS = 40000
 DEFAULT_ITS = 40000
 
 
@@ -54,8 +54,9 @@ def tdiff(then):
 
 
 @app.task(cur=0, time_start=None, queue='bench.worker', bare=True)
 @app.task(cur=0, time_start=None, queue='bench.worker', bare=True)
 def it(_, n):
 def it(_, n):
-    i = it.cur  # use internal counter, as ordering can be skewed
-                # by previous runs, or the broker.
+    # use internal counter, as ordering can be skewed
+    # by previous runs, or the broker.
+    i = it.cur
     if i and not i % 5000:
     if i and not i % 5000:
         print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
         print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
         it.subt = monotonic()
         it.subt = monotonic()
@@ -77,7 +78,6 @@ def bench_apply(n=DEFAULT_ITS):
     task = it._get_current_object()
     task = it._get_current_object()
     with app.producer_or_acquire() as producer:
     with app.producer_or_acquire() as producer:
         [task.apply_async((i, n), producer=producer) for i in range(n)]
         [task.apply_async((i, n), producer=producer) for i in range(n)]
-    #group(s(i, n) for i in range(n))()
     print('-- apply {0} tasks: {1}s'.format(n, monotonic() - time_start))
     print('-- apply {0} tasks: {1}s'.format(n, monotonic() - time_start))
 
 
 
 

+ 0 - 1
funtests/stress/stress/fbi.py

@@ -64,4 +64,3 @@ class FBI(object):
             self.ffwd()
             self.ffwd()
             for tid in ids:
             for tid in ids:
                 print(self.state_of(tid), file=file)
                 print(self.state_of(tid), file=file)
-            #print(self.query(ids), file=file)

+ 1 - 0
funtests/suite/__init__.py

@@ -1,6 +1,7 @@
 import os
 import os
 import sys
 import sys
 
 
+sys.path.insert(0, os.getcwd())
 sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
 sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
 
 
 config = os.environ.setdefault('CELERY_FUNTEST_CONFIG_MODULE',
 config = os.environ.setdefault('CELERY_FUNTEST_CONFIG_MODULE',

+ 3 - 7
funtests/suite/test_basic.py

@@ -1,17 +1,13 @@
 import operator
 import operator
-import os
-import sys
-
-# funtest config
-sys.path.insert(0, os.getcwd())
-sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
-import suite  # noqa
 
 
 from celery.five import range
 from celery.five import range
 from celery.tests.case import unittest
 from celery.tests.case import unittest
 from celery.tests.functional import tasks
 from celery.tests.functional import tasks
 from celery.tests.functional.case import WorkerCase
 from celery.tests.functional.case import WorkerCase
 
 
+# funtest config
+import suite  # noqa
+
 
 
 class test_basic(WorkerCase):
 class test_basic(WorkerCase):
 
 

+ 1 - 3
funtests/suite/test_leak.py

@@ -6,13 +6,11 @@ import sys
 import shlex
 import shlex
 import subprocess
 import subprocess
 
 
-sys.path.insert(0, os.getcwd())
-sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
-
 from celery import current_app
 from celery import current_app
 from celery.five import range
 from celery.five import range
 from celery.tests.case import SkipTest, unittest
 from celery.tests.case import SkipTest, unittest
 
 
+# funtest config
 import suite  # noqa
 import suite  # noqa
 
 
 GET_RSIZE = '/bin/ps -p {pid} -o rss='
 GET_RSIZE = '/bin/ps -p {pid} -o rss='

+ 7 - 3
setup.py

@@ -14,6 +14,7 @@ except ImportError:
     is_setuptools = False
     is_setuptools = False
 
 
 import os
 import os
+import re
 import sys
 import sys
 import codecs
 import codecs
 
 
@@ -88,11 +89,13 @@ PYPY = hasattr(sys, 'pypy_version_info')
 
 
 # -*- Distribution Meta -*-
 # -*- Distribution Meta -*-
 
 
-import re
 re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)')
 re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)')
 re_vers = re.compile(r'VERSION\s*=.*?\((.*?)\)')
 re_vers = re.compile(r'VERSION\s*=.*?\((.*?)\)')
 re_doc = re.compile(r'^"""(.+?)"""')
 re_doc = re.compile(r'^"""(.+?)"""')
-rq = lambda s: s.strip("\"'")
+
+
+def rq(s):
+    return s.strip("\"'")
 
 
 
 
 def add_default(m):
 def add_default(m):
@@ -167,7 +170,8 @@ if CELERY_COMPAT_PROGRAMS:
     ])
     ])
 
 
 if is_setuptools:
 if is_setuptools:
-    extras = lambda *p: reqs('extras', *p)
+    def extras(*p):
+        return reqs('extras', *p)
     # Celery specific
     # Celery specific
     specific_list = ['auth', 'cassandra', 'memcache', 'couchbase', 'threads',
     specific_list = ['auth', 'cassandra', 'memcache', 'couchbase', 'threads',
                      'eventlet', 'gevent', 'msgpack', 'yaml', 'redis',
                      'eventlet', 'gevent', 'msgpack', 'yaml', 'redis',