Ask Solem, 8 years ago
commit 2da41c9301

+ 6 - 0
.landscape.yml

@@ -41,5 +41,11 @@ pylint:
         - too-many-instance-attributes
         - bad-builtin
         - abstract-method
+        - global-statement
+        - too-many-public-methods
+        - no-self-use
+        - unnecessary-lambda
+        - too-few-public-methods
+        - attribute-defined-outside-init
     options:
         exclude-protected: _reader, _writer, _popen, _sentinel_poll, _job, _is_alive, _write_to, _scheduled_for, _terminated, _accepted, _set_terminated, _payload, _cancel

+ 1 - 0
celery/__init__.py

@@ -21,6 +21,7 @@ __author__ = 'Ask Solem'
 __contact__ = 'ask@celeryproject.org'
 __homepage__ = 'http://celeryproject.org'
 __docformat__ = 'restructuredtext'
+__keywords__ = 'task job queue distributed messaging actor'
 
 # -eof meta-
 

+ 2 - 2
celery/__main__.py

@@ -12,8 +12,8 @@ def main():
     """Entrypoint to the ``celery`` umbrella command."""
     if 'multi' not in sys.argv:
         maybe_patch_concurrency()
-    from celery.bin.celery import main
-    main()
+    from celery.bin.celery import main as _main
+    _main()
 
 
 if __name__ == '__main__':  # pragma: no cover

+ 3 - 1
celery/_state.py

@@ -90,7 +90,9 @@ def _set_current_app(app):
 if os.environ.get('C_STRICT_APP'):  # pragma: no cover
     def get_current_app():
         """Return the current app."""
-        raise Exception('USES CURRENT APP')
+        raise RuntimeError('USES CURRENT APP')
+elif os.environ.get('C_WARN_APP'):  # pragma: no cover
+    def get_current_app():  # noqa
         import traceback
         print('-- USES CURRENT_APP', file=sys.stderr)  # noqa+
         traceback.print_stack(file=sys.stderr)

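For context, a minimal sketch of how the two switches added above behave (the environment variable names come from the hunk itself; using the `celery.current_app` proxy to trigger `get_current_app()` is an assumption):

    # Hypothetical usage sketch, not part of the commit.
    # C_STRICT_APP -> implicit current-app lookups raise RuntimeError.
    # C_WARN_APP   -> lookups presumably still work, but dump a stack trace to stderr.
    import os
    os.environ['C_WARN_APP'] = '1'   # must be set before celery is imported

    from celery import current_app
    current_app.conf                 # prints '-- USES CURRENT_APP' plus a traceback
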
+ 8 - 3
celery/app/trace.py

@@ -12,6 +12,11 @@ from __future__ import absolute_import, unicode_literals
 # but in the end it only resulted in bad performance and horrible tracebacks,
 # so instead we now use one closure per task class.
 
+# pylint: disable=redefined-outer-name
+# We cache globals and attribute lookups, so disable this warning.
+# pylint: disable=broad-except
+# We know what we're doing...
+
 import logging
 import os
 import sys
@@ -181,7 +186,7 @@ class TraceInfo(object):
             })
             return einfo
         finally:
-            del(tb)
+            del tb
 
     def handle_failure(self, task, req, store_errors=True, call_errbacks=True):
         """Handle exception."""
@@ -208,7 +213,7 @@ class TraceInfo(object):
             self._log_error(task, req, einfo)
             return einfo
         finally:
-            del(tb)
+            del tb
 
     def _log_error(self, task, req, einfo):
         eobj = einfo.exception = get_pickled_exception(einfo.exception)
@@ -535,7 +540,7 @@ def report_internal_error(task, exc):
                 exc, exc_info.traceback)))
         return exc_info
     finally:
-        del(_tb)
+        del _tb
 
 
 def setup_worker_optimizations(app, hostname=None):

+ 8 - 13
celery/bin/celery.py

@@ -346,8 +346,8 @@ class multi(Command):
 
     def run_from_argv(self, prog_name, argv, command=None):
         from celery.bin.multi import MultiTool
-        multi = MultiTool(quiet=self.quiet, no_color=self.no_color)
-        return multi.execute_from_commandline([command] + argv)
+        cmd = MultiTool(quiet=self.quiet, no_color=self.no_color)
+        return cmd.execute_from_commandline([command] + argv)
 
 
 class list_(Command):
@@ -535,11 +535,11 @@ class result(Command):
 
         if task:
             result_cls = self.app.tasks[task].AsyncResult
-        result = result_cls(task_id)
+        task_result = result_cls(task_id)
         if traceback:
-            value = result.traceback
+            value = task_result.traceback
         else:
-            value = result.get()
+            value = task_result.get()
         self.out(self.pretty(value)[1])
 
 
@@ -849,6 +849,8 @@ class shell(Command):  # pragma: no cover
         import celery
         import celery.task.base
         self.app.loader.import_default_modules()
+
+        # pylint: disable=attribute-defined-outside-init
         self.locals = {
             'app': self.app,
             'celery': self.app,
@@ -1136,7 +1138,7 @@ class CeleryCommand(Command):
             return sys.modules['__main__'].__file__
         return name
 
-    def handle_argv(self, prog_name, argv):
+    def handle_argv(self, prog_name, argv, **kwargs):
         self.prog_name = self.prepare_prog_name(prog_name)
         argv = self._relocate_args_from_start(argv)
         _, argv = self.prepare_args(None, argv)
@@ -1204,12 +1206,5 @@ class CeleryCommand(Command):
             command_classes.append(('Extensions', names, 'magenta'))
 
 
-def command(*args, **kwargs):
-    # Deprecated: Use classmethod
-    #             :meth:`CeleryCommand.register_command` instead.
-    _register = CeleryCommand.register_command
-    return _register(args[0]) if args else _register
-
-
 if __name__ == '__main__':          # pragma: no cover
     main()

+ 24 - 17
celery/bin/graph.py

@@ -35,12 +35,12 @@ class graph(Command):
         worker = self.app.WorkController()
         include = {arg.lower() for arg in args or ['worker', 'consumer']}
         if 'worker' in include:
-            graph = worker.blueprint.graph
+            worker_graph = worker.blueprint.graph
             if 'consumer' in include:
                 worker.blueprint.connect_with(worker.consumer.blueprint)
         else:
-            graph = worker.consumer.blueprint.graph
-        graph.to_dot(self.stdout)
+            worker_graph = worker.consumer.blueprint.graph
+        worker_graph.to_dot(self.stdout)
 
     def workers(self, *args, **kwargs):
 
@@ -73,14 +73,21 @@ class graph(Command):
                 return self.label()
 
         class Thread(Node):
-            scheme = {'fillcolor': 'lightcyan4', 'fontcolor': 'yellow',
-                      'shape': 'oval', 'fontsize': 10, 'width': 0.3,
-                      'color': 'black'}
+            scheme = {
+                'fillcolor': 'lightcyan4',
+                'fontcolor': 'yellow',
+                'shape': 'oval',
+                'fontsize': 10,
+                'width': 0.3,
+                'color': 'black',
+            }
 
             def __init__(self, label, **kwargs):
-                self._label = 'thr-{0}'.format(next(tids))
                 self.real_label = label
-                self.pos = 0
+                super(Thread, self).__init__(
+                    label='thr-{0}'.format(next(tids)),
+                    pos=0,
+                )
 
         class Formatter(GraphFormatter):
 
@@ -177,24 +184,24 @@ class graph(Command):
         broker = Broker(args.get(
             'broker', self.app.connection_for_read().as_uri()))
         backend = Backend(backend) if backend else None
-        graph = DependencyGraph(formatter=Formatter())
-        graph.add_arc(broker)
+        deps = DependencyGraph(formatter=Formatter())
+        deps.add_arc(broker)
         if backend:
-            graph.add_arc(backend)
+            deps.add_arc(backend)
         curworker = [0]
         for i, worker in enumerate(workers):
             worker = Worker(worker, pos=i)
-            graph.add_arc(worker)
-            graph.add_edge(worker, broker)
+            deps.add_arc(worker)
+            deps.add_edge(worker, broker)
             if backend:
-                graph.add_edge(worker, backend)
+                deps.add_edge(worker, backend)
             threads = threads_for.get(worker._label)
             if threads:
                 for thread in threads:
                     thread = Thread(thread)
-                    graph.add_arc(thread)
-                    graph.add_edge(thread, worker)
+                    deps.add_arc(thread)
+                    deps.add_edge(thread, worker)
 
             curworker[0] += 1
 
-        graph.to_dot(self.stdout)
+        deps.to_dot(self.stdout)

+ 4 - 1
celery/bootsteps.py

@@ -151,7 +151,7 @@ class Blueprint(object):
                                 description.capitalize(), step.alias)
                     try:
                         fun(parent, *args)
-                    except Exception as exc:
+                    except Exception as exc:  # pylint: disable=broad-except
                         if propagate:
                             raise
                         logger.exception(
@@ -274,6 +274,9 @@ class Blueprint(object):
 class StepType(type):
     """Meta-class for steps."""
 
+    name = None
+    requires = None
+
     def __new__(cls, name, bases, attrs):
         module = attrs.get('__module__')
         qname = '{0}.{1}'.format(module, name) if module else name

+ 3 - 0
celery/concurrency/asynpool.py

@@ -46,6 +46,9 @@ from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.worker import state as worker_state
 
+# pylint: disable=redefined-outer-name
+# We cache globals and attribute lookups, so disable this warning.
+
 try:
     from _billiard import read as __read__
     from struct import unpack_from as _unpack_from

+ 4 - 2
celery/concurrency/gevent.py

@@ -2,7 +2,6 @@
 """Gevent execution pool."""
 from __future__ import absolute_import, unicode_literals
 
-
 try:
     from gevent import Timeout
 except ImportError:  # pragma: no cover
@@ -15,6 +14,9 @@ from . import base
 
 __all__ = ['TaskPool']
 
+# pylint: disable=redefined-outer-name
+# We cache globals and attribute lookups, so disable this warning.
+
 
 def apply_timeout(target, args=(), kwargs={}, callback=None,
                   accept_callback=None, pid=None, timeout=None,
@@ -101,7 +103,7 @@ class TaskPool(base.BasePool):
 
     def on_apply(self, target, args=None, kwargs=None, callback=None,
                  accept_callback=None, timeout=None,
-                 timeout_callback=None, **_):
+                 timeout_callback=None, apply_target=base.apply_target, **_):
         timeout = self.timeout if timeout is None else timeout
         return self._quick_put(apply_timeout if timeout else apply_target,
                                target, args, kwargs, callback, accept_callback,

+ 6 - 1
celery/events/__init__.py

@@ -30,6 +30,9 @@ from celery.utils.time import adjust_timestamp, utcoffset, maybe_s_to_ms
 
 __all__ = ['Events', 'Event', 'EventDispatcher', 'EventReceiver']
 
+# pylint: disable=redefined-outer-name
+# We cache globals and attribute lookups, so disable this warning.
+
 event_exchange = Exchange('celeryev', type='topic')
 
 _TZGETTER = itemgetter('utcoffset', 'timestamp')
@@ -208,7 +211,7 @@ class EventDispatcher(object):
                 headers=self.headers,
                 delivery_mode=self.delivery_mode,
             )
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             if not self.buffer_while_offline:
                 raise
             self._outbound_buffer.append((event, routing_key, exc))
@@ -430,6 +433,8 @@ class Events(object):
     def default_dispatcher(self, hostname=None, enabled=True,
                            buffer_while_offline=False):
         with self.app.amqp.producer_pool.acquire(block=True) as prod:
+            # pylint: disable=too-many-function-args
+            # This is a property, pylint...
             with self.Dispatcher(prod.connection, hostname, enabled,
                                  prod.channel, buffer_while_offline) as d:
                 yield d

+ 11 - 5
celery/events/state.py

@@ -37,15 +37,21 @@ from celery.utils.log import get_logger
 
 __all__ = ['Worker', 'Task', 'State', 'heartbeat_expires']
 
+# pylint: disable=redefined-outer-name
+# We cache globals and attribute lookups, so disable this warning.
+# pylint: disable=too-many-function-args
+# For some reason pylint thinks ._event is a method, when it's a property.
+
+#: Set if running PyPy
 PYPY = hasattr(sys, 'pypy_version_info')
 
-# The window (in percentage) is added to the workers heartbeat
-# frequency.  If the time between updates exceeds this window,
-# then the worker is considered to be offline.
+#: The window (in percentage) is added to the workers heartbeat
+#: frequency.  If the time between updates exceeds this window,
+#: then the worker is considered to be offline.
 HEARTBEAT_EXPIRE_WINDOW = 200
 
-# Max drift between event timestamp and time of event received
-# before we alert that clocks may be unsynchronized.
+#: Max drift between event timestamp and time of event received
+#: before we alert that clocks may be unsynchronized.
 HEARTBEAT_DRIFT_MAX = 16
 
 DRIFT_WARNING = """\

+ 15 - 11
celery/utils/pytest.py

@@ -19,6 +19,10 @@ from celery import Celery
 from celery.app import current_app
 from celery.backends.cache import CacheBackend, DummyClient
 
+# pylint: disable=redefined-outer-name
+# Well, they're called fixtures....
+
+
 CELERY_TEST_CONFIG = {
     #: Don't want log output when running suite.
     'worker_hijack_root_logger': False,
@@ -68,12 +72,12 @@ class UnitLogging(symbol_by_name(Celery.log_cls)):
 def TestApp(name=None, set_as_current=False, log=UnitLogging,
             broker='memory://', backend='cache+memory://', **kwargs):
     """App used for testing."""
-    app = Celery(name or 'celery.tests',
-                 set_as_current=set_as_current,
-                 log=log, broker=broker, backend=backend,
-                 **kwargs)
-    app.add_defaults(deepcopy(CELERY_TEST_CONFIG))
-    return app
+    test_app = Celery(
+        name or 'celery.tests',
+        set_as_current=set_as_current,
+        log=log, broker=broker, backend=backend, **kwargs)
+    test_app.add_defaults(deepcopy(CELERY_TEST_CONFIG))
+    return test_app
 
 
 @pytest.fixture(autouse=True)
@@ -92,22 +96,22 @@ def app(request):
         current_app = trap
     _state._tls = NonTLS()
 
-    app = TestApp(set_as_current=False)
+    test_app = TestApp(set_as_current=False)
     is_not_contained = any([
         not getattr(request.module, 'app_contained', True),
         not getattr(request.cls, 'app_contained', True),
         not getattr(request.function, 'app_contained', True)
     ])
     if is_not_contained:
-        app.set_current()
+        test_app.set_current()
 
-    yield app
+    yield test_app
 
     _state.set_default_app(prev_default_app)
     _state._tls = prev_tls
     _state._tls.current_app = prev_current_app
-    if app is not prev_current_app:
-        app.close()
+    if test_app is not prev_current_app:
+        test_app.close()
     _state._on_app_finalizers = prev_finalizers
     _state._apps = prev_apps
 

+ 3 - 0
celery/utils/saferepr.py

@@ -29,6 +29,9 @@ from .text import truncate, truncate_bytes
 
 __all__ = ['saferepr', 'reprstream']
 
+# pylint: disable=redefined-outer-name
+# We cache globals and attribute lookups, so disable this warning.
+
 IS_PY3 = sys.version_info[0] == 3
 
 if IS_PY3:  # pragma: no cover

+ 24 - 19
celery/utils/serialization.py

@@ -68,7 +68,7 @@ def find_pickleable_exception(exc, loads=pickle.loads,
         try:
             superexc = supercls(*exc_args)
             loads(dumps(superexc))
-        except Exception:
+        except Exception:  # pylint: disable=broad-except
             pass
         else:
             return superexc
@@ -122,7 +122,7 @@ class UnpickleableExceptionWrapper(Exception):
             try:
                 pickle.dumps(arg)
                 safe_exc_args.append(arg)
-            except Exception:
+            except Exception:  # pylint: disable=broad-except
                 safe_exc_args.append(safe_repr(arg))
         self.exc_module = exc_module
         self.exc_cls_name = exc_cls_name
@@ -149,7 +149,7 @@ def get_pickleable_exception(exc):
     """Make sure exception is pickleable."""
     try:
         pickle.loads(pickle.dumps(exc))
-    except Exception:
+    except Exception:  # pylint: disable=broad-except
         pass
     else:
         return exc
@@ -163,7 +163,7 @@ def get_pickleable_etype(cls, loads=pickle.loads, dumps=pickle.dumps):
     """Get pickleable exception type."""
     try:
         loads(dumps(cls))
-    except Exception:
+    except Exception:  # pylint: disable=broad-except
         return Exception
     else:
         return cls
@@ -199,6 +199,24 @@ def strtobool(term, table={'false': False, 'no': False, '0': False,
     return term
 
 
+def _datetime_to_json(dt):
+    # See "Date Time String Format" in the ECMA-262 specification.
+    if isinstance(dt, datetime.datetime):
+        r = dt.isoformat()
+        if dt.microsecond:
+            r = r[:23] + r[26:]
+        if r.endswith('+00:00'):
+            r = r[:-6] + 'Z'
+        return r
+    elif isinstance(dt, datetime.time):
+        r = dt.isoformat()
+        if dt.microsecond:
+            r = r[:12]
+        return r
+    else:
+        return dt.isoformat()
+
+
 def jsonify(obj,
             builtin_types=(numbers.Real, string_t), key=None,
             keyfilter=None,
@@ -221,21 +239,8 @@ def jsonify(obj,
             k: _jsonify(v, key=k) for k, v in items(obj)
             if (keyfilter(k) if keyfilter else 1)
         }
-    elif isinstance(obj, datetime.datetime):
-        # See "Date Time String Format" in the ECMA-262 specification.
-        r = obj.isoformat()
-        if obj.microsecond:
-            r = r[:23] + r[26:]
-        if r.endswith('+00:00'):
-            r = r[:-6] + 'Z'
-        return r
-    elif isinstance(obj, datetime.date):
-        return obj.isoformat()
-    elif isinstance(obj, datetime.time):
-        r = obj.isoformat()
-        if obj.microsecond:
-            r = r[:12]
-        return r
+    elif isinstance(obj, (datetime.date, datetime.time)):
+        return _datetime_to_json(obj)
     elif isinstance(obj, datetime.timedelta):
         return str(obj)
     else:

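A quick sanity check of the trimming performed by the new `_datetime_to_json` helper (a sketch against the code above; `datetime.timezone` assumes Python 3):

    import datetime

    utc = datetime.timezone.utc
    dt = datetime.datetime(2016, 8, 11, 12, 30, 45, 123456, tzinfo=utc)
    # isoformat() gives '2016-08-11T12:30:45.123456+00:00'
    # r[:23] + r[26:] keeps milliseconds: '2016-08-11T12:30:45.123+00:00'
    # the trailing '+00:00' becomes 'Z':  '2016-08-11T12:30:45.123Z'

    t = datetime.time(12, 30, 45, 123456)
    # isoformat() gives '12:30:45.123456', truncated to '12:30:45.123'

    d = datetime.date(2016, 8, 11)
    # plain dates fall through to the final branch: '2016-08-11'
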
+ 10 - 4
celery/worker/components.py

@@ -15,9 +15,10 @@ from celery.exceptions import ImproperlyConfigured
 from celery.platforms import IS_WINDOWS
 from celery.utils.log import worker_logger as logger
 
-
 __all__ = ['Timer', 'Hub', 'Pool', 'Beat', 'StateDB', 'Consumer']
 
+GREEN_POOLS = {'eventlet', 'gevent'}
+
 ERR_B_GREEN = """\
 -B option doesn't work with eventlet/gevent pools: \
 use standalone beat instead.\
@@ -61,6 +62,7 @@ class Hub(bootsteps.StartStopStep):
 
     def __init__(self, w, **kwargs):
         w.hub = None
+        super(Hub, self).__init__(w, **kwargs)
 
     def include_if(self, w):
         return w.use_eventloop
@@ -115,6 +117,7 @@ class Pool(bootsteps.StartStopStep):
         w.max_concurrency = None
         w.min_concurrency = w.concurrency
         self.optimization = w.optimization
+        super(Pool, self).__init__(w, **kwargs)
 
     def close(self, w):
         if w.pool:
@@ -124,9 +127,10 @@ class Pool(bootsteps.StartStopStep):
         if w.pool:
             w.pool.terminate()
 
-    def create(self, w, semaphore=None, max_restarts=None,
-               green_pools={'eventlet', 'gevent'}):
-        if w.app.conf.worker_pool in green_pools:  # pragma: no cover
+    def create(self, w):
+        semaphore = None
+        max_restarts = None
+        if w.app.conf.worker_pool in GREEN_POOLS:  # pragma: no cover
             warnings.warn(UserWarning(W_POOL_SETTING))
         threaded = not w.use_eventloop or IS_WINDOWS
         procs = w.min_concurrency
@@ -178,6 +182,7 @@ class Beat(bootsteps.StartStopStep):
     def __init__(self, w, beat=False, **kwargs):
         self.enabled = w.beat = beat
         w.beat = None
+        super(Beat, self).__init__(w, beat=beat, **kwargs)
 
     def create(self, w):
         from celery.beat import EmbeddedService
@@ -195,6 +200,7 @@ class StateDB(bootsteps.Step):
     def __init__(self, w, **kwargs):
         self.enabled = w.statedb
         w._persistence = None
+        super(StateDB, self).__init__(w, **kwargs)
 
     def create(self, w):
         w._persistence = w.state.Persistent(w.state, w.statedb, w.app.clock)

+ 1 - 0
celery/worker/consumer/agent.py

@@ -16,6 +16,7 @@ class Agent(bootsteps.StartStopStep):
 
     def __init__(self, c, **kwargs):
         self.agent_cls = self.enabled = c.app.conf.worker_agent
+        super(Agent, self).__init__(c, **kwargs)
 
     def create(self, c):
         agent = c.agent = self.instantiate(self.agent_cls, c.connection)

+ 1 - 0
celery/worker/consumer/connection.py

@@ -17,6 +17,7 @@ class Connection(bootsteps.StartStopStep):
 
     def __init__(self, c, **kwargs):
         c.connection = None
+        super(Connection, self).__init__(c, **kwargs)
 
     def start(self, c):
         c.connection = c.connect()

+ 6 - 3
celery/worker/consumer/gossip.py

@@ -73,6 +73,8 @@ class Gossip(bootsteps.ConsumerStep):
             'task': self.call_task
         }
 
+        super(Gossip, self).__init__(c, **kwargs)
+
     def compatible_transport(self, app):
         with app.connection_for_read() as conn:
             return conn.transport.driver_type in self.compatible_transports
@@ -87,7 +89,7 @@ class Gossip(bootsteps.ConsumerStep):
     def call_task(self, task):
         try:
             self.app.signature(task).apply_async()
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             logger.exception('Could not call task: %r', exc)
 
     def on_elect(self, event):
@@ -148,7 +150,7 @@ class Gossip(bootsteps.ConsumerStep):
         for handler in handlers:
             try:
                 handler(*args, **kwargs)
-            except Exception as exc:
+            except Exception as exc:  # pylint: disable=broad-except
                 logger.exception(
                     'Ignored error from handler %r: %r', handler, exc)
 
@@ -191,10 +193,11 @@ class Gossip(bootsteps.ConsumerStep):
         else:
             return handler(message.payload)
 
+        # proto2: hostname in header; proto1: in body
         hostname = (message.headers.get('hostname') or
                     message.payload['hostname'])
         if hostname != self.hostname:
-            type, event = prepare(message.payload)
+            _, event = prepare(message.payload)
             self.update_state(event)
         else:
             self.clock.forward()

+ 3 - 1
celery/worker/consumer/mingle.py

@@ -29,6 +29,8 @@ class Mingle(bootsteps.StartStopStep):
 
     def __init__(self, c, without_mingle=False, **kwargs):
         self.enabled = not without_mingle and self.compatible_transport(c.app)
+        super(Mingle, self).__init__(
+            c, without_mingle=without_mingle, **kwargs)
 
     def compatible_transport(self, app):
         with app.connection_for_read() as conn:
@@ -62,7 +64,7 @@ class Mingle(bootsteps.StartStopStep):
             self.sync_with_node(c, **reply)
         except MemoryError:
             raise
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             exception('mingle: sync with %s failed: %r', nodename, exc)
 
     def sync_with_node(self, c, clock=None, revoked=None, **kwargs):

+ 9 - 4
celery/worker/loops.py

@@ -4,7 +4,7 @@ from __future__ import absolute_import, unicode_literals
 import errno
 import socket
 
-from celery.bootsteps import RUN
+from celery import bootsteps
 from celery.exceptions import WorkerShutdown, WorkerTerminate, WorkerLostError
 from celery.utils.log import get_logger
 
@@ -12,13 +12,16 @@ from . import state
 
 __all__ = ['asynloop', 'synloop']
 
+# pylint: disable=redefined-outer-name
+# We cache globals and attribute lookups, so disable this warning.
+
 logger = get_logger(__name__)
 
 
 def _quick_drain(connection, timeout=0.1):
     try:
         connection.drain_events(timeout=timeout)
-    except Exception as exc:
+    except Exception as exc:  # pylint: disable=broad-except
         exc_errno = getattr(exc, 'errno', None)
         if exc_errno is not None and exc_errno != errno.EAGAIN:
             raise
@@ -32,8 +35,9 @@ def _enable_amqheartbeats(timer, connection, rate=2.0):
 
 
 def asynloop(obj, connection, consumer, blueprint, hub, qos,
-             heartbeat, clock, hbrate=2.0, RUN=RUN):
+             heartbeat, clock, hbrate=2.0):
     """Non-blocking event loop."""
+    RUN = bootsteps.RUN
     update_qos = qos.update
     errors = connection.connection_errors
 
@@ -89,7 +93,7 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
     finally:
         try:
             hub.reset()
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=broad-except
             logger.exception(
                 'Error cleaning up after event loop: %r', exc)
 
@@ -97,6 +101,7 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
 def synloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0, **kwargs):
     """Fallback blocking event loop for transports that doesn't support AIO."""
+    RUN = bootsteps.RUN
     on_task_received = obj.create_task_handler()
     perform_pending_operations = obj.perform_pending_operations
     consumer.on_message = on_task_received

+ 4 - 0
celery/worker/request.py

@@ -35,6 +35,9 @@ from . import state
 
 __all__ = ['Request']
 
+# pylint: disable=redefined-outer-name
+# We cache globals and attribute lookups, so disable this warning.
+
 IS_PYPY = hasattr(sys, 'pypy_version_info')
 
 logger = get_logger(__name__)
@@ -518,6 +521,7 @@ def create_request_cls(base, task, pool, hostname, eventer,
                 correlation_id=task_id,
             )
             # cannot create weakref to None
+            # pylint: disable=attribute-defined-outside-init
             self._apply_result = maybe(ref, result)
             return result
 

+ 4 - 1
celery/worker/strategy.py

@@ -19,6 +19,9 @@ __all__ = ['default']
 
 logger = get_logger(__name__)
 
+# pylint: disable=redefined-outer-name
+# We cache globals and attribute lookups, so disable this warning.
+
 
 def proto1_to_proto2(message, body):
     """Convert Task message protocol 1 arguments to protocol 2.
@@ -28,7 +31,7 @@ def proto1_to_proto2(message, body):
     """
     try:
         args, kwargs = body['args'], body['kwargs']
-        kwargs.items
+        kwargs.items  # pylint: disable=pointless-statement
     except KeyError:
         raise InvalidTaskError('Message does not have args/kwargs')
     except AttributeError:

+ 1 - 1
examples/eventlet/tasks.py

@@ -10,6 +10,6 @@ def urlopen(url):
     print('-open: {0}'.format(url))
     try:
         response = requests.get(url)
-    except Exception as exc:
+    except requests.exceptions.RequestException as exc:
         print('-url {0} gave error: {1!r}'.format(url, exc))
     return len(response.text)

+ 1 - 1
examples/gevent/tasks.py

@@ -10,7 +10,7 @@ def urlopen(url):
     print('Opening: {0}'.format(url))
     try:
         requests.get(url)
-    except Exception as exc:
+    except requests.exceptions.RequestException as exc:
         print('Exception for {0}: {1!r}'.format(url, exc))
         return url, 0
     print('Done with: {0}'.format(url))

+ 67 - 35
setup.py

@@ -51,6 +51,33 @@ if PY26_OR_LESS:
 elif PY33_OR_LESS and not PYPY24_ATLEAST:
     raise Exception(E_UNSUPPORTED_PYTHON % (PYIMP, '3.4'))
 
+# -*- Extras -*-
+
+EXTENSIONS = {
+    'auth',
+    'cassandra',
+    'elasticsearch',
+    'memcache',
+    'pymemcache',
+    'couchbase',
+    'eventlet',
+    'gevent',
+    'msgpack',
+    'yaml',
+    'redis',
+    'sqs',
+    'couchdb',
+    'riak',
+    'zookeeper',
+    'solar',
+    'sqlalchemy',
+    'librabbitmq',
+    'pyro',
+    'slmq',
+    'tblib',
+    'consul'
+}
+
 # -*- Classifiers -*-
 
 classes = """
@@ -84,17 +111,20 @@ def add_default(m):
 def add_doc(m):
     return (('doc', m.groups()[0]),)
 
-pats = {re_meta: add_default, re_doc: add_doc}
-here = os.path.abspath(os.path.dirname(__file__))
-with open(os.path.join(here, 'celery/__init__.py')) as meta_fh:
-    meta = {}
-    for line in meta_fh:
-        if line.strip() == '# -eof meta-':
-            break
-        for pattern, handler in pats.items():
-            m = pattern.match(line.strip())
-            if m:
-                meta.update(handler(m))
+
+def parse_dist_meta():
+    pats = {re_meta: add_default, re_doc: add_doc}
+    here = os.path.abspath(os.path.dirname(__file__))
+    with open(os.path.join(here, 'celery', '__init__.py')) as meta_fh:
+        meta = {}
+        for line in meta_fh:
+            if line.strip() == '# -eof meta-':
+                break
+            for pattern, handler in pats.items():
+                m = pattern.match(line.strip())
+                if m:
+                    meta.update(handler(m))
+        return meta
 
 # -*- Installation Requires -*-
 
@@ -125,18 +155,22 @@ def reqs(*f):
 def extras(*p):
     return reqs('extras', *p)
 
-install_requires = reqs('default.txt')
-if JYTHON:
-    install_requires.extend(reqs('jython.txt'))
 
-# -*- Long Description -*-
+def install_requires():
+    if JYTHON:
+        return reqs('default.txt') + reqs('jython.txt')
+    return reqs('default.txt')
 
-if os.path.exists('README.rst'):
-    long_description = codecs.open('README.rst', 'r', 'utf-8').read()
-else:
-    long_description = 'See http://pypi.python.org/pypi/celery'
 
-# -*- %%% -*-
+def extras_require():
+    return {x: extras(x + '.txt') for x in EXTENSIONS}
+
+
+def long_description():
+    try:
+        return codecs.open('README.rst', 'r', 'utf-8').read()
+    except IOError:
+        return 'Long description error: Missing README.rst file'
 
 
 class pytest(setuptools.command.test.test):
@@ -147,36 +181,34 @@ class pytest(setuptools.command.test.test):
         self.pytest_args = []
 
     def run_tests(self):
-        import pytest
-        sys.exit(pytest.main(self.pytest_args))
+        import pytest as _pytest
+        sys.exit(_pytest.main(self.pytest_args))
 
+# -*- %%% -*-
+
+meta = parse_dist_meta()
 setuptools.setup(
     name=NAME,
     packages=setuptools.find_packages(exclude=['t', 't.*']),
     version=meta['version'],
     description=meta['doc'],
-    long_description=long_description,
+    long_description=long_description(),
-    keywords='task job queue distributed messaging actor',
+    keywords=meta['keywords'],
     author=meta['author'],
     author_email=meta['contact'],
-    platforms=['any'],
-    license='BSD',
     url=meta['homepage'],
-    install_requires=install_requires,
+    license='BSD',
+    platforms=['any'],
+    install_requires=install_requires(),
     tests_require=reqs('test.txt'),
-    extras_require=dict((x, extras(x + '.txt')) for x in set([
-        'auth', 'cassandra', 'elasticsearch', 'memcache', 'pymemcache',
-        'couchbase', 'eventlet', 'gevent', 'msgpack', 'yaml',
-        'redis', 'sqs', 'couchdb', 'riak', 'zookeeper', 'solar',
-        'sqlalchemy', 'librabbitmq', 'pyro', 'slmq', 'tblib', 'consul'
-    ])),
+    extras_require=extras_require(),
     classifiers=classifiers,
+    cmdclass={'test': pytest},
+    include_package_data=True,
+    zip_safe=False,
     entry_points={
         'console_scripts': [
             'celery = celery.__main__:main',
         ]
     },
-    cmdclass={'test': pytest},
-    include_package_data=True,
-    zip_safe=False,
 )

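For reference, roughly the dict `parse_dist_meta()` is expected to return, inferred from the `meta[...]` keys used in `setup()` above; author, contact, homepage and keywords are the values visible in the `celery/__init__.py` hunk, while version and doc are illustrative placeholders:

    {
        'version': 'x.y.z',                  # illustrative; parsed from __version__
        'author': 'Ask Solem',
        'contact': 'ask@celeryproject.org',
        'homepage': 'http://celeryproject.org',
        'doc': 'Distributed Task Queue.',    # illustrative; captured via re_doc/add_doc
        'keywords': 'task job queue distributed messaging actor',
    }
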
+ 0 - 11
t/unit/bin/test_celery.py

@@ -28,7 +28,6 @@ from celery.bin.celery import (
     multi,
     main as mainfun,
     _RemoteControl,
-    command,
 )
 from celery.five import WhateverIO
 from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
@@ -535,13 +534,3 @@ class test_main:
         cmd.execute_from_commandline.side_effect = KeyboardInterrupt()
         mainfun()
         cmd.execute_from_commandline.assert_called_with(None)
-
-
-class test_compat:
-
-    def test_compat_command_decorator(self):
-        with patch('celery.bin.celery.CeleryCommand') as CC:
-            assert command() == CC.register_command
-            fun = Mock(name='fun')
-            command(fun)
-            CC.register_command.assert_called_with(fun)