Ask Solem преди 9 години
родител
ревизия
5106352570

+ 4 - 3
celery/__init__.py

@@ -8,6 +8,9 @@
 
 from __future__ import absolute_import, print_function, unicode_literals
 
+import os
+import sys
+
 from collections import namedtuple
 
 version_info_t = namedtuple(
@@ -31,8 +34,6 @@ VERSION_BANNER = '{0} ({1})'.format(__version__, SERIES)
 
 # -eof meta-
 
-import os
-import sys
 if os.environ.get('C_IMPDEBUG'):  # pragma: no cover
     from .five import builtins
     real_import = builtins.__import__
@@ -128,7 +129,7 @@ def maybe_patch_concurrency(argv=sys.argv,
         concurrency.get_implementation(pool)
 
 # Lazy loading
-from celery import five
+from celery import five  # noqa
 
 old_module, new_module = five.recreate_module(  # pragma: no cover
     __name__,

+ 2 - 1
celery/app/base.py

@@ -247,7 +247,8 @@ class Celery(object):
 
             def _create_task_cls(fun):
                 if shared:
-                    cons = lambda app: app._task_from_fun(fun, **opts)
+                    def cons(app):
+                        return app._task_from_fun(fun, **opts)
                     cons.__name__ = fun.__name__
                     connect_on_app_finalize(cons)
                 if not lazy or self.finalized:

+ 2 - 1
celery/apps/worker.py

@@ -318,7 +318,8 @@ if not is_jython:  # pragma: no cover
         exitcode=EX_FAILURE,
     )
 else:  # pragma: no cover
-    install_worker_int_handler = lambda *a, **kw: None
+    def install_worker_int_handler(*args, **kwargs):
+        pass
 
 
 def _reload_current_worker():

+ 8 - 14
celery/backends/database/__init__.py

@@ -22,25 +22,19 @@ from .models import Task
 from .models import TaskSet
 from .session import SessionManager
 
+try:
+    from sqlalchemy.exc import DatabaseError, InvalidRequestError
+    from sqlalchemy.orm.exc import StaleDataError
+except ImportError:
+    raise ImproperlyConfigured(
+        'The database result backend requires SQLAlchemy to be installed.'
+        'See http://pypi.python.org/pypi/SQLAlchemy')
+
 logger = logging.getLogger(__name__)
 
 __all__ = ['DatabaseBackend']
 
 
-def _sqlalchemy_installed():
-    try:
-        import sqlalchemy
-    except ImportError:
-        raise ImproperlyConfigured(
-            'The database result backend requires SQLAlchemy to be installed.'
-            'See http://pypi.python.org/pypi/SQLAlchemy')
-    return sqlalchemy
-_sqlalchemy_installed()
-
-from sqlalchemy.exc import DatabaseError, InvalidRequestError
-from sqlalchemy.orm.exc import StaleDataError
-
-
 @contextmanager
 def session_cleanup(session):
     try:

+ 9 - 9
celery/backends/mongodb.py

@@ -10,6 +10,15 @@ from __future__ import absolute_import
 
 from datetime import datetime, timedelta
 
+from kombu.syn import detect_environment
+from kombu.utils import cached_property
+from kombu.exceptions import EncodeError
+from celery import states
+from celery.exceptions import ImproperlyConfigured
+from celery.five import string_t, items
+
+from .base import BaseBackend
+
 try:
     import pymongo
 except ImportError:  # pragma: no cover
@@ -25,15 +34,6 @@ else:                                       # pragma: no cover
     Binary = None                           # noqa
     InvalidDocument = None                  # noqa
 
-from kombu.syn import detect_environment
-from kombu.utils import cached_property
-from kombu.exceptions import EncodeError
-from celery import states
-from celery.exceptions import ImproperlyConfigured
-from celery.five import string_t, items
-
-from .base import BaseBackend
-
 __all__ = ['MongoBackend']
 
 

+ 2 - 4
celery/backends/riak.py

@@ -32,7 +32,6 @@ if sys.version_info[0] == 3:
     def to_bytes(s):
         return s.encode() if isinstance(s, str) else s
 
-
     def str_decode(s, encoding):
         return to_bytes(s).decode(encoding)
 
@@ -42,7 +41,6 @@ else:
         return s.decode("ascii")
 
 
-
 def is_ascii(s):
     try:
         str_decode(s, 'ascii')
@@ -118,8 +116,8 @@ class RiakBackend(KeyValueStoreBackend):
     def _get_bucket(self):
         """Connect to our bucket."""
         if (
-            self._client is None or not self._client.is_alive()
-            or not self._bucket
+            self._client is None or not self._client.is_alive() or
+            not self._bucket
         ):
             self._bucket = self.client.bucket(self.bucket_name)
         return self._bucket

+ 7 - 7
celery/beat.py

@@ -185,9 +185,9 @@ class Scheduler(object):
                  Producer=None, lazy=False, sync_every_tasks=None, **kwargs):
         self.app = app
         self.data = maybe_evaluate({} if schedule is None else schedule)
-        self.max_interval = (max_interval
-                             or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
-                             or self.max_interval)
+        self.max_interval = (max_interval or
+                             app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or
+                             self.max_interval)
         self.Producer = Producer or app.amqp.Producer
         self._heap = None
         self.sync_every_tasks = (
@@ -236,8 +236,8 @@ class Scheduler(object):
         """
 
         def _when(entry, next_time_to_run):
-            return (mktime(entry.schedule.now().timetuple())
-                    + (adjust(next_time_to_run) or 0))
+            return (mktime(entry.schedule.now().timetuple()) +
+                    (adjust(next_time_to_run) or 0))
 
         adjust = self.adjust
         max_interval = self.max_interval
@@ -474,8 +474,8 @@ class Service(object):
     def __init__(self, app, max_interval=None, schedule_filename=None,
                  scheduler_cls=None):
         self.app = app
-        self.max_interval = (max_interval
-                             or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
+        self.max_interval = (max_interval or
+                             app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
         self.scheduler_cls = scheduler_cls or self.scheduler_cls
         self.schedule_filename = (
             schedule_filename or app.conf.CELERYBEAT_SCHEDULE_FILENAME)

+ 3 - 3
celery/bin/beat.py

@@ -87,9 +87,9 @@ class beat(Command):
                     default=c.CELERYBEAT_SCHEDULE_FILENAME),
              Option('--max-interval', type='float'),
              Option('-S', '--scheduler', dest='scheduler_cls'),
-             Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL))
-            + daemon_options(default_pidfile='celerybeat.pid')
-            + tuple(self.app.user_options['beat'])
+             Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL)) +
+            daemon_options(default_pidfile='celerybeat.pid') +
+            tuple(self.app.user_options['beat'])
         )
 
 

+ 2 - 1
celery/bin/celery.py

@@ -117,7 +117,8 @@ class list_(Command):
         except NotImplementedError:
             raise self.Error('Your transport cannot list bindings.')
 
-        fmt = lambda q, e, r: self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
+        def fmt(q, e, r):
+            return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
         fmt('Queue', 'Exchange', 'Routing Key')
         fmt('-' * 16, '-' * 16, '-' * 16)
         for b in bindings:

+ 3 - 3
celery/bin/events.py

@@ -125,9 +125,9 @@ class events(Command):
              Option('-F', '--frequency', '--freq',
                     type='float', default=1.0),
              Option('-r', '--maxrate'),
-             Option('-l', '--loglevel', default='INFO'))
-            + daemon_options(default_pidfile='celeryev.pid')
-            + tuple(self.app.user_options['events'])
+             Option('-l', '--loglevel', default='INFO')) +
+            daemon_options(default_pidfile='celeryev.pid') +
+            tuple(self.app.user_options['events'])
         )
 
 

+ 5 - 4
celery/concurrency/eventlet.py

@@ -28,12 +28,13 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
             import warnings
             warnings.warn(RuntimeWarning(W_RACE % side))
 
-from kombu.async import timer as _timer
+# idiotic pep8.py does not allow expressions before imports
+# so have to silence errors here
+from kombu.async import timer as _timer  # noqa
 
+from celery import signals  # noqa
 
-from celery import signals
-
-from . import base
+from . import base  # noqa
 
 
 def apply_target(target, args=(), kwargs={}, callback=None,

+ 2 - 5
celery/concurrency/prefork.py

@@ -154,10 +154,7 @@ class TaskPool(BasePool):
             self._pool.close()
 
     def _get_info(self):
-        try:
-            write_stats = self._pool.human_write_stats
-        except AttributeError:
-            write_stats = lambda: 'N/A'  # only supported by asynpool
+        write_stats = getattr(self._pool, 'human_write_stats', None)
         return {
             'max-concurrency': self.limit,
             'processes': [p.pid for p in self._pool._pool],
@@ -165,7 +162,7 @@ class TaskPool(BasePool):
             'put-guarded-by-semaphore': self.putlocks,
             'timeouts': (self._pool.soft_timeout or 0,
                          self._pool.timeout or 0),
-            'writes': write_stats()
+            'writes': write_stats() if write_stats is not None else 'N/A',
         }
 
     @property

+ 2 - 2
celery/events/cursesmon.py

@@ -318,8 +318,8 @@ class CursesMonitor(object):  # pragma: no cover
         def alert_callback(my, mx, xs):
             y = count(xs)
             task = self.state.tasks[self.selected_task]
-            result = (getattr(task, 'result', None)
-                      or getattr(task, 'exception', None))
+            result = (getattr(task, 'result', None) or
+                      getattr(task, 'exception', None))
             for line in wrap(result, mx - 2):
                 self.win.addstr(next(y), 3, line)
 

+ 13 - 12
celery/five.py

@@ -10,14 +10,24 @@
 """
 from __future__ import absolute_import
 
-__all__ = [
-    'class_property', 'reclassmethod', 'create_module', 'recreate_module',
-]
+import operator
+import sys
+
+from importlib import import_module
+from types import ModuleType
 
 # extends amqp.five
 from amqp.five import *  # noqa
 from amqp.five import __all__ as _all_five
 
+try:
+    from functools import reduce
+except ImportError:
+    pass
+
+__all__ = [
+    'class_property', 'reclassmethod', 'create_module', 'recreate_module',
+]
 __all__ += _all_five
 
 #  ############# Module Generation ##########################
@@ -26,17 +36,8 @@ __all__ += _all_five
 # recreate modules, either for lazy loading or
 # to create old modules at runtime instead of
 # having them litter the source tree.
-import operator
-import sys
 
 # import fails in python 2.5. fallback to reduce in stdlib
-try:
-    from functools import reduce
-except ImportError:
-    pass
-
-from importlib import import_module
-from types import ModuleType
 
 MODULE_DEPRECATED = """
 The module %s is deprecated and will be removed in a future version.

+ 6 - 5
celery/fixups/django.py

@@ -4,11 +4,6 @@ import os
 import sys
 import warnings
 
-if sys.version_info[0] < 3 and not hasattr(sys, 'pypy_version_info'):
-    from StringIO import StringIO
-else:
-    from io import StringIO
-
 from kombu.utils import cached_property, symbol_by_name
 
 from datetime import datetime
@@ -18,6 +13,12 @@ from celery import signals
 from celery.app import default_app
 from celery.exceptions import FixupWarning
 
+if sys.version_info[0] < 3 and not hasattr(sys, 'pypy_version_info'):
+    from StringIO import StringIO
+else:
+    from io import StringIO
+
+
 __all__ = ['DjangoFixup', 'fixup']
 
 ERR_NOT_INSTALLED = """\

+ 136 - 46
celery/local.py

@@ -154,54 +154,144 @@ class Proxy(object):
     def __delslice__(self, i, j):
         del self._get_current_object()[i:j]
 
-    __setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v)
-    __delattr__ = lambda x, n: delattr(x._get_current_object(), n)
-    __str__ = lambda x: str(x._get_current_object())
-    __lt__ = lambda x, o: x._get_current_object() < o
-    __le__ = lambda x, o: x._get_current_object() <= o
-    __eq__ = lambda x, o: x._get_current_object() == o
-    __ne__ = lambda x, o: x._get_current_object() != o
-    __gt__ = lambda x, o: x._get_current_object() > o
-    __ge__ = lambda x, o: x._get_current_object() >= o
-    __hash__ = lambda x: hash(x._get_current_object())
-    __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
-    __len__ = lambda x: len(x._get_current_object())
-    __getitem__ = lambda x, i: x._get_current_object()[i]
-    __iter__ = lambda x: iter(x._get_current_object())
-    __contains__ = lambda x, i: i in x._get_current_object()
-    __getslice__ = lambda x, i, j: x._get_current_object()[i:j]
-    __add__ = lambda x, o: x._get_current_object() + o
-    __sub__ = lambda x, o: x._get_current_object() - o
-    __mul__ = lambda x, o: x._get_current_object() * o
-    __floordiv__ = lambda x, o: x._get_current_object() // o
-    __mod__ = lambda x, o: x._get_current_object() % o
-    __divmod__ = lambda x, o: x._get_current_object().__divmod__(o)
-    __pow__ = lambda x, o: x._get_current_object() ** o
-    __lshift__ = lambda x, o: x._get_current_object() << o
-    __rshift__ = lambda x, o: x._get_current_object() >> o
-    __and__ = lambda x, o: x._get_current_object() & o
-    __xor__ = lambda x, o: x._get_current_object() ^ o
-    __or__ = lambda x, o: x._get_current_object() | o
-    __div__ = lambda x, o: x._get_current_object().__div__(o)
-    __truediv__ = lambda x, o: x._get_current_object().__truediv__(o)
-    __neg__ = lambda x: -(x._get_current_object())
-    __pos__ = lambda x: +(x._get_current_object())
-    __abs__ = lambda x: abs(x._get_current_object())
-    __invert__ = lambda x: ~(x._get_current_object())
-    __complex__ = lambda x: complex(x._get_current_object())
-    __int__ = lambda x: int(x._get_current_object())
-    __float__ = lambda x: float(x._get_current_object())
-    __oct__ = lambda x: oct(x._get_current_object())
-    __hex__ = lambda x: hex(x._get_current_object())
-    __index__ = lambda x: x._get_current_object().__index__()
-    __coerce__ = lambda x, o: x._get_current_object().__coerce__(o)
-    __enter__ = lambda x: x._get_current_object().__enter__()
-    __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)
-    __reduce__ = lambda x: x._get_current_object().__reduce__()
+    def __setattr__(self, name, value):
+        setattr(self._get_current_object(), name, value)
+
+    def __delattr__(self, name):
+        delattr(self._get_current_object(), name)
+
+    def __str__(self):
+        return str(self._get_current_object())
+
+    def __lt__(self, other):
+        return self._get_current_object() < other
+
+    def __le__(self, other):
+        return self._get_current_object() <= other
+
+    def __eq__(self, other):
+        return self._get_current_object() == other
+
+    def __ne__(self, other):
+        return self._get_current_object() != other
+
+    def __gt__(self, other):
+        return self._get_current_object() > other
+
+    def __ge__(self, other):
+        return self._get_current_object() >= other
+
+    def __hash__(self):
+        return hash(self._get_current_object())
+
+    def __call__(self, *a, **kw):
+        return self._get_current_object()(*a, **kw)
+
+    def __len__(self):
+        return len(self._get_current_object())
+
+    def __getitem__(self, i):
+        return self._get_current_object()[i]
+
+    def __iter__(self):
+        return iter(self._get_current_object())
+
+    def __contains__(self, i):
+        return i in self._get_current_object()
+
+    def __getslice__(self, i, j):
+        return self._get_current_object()[i:j]
+
+    def __add__(self, other):
+        return self._get_current_object() + other
+
+    def __sub__(self, other):
+        return self._get_current_object() - other
+
+    def __mul__(self, other):
+        return self._get_current_object() * other
+
+    def __floordiv__(self, other):
+        return self._get_current_object() // other
+
+    def __mod__(self, other):
+        return self._get_current_object() % other
+
+    def __divmod__(self, other):
+        return self._get_current_object().__divmod__(other)
+
+    def __pow__(self, other):
+        return self._get_current_object() ** other
+
+    def __lshift__(self, other):
+        return self._get_current_object() << other
+
+    def __rshift__(self, other):
+        return self._get_current_object() >> other
+
+    def __and__(self, other):
+        return self._get_current_object() & other
+
+    def __xor__(self, other):
+        return self._get_current_object() ^ other
+
+    def __or__(self, other):
+        return self._get_current_object() | other
+
+    def __div__(self, other):
+        return self._get_current_object().__div__(other)
+
+    def __truediv__(self, other):
+        return self._get_current_object().__truediv__(other)
+
+    def __neg__(self):
+        return -(self._get_current_object())
+
+    def __pos__(self):
+        return +(self._get_current_object())
+
+    def __abs__(self):
+        return abs(self._get_current_object())
+
+    def __invert__(self):
+        return ~(self._get_current_object())
+
+    def __complex__(self):
+        return complex(self._get_current_object())
+
+    def __int__(self):
+        return int(self._get_current_object())
+
+    def __float__(self):
+        return float(self._get_current_object())
+
+    def __oct__(self):
+        return oct(self._get_current_object())
+
+    def __hex__(self):
+        return hex(self._get_current_object())
+
+    def __index__(self):
+        return self._get_current_object().__index__()
+
+    def __coerce__(self, other):
+        return self._get_current_object().__coerce__(other)
+
+    def __enter__(self):
+        return self._get_current_object().__enter__()
+
+    def __exit__(self, *a, **kw):
+        return self._get_current_object().__exit__(*a, **kw)
+
+    def __reduce__(self):
+        return self._get_current_object().__reduce__()
 
     if not PY3:
-        __cmp__ = lambda x, o: cmp(x._get_current_object(), o)  # noqa
-        __long__ = lambda x: long(x._get_current_object())      # noqa
+        def __cmp__(self, other):
+            return cmp(self._get_current_object(), other)  # noqa
+
+        def __long__(self):
+            return long(self._get_current_object())  # noqa
 
 
 class PromiseProxy(Proxy):

+ 11 - 4
celery/tests/app/test_app.py

@@ -143,7 +143,10 @@ class test_App(AppCase):
     def test_add_defaults(self):
         self.assertFalse(self.app.configured)
         _conf = {'FOO': 300}
-        conf = lambda: _conf
+
+        def conf():
+            return _conf
+
         self.app.add_defaults(conf)
         self.assertIn(conf, self.app._pending_defaults)
         self.assertFalse(self.app.configured)
@@ -196,8 +199,11 @@ class test_App(AppCase):
             ['proj.A', 'proj.B'], 'tasks',
         )
         self.app.loader.autodiscover_tasks = Mock()
+
+        def lazy_list():
+            return ['proj.A', 'proj.B']
         self.app.autodiscover_tasks(
-            lambda: ['proj.A', 'proj.B'],
+            lazy_list,
             related_name='george',
             force=True,
         )
@@ -207,8 +213,9 @@ class test_App(AppCase):
 
     def test_autodiscover_tasks_lazy(self):
         with patch('celery.signals.import_modules') as import_modules:
-            packages = lambda: [1, 2, 3]
-            self.app.autodiscover_tasks(packages)
+            def lazy_list():
+                return [1, 2, 3]
+            self.app.autodiscover_tasks(lazy_list)
             self.assertTrue(import_modules.connect.called)
             prom = import_modules.connect.call_args[0][0]
             self.assertIsInstance(prom, promise)

+ 2 - 1
celery/tests/app/test_loaders.py

@@ -99,7 +99,8 @@ class test_LoaderBase(AppCase):
         self.assertEqual(self.loader.conf['foo'], 'bar')
 
     def test_import_default_modules(self):
-        modnames = lambda l: [m.__name__ for m in l]
+        def modnames(l):
+            return [m.__name__ for m in l]
         self.app.conf.CELERY_IMPORTS = ('os', 'sys')
         self.assertEqual(
             sorted(modnames(self.loader.import_default_modules())),

+ 0 - 7
celery/tests/backends/test_database.py

@@ -12,7 +12,6 @@ from celery.tests.case import (
     AppCase,
     SkipTest,
     depends_on_current_app,
-    mask_modules,
     skip_if_pypy,
     skip_if_jython,
 )
@@ -56,12 +55,6 @@ class test_DatabaseBackend(AppCase):
             raises(max_retries=5)
         self.assertEqual(calls[0], 5)
 
-    def test_missing_SQLAlchemy_raises_ImproperlyConfigured(self):
-        with mask_modules('sqlalchemy'):
-            from celery.backends.database import _sqlalchemy_installed
-            with self.assertRaises(ImproperlyConfigured):
-                _sqlalchemy_installed()
-
     def test_missing_dburi_raises_ImproperlyConfigured(self):
         self.app.conf.CELERY_RESULT_DBURI = None
         with self.assertRaises(ImproperlyConfigured):

+ 2 - 2
celery/tests/case.py

@@ -103,8 +103,8 @@ CELERY_TEST_CONFIG = {
         'host': os.environ.get('MONGO_HOST') or 'localhost',
         'port': os.environ.get('MONGO_PORT') or 27017,
         'database': os.environ.get('MONGO_DB') or 'celery_unittests',
-        'taskmeta_collection': (os.environ.get('MONGO_TASKMETA_COLLECTION')
-                                or 'taskmeta_collection'),
+        'taskmeta_collection': (os.environ.get('MONGO_TASKMETA_COLLECTION') or
+                                'taskmeta_collection'),
         'user': os.environ.get('MONGO_USER'),
         'password': os.environ.get('MONGO_PASSWORD'),
     }

+ 1 - 4
celery/tests/compat_modules/test_compat.py

@@ -2,16 +2,13 @@ from __future__ import absolute_import
 
 from datetime import timedelta
 
-import sys
-sys.modules.pop('celery.task', None)
-
 from celery.schedules import schedule
 from celery.task import (
     periodic_task,
     PeriodicTask
 )
 
-from celery.tests.case import AppCase, depends_on_current_app
+from celery.tests.case import AppCase, depends_on_current_app  # noqa
 
 
 @depends_on_current_app

+ 2 - 1
celery/tests/events/test_state.py

@@ -26,7 +26,8 @@ except TypeError:  # pragma: no cover
     # Py2.6: Must first convert float to str
     _float_to_decimal = str
 else:
-    _float_to_decimal = lambda f: f  # noqa
+    def _float_to_decimal(f):  # noqa
+        return f
 
 
 class replay(object):

+ 3 - 1
celery/tests/tasks/test_chord.py

@@ -10,7 +10,9 @@ from celery.five import range
 from celery.result import AsyncResult, GroupResult, EagerResult
 from celery.tests.case import AppCase, Mock
 
-passthru = lambda x: x
+
+def passthru(x):
+    return x
 
 
 class ChordCase(AppCase):

+ 1 - 1
celery/tests/utils/test_functional.py

@@ -64,7 +64,7 @@ class test_LRUCache(Case):
         self.assertEqual(list(x.keys()), [3, 6, 7])
 
     def assertSafeIter(self, method, interval=0.01, size=10000):
-        if sys.version_info >= (3,5):
+        if sys.version_info >= (3, 5):
             raise SkipTest('Fails on Py3.5')
         from threading import Thread, Event
         from time import sleep

+ 2 - 1
celery/tests/utils/test_platforms.py

@@ -137,7 +137,8 @@ class test_Signals(Case):
 
     @patch('signal.signal')
     def test_setitem(self, set):
-        handle = lambda *a: a
+        def handle(*args):
+            return args
         signals['INT'] = handle
         set.assert_called_with(signal.SIGINT, handle)
 

+ 2 - 1
celery/tests/utils/test_threads.py

@@ -90,7 +90,8 @@ class test_LocalManager(Case):
         self.assertListEqual(x.locals, [])
         self.assertTrue(x.ident_func)
 
-        ident = lambda: 1
+        def ident():
+            return 1
         loc = Local()
         x = LocalManager([loc], ident_func=ident)
         self.assertListEqual(x.locals, [loc])

+ 2 - 1
celery/tests/worker/test_hub.py

@@ -162,7 +162,8 @@ class test_Hub(Case):
         e1, e2, e3 = Mock(), Mock(), Mock()
         entries = [e1, e2, e3]
 
-        reset = lambda: [m.reset() for m in [e1, e2, e3]]
+        def reset():
+            return [m.reset() for m in [e1, e2, e3]]
 
         def se():
             while 1:

+ 4 - 1
celery/utils/term.py

@@ -21,11 +21,14 @@ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
 OP_SEQ = '\033[%dm'
 RESET_SEQ = '\033[0m'
 COLOR_SEQ = '\033[1;%dm'
-fg = lambda s: COLOR_SEQ % s
 
 IS_WINDOWS = platform.system() == 'Windows'
 
 
+def fg(s):
+    return COLOR_SEQ % s
+
+
 class colored(object):
     """Terminal colored text.
 

+ 2 - 2
celery/worker/request.py

@@ -409,8 +409,8 @@ class Request(object):
 
     @property
     def store_errors(self):
-        return (not self.task.ignore_result
-                or self.task.store_errors_even_if_ignored)
+        return (not self.task.ignore_result or
+                self.task.store_errors_even_if_ignored)
 
     @property
     def task_id(self):