Commit e650e3b774 by Ask Solem (11 years ago)

+ 1 - 1
celery/utils/log.py

@@ -23,7 +23,7 @@ from celery.five import string_t
 from .encoding import safe_str, str_t
 from .term import colored
 
-__all__ = ['ColorFormatter', 'LoggingProxy' 'base_logger',
+__all__ = ['ColorFormatter', 'LoggingProxy', 'base_logger',
            'set_in_sighandler', 'in_sighandler', 'get_logger',
            'get_task_logger', 'mlevel', 'ensure_process_aware_logger',
            'get_multiprocessing_logger', 'reset_multiprocessing_logger']
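
Note (not part of the commit): this one-character fix matters because Python concatenates adjacent string literals, so the old list exported a single bogus name instead of two real ones, and a star import of the module would then fail. A minimal sketch of the effect:

    # Missing comma: the two literals fuse into one string.
    exports = ['ColorFormatter', 'LoggingProxy' 'base_logger']
    print(exports)        # ['ColorFormatter', 'LoggingProxybase_logger']

    # With the comma restored, both names are listed as intended.
    exports = ['ColorFormatter', 'LoggingProxy', 'base_logger']
    print(len(exports))   # 3

The same missing-comma pattern is fixed in celery/worker/autoreload.py further down.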

+ 0 - 1
celery/utils/timer2.py

@@ -8,7 +8,6 @@
 """
 from __future__ import absolute_import
 
-import atexit
 import heapq
 import os
 import sys

+ 1 - 1
celery/worker/autoreload.py

@@ -36,7 +36,7 @@ except ImportError:         # pragma: no cover
     _ProcessEvent = object  # noqa
 
 __all__ = [
-    'WorkerComponent', 'Autoreloader', 'Monitor' 'BaseMonitor',
+    'WorkerComponent', 'Autoreloader', 'Monitor', 'BaseMonitor',
     'StatMonitor', 'KQueueMonitor', 'InotifyMonitor', 'file_hash',
 ]
 

+ 1 - 1
celery/worker/consumer.py

@@ -53,7 +53,7 @@ except NameError:  # pragma: no cover
         pass
 
 __all__ = [
-    'Consumer', 'Conneciton', 'Events', 'Heart', 'Control',
+    'Consumer', 'Connection', 'Events', 'Heart', 'Control',
     'Tasks', 'Evloop', 'Agent', 'Mingle', 'Gossip', 'dump_body',
 ]
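
Aside (illustrative, not from the repo): a name listed in __all__ that does not exist in the module, whether from a typo such as 'Conneciton' or from the fused literals above, makes star imports raise AttributeError at import time. A throwaway module demonstrates the failure mode:

    import sys
    import types

    mod = types.ModuleType('mymod')      # hypothetical module, not Celery
    class Connection(object):
        pass
    mod.Connection = Connection
    mod.__all__ = ['Conneciton']         # misspelled export
    sys.modules['mymod'] = mod

    try:
        exec('from mymod import *', {})
    except AttributeError as exc:
        print(exc)                       # ... has no attribute 'Conneciton'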
 

+ 2 - 2
docs/_ext/applyxrefs.py

@@ -6,8 +6,8 @@ import os
 testing = False
 
 DONT_TOUCH = (
-        './index.txt',
-        )
+    './index.txt',
+)
 
 
 def target_name(fn):

+ 2 - 6
docs/_ext/celerydocs.py

@@ -1,7 +1,6 @@
 from docutils import nodes
 
 from sphinx.environment import NoUri
-from sphinx.util.nodes import make_refnode
 
 APPATTRS = {
     "amqp": "celery.app.amqp.AMQP",
@@ -65,9 +64,7 @@ def get_abbr(pre, rest, type):
     return ABBR_EMPTY.get(type, DEFAULT_EMPTY), rest, ABBR_EMPTY
 
 
-
 def resolve(S, type):
-    is_appattr = False
     if S.startswith('@'):
         S = S.lstrip('@-')
         try:
@@ -91,9 +88,8 @@ def basename(module_fqdn):
 def modify_textnode(T, newtarget, node, src_dict, type):
     src = node.children[0].rawsource
     return nodes.Text(
-        typeify(basename(T), type) if '~' in src
-                                   else typeify(shorten(T, newtarget,
-                                                        src_dict), type),
+        (typeify(basename(T), type) if '~' in src
+         else typeify(shorten(T, newtarget, src_dict), type)),
         src,
     )
 

+ 3 - 2
docs/_ext/literals_to_xrefs.py

@@ -100,8 +100,9 @@ def fixliterals(fname):
                 replace_type in ("class", "func", "meth"):
             default = default[:-2]
         replace_value = input(
-            colorize("Text <target> [", fg="yellow") + default + \
-                    colorize("]: ", fg="yellow")).strip()
+            colorize("Text <target> [", fg="yellow") +
+            default + colorize("]: ", fg="yellow"),
+        ).strip()
         if not replace_value:
             replace_value = default
         new.append(":%s:`%s`" % (replace_type, replace_value))

+ 5 - 4
docs/conf.py

@@ -44,6 +44,7 @@ LINKCODE_URL = 'http://github.com/{proj}/tree/{branch}/{filename}.py'
 GITHUB_PROJECT = 'celery/celery'
 GITHUB_BRANCH = 'master'
 
+
 def linkcode_resolve(domain, info):
     if domain != 'py' or not info['module']:
         return
@@ -51,7 +52,7 @@ def linkcode_resolve(domain, info):
     return LINKCODE_URL.format(
         proj=GITHUB_PROJECT,
         branch=GITHUB_BRANCH,
-        filename=FILENAME,
+        filename=filename,
     )
 
 html_show_sphinx = False
@@ -116,8 +117,8 @@ html_use_modindex = True
 html_use_index = True
 
 latex_documents = [
-  ('index', 'Celery.tex', 'Celery Documentation',
-   'Ask Solem & Contributors', 'manual'),
+    ('index', 'Celery.tex', 'Celery Documentation',
+     'Ask Solem & Contributors', 'manual'),
 ]
 
 html_theme = "celery"
@@ -135,7 +136,7 @@ if False:
     issuetracker_project = "celery/celery"
     issuetracker_issue_pattern = r'[Ii]ssue #(\d+)'
 
-# -- Options for Epub output ---------------------------------------------------
+# -- Options for Epub output -------------------------------------------
 
 # Bibliographic Dublin Core info.
 epub_title = 'Celery Manual, Version {0}'.format(version)

+ 0 - 1
examples/celery_http_gateway/settings.py

@@ -69,7 +69,6 @@ SECRET_KEY = '#1i=edpk55k3781$z-p%b#dbn&n+-rtt83pgz2o9o)v8g7(owq'
 TEMPLATE_LOADERS = (
     'django.template.loaders.filesystem.load_template_source',
     'django.template.loaders.app_directories.load_template_source',
-#     'django.template.loaders.eggs.load_template_source',
 )
 
 MIDDLEWARE_CLASSES = (

+ 5 - 2
examples/celery_http_gateway/urls.py

@@ -1,4 +1,6 @@
-from django.conf.urls.defaults import *
+from django.conf.urls.defaults import (  # noqa
+    url, patterns, include, handler404, handler500,
+)
 
 from djcelery import views as celery_views
 
@@ -8,7 +10,8 @@ from celery_http_gateway.tasks import hello_world
 # from django.contrib import admin
 # admin.autodiscover()
 
-urlpatterns = patterns('',
+urlpatterns = patterns(
+    '',
     url(r'^apply/(?P<task_name>.+?)/', celery_views.apply),
     url(r'^hello/', celery_views.task_view(hello_world)),
     url(r'^(?P<task_id>[\w\d\-]+)/done/?$', celery_views.is_task_successful,

+ 1 - 1
examples/django/demoapp/models.py

@@ -1,3 +1,3 @@
-from django.db import models
+from django.db import models  # noqa
 
 # Create your models here.

+ 11 - 9
examples/django/proj/settings.py

@@ -11,12 +11,15 @@ MANAGERS = ADMINS
 
 DATABASES = {
     'default': {
-        'ENGINE': 'django.db.backends.', # Add 'postgresql_psycopg2', 'mysql', 'sqlite3' or 'oracle'.
-        'NAME': '',                      # Or path to database file if using sqlite3.
-        'USER': '',                      # Not used with sqlite3.
-        'PASSWORD': '',                  # Not used with sqlite3.
-        'HOST': '',                      # Set to empty string for localhost. Not used with sqlite3.
-        'PORT': '',                      # Set to empty string for default. Not used with sqlite3.
+        # Add 'postgresql_psycopg2', 'mysql', 'sqlite3' or 'oracle'.
+        'ENGINE': 'django.db.backends.',
+        'NAME': '',        # path to database file if using sqlite3.
+        'USER': '',        # Not used with sqlite3.
+        'PASSWORD': '',    # Not used with sqlite3.
+        'HOST': '',        # Set to empty string for localhost.
+                           # Not used with sqlite3.
+        'PORT': '',        # Set to empty string for default.
+                           # Not used with sqlite3.
     }
 }
 
@@ -74,7 +77,6 @@ STATICFILES_DIRS = (
 STATICFILES_FINDERS = (
     'django.contrib.staticfiles.finders.FileSystemFinder',
     'django.contrib.staticfiles.finders.AppDirectoriesFinder',
-#    'django.contrib.staticfiles.finders.DefaultStorageFinder',
 )
 
 # Make this unique, and don't share it with anybody.
@@ -84,7 +86,6 @@ SECRET_KEY = 'x2$s&0z2xehpnt_99i8q3)4)t*5q@+n(+6jrqz4@rt%a8fdf+!'
 TEMPLATE_LOADERS = (
     'django.template.loaders.filesystem.Loader',
     'django.template.loaders.app_directories.Loader',
-#     'django.template.loaders.eggs.Loader',
 )
 
 MIDDLEWARE_CLASSES = (
@@ -103,7 +104,8 @@ ROOT_URLCONF = 'proj.urls'
 WSGI_APPLICATION = 'proj.wsgi.application'
 
 TEMPLATE_DIRS = (
-    # Put strings here, like "/home/html/django_templates" or "C:/www/django/templates".
+    # Put strings here, like "/home/html/django_templates"
+    # or "C:/www/django/templates".
     # Always use forward slashes, even on Windows.
     # Don't forget to use absolute paths, not relative paths.
 )

+ 5 - 2
examples/django/proj/urls.py

@@ -1,10 +1,13 @@
-from django.conf.urls import patterns, include, url
+from django.conf.urls import (  # noqa
+    patterns, include, url, handler404, handler500,
+)
 
 # Uncomment the next two lines to enable the admin:
 # from django.contrib import admin
 # admin.autodiscover()
 
-urlpatterns = patterns('',
+urlpatterns = patterns(
+    '',
     # Examples:
     # url(r'^$', 'proj.views.home', name='home'),
     # url(r'^proj/', include('proj.foo.urls')),

+ 3 - 2
examples/eventlet/bulk_task_producer.py

@@ -43,8 +43,9 @@ class ProducerPool(object):
         return receipt
 
     def _run(self):
-        self._producers = [spawn_n(self._producer)
-                                for _ in range(self.size)]
+        self._producers = [
+            spawn_n(self._producer) for _ in range(self.size)
+        ]
 
     def _producer(self):
         connection = current_app.connection()

+ 1 - 2
examples/eventlet/webcrawler.py

@@ -23,12 +23,11 @@ to "zlib", and the serializer to "pickle".
 
 
 import re
-import time
 
 try:
     from urllib.parse import urlsplit
 except ImportError:
-    from urlparse import urlsplit
+    from urlparse import urlsplit  # noqa
 
 import requests
 

+ 1 - 1
examples/gevent/tasks.py

@@ -7,7 +7,7 @@ from celery import task
 def urlopen(url):
     print('Opening: {0}'.format(url))
     try:
-        _response = requests.get(url)
+        requests.get(url)
     except Exception as exc:
         print('Exception for {0}: {1!r}'.format(url, exc))
         return url, 0

+ 0 - 1
examples/httpexample/settings.py

@@ -64,7 +64,6 @@ SECRET_KEY = 'p^@q$@nal#-0+w@v_3bcj2ug(zbh5_m2on8^kkn&!e!b=a@o__'
 TEMPLATE_LOADERS = (
     'django.template.loaders.filesystem.load_template_source',
     'django.template.loaders.app_directories.load_template_source',
-#     'django.template.loaders.eggs.load_template_source',
 )
 
 MIDDLEWARE_CLASSES = (

+ 6 - 3
examples/httpexample/urls.py

@@ -1,10 +1,13 @@
-from django.conf.urls.defaults import *
+from django.conf.urls.defaults import (  # noqa
+    url, patterns, include, handler500, handler404,
+)
 from . import views
 
 # Uncomment the next two lines to enable the admin:
 # from django.contrib import admin
 # admin.autodiscover()
 
-urlpatterns = patterns('',
-        url(r'^multiply/', views.multiply, name='multiply'),
+urlpatterns = patterns(
+    '',
+    url(r'^multiply/', views.multiply, name='multiply'),
 )

+ 8 - 5
examples/resultgraph/tasks.py

@@ -19,10 +19,11 @@
 #    ...                           A_callback.subtask()), countdown=1)
 
 
-from celery import chord, task, subtask
+from celery import chord, group, task, subtask, uuid
 from celery.result import AsyncResult, ResultSet
 from collections import deque
 
+
 @task()
 def add(x, y):
     return x + y
@@ -42,8 +43,10 @@ def B_callback(urls, id):
 
 @task()
 def B(id):
-    return chord(make_request.s(id, '{0} {1!r}'.format(id, i))
-                    for i in range(10))(B_callback.s(id))
+    return chord(
+        make_request.s(id, '{0} {1!r}'.format(id, i))
+        for i in range(10)
+    )(B_callback.s(id))
 
 
 @task()
@@ -71,8 +74,8 @@ def joinall(R, timeout=None, propagate=True):
 
 
 @task()
-def unlock_graph(result, callback, interval=1, propagate=False,
-        max_retries=None):
+def unlock_graph(result, callback,
+                 interval=1, propagate=False, max_retries=None):
     if result.ready():
         second_level_res = result.get()
         if second_level_res.ready():

+ 1 - 0
examples/tutorial/tasks.py

@@ -4,6 +4,7 @@ from celery import Celery
 
 celery = Celery('tasks', broker='amqp://')
 
+
 @celery.task()
 def add(x, y):
     return x + y

+ 3 - 3
extra/release/attribution.py

@@ -24,13 +24,13 @@ def find_missing_authors(seen):
         known = [author(line) for line in authors.readlines()]
 
     seen_authors = set(filter(proper_name, (t[0] for t in seen)))
-    seen_emails = set(t[1] for t in seen)
     known_authors = set(t[0] for t in known)
-    known_emails = set(t[1] for t in known)
+    # maybe later?:
+    #   seen_emails = set(t[1] for t in seen)
+    #   known_emails = set(t[1] for t in known)
 
     pprint(seen_authors - known_authors)
 
 
 if __name__ == "__main__":
     find_missing_authors([author(line) for line in fileinput.input()])
-

+ 1 - 0
extra/release/bump_version.py

@@ -125,6 +125,7 @@ _filetype_to_type = {"py": PyVersion,
                      "c": CPPVersion,
                      "h": CPPVersion}
 
+
 def filetype_to_type(filename):
     _, _, suffix = filename.rpartition(".")
     return _filetype_to_type[suffix](filename)

+ 10 - 6
extra/release/verify_config_reference.py

@@ -27,19 +27,23 @@ def is_ignored(setting, option):
 def find_undocumented_settings(directive='.. setting:: '):
     settings = dict(flatten(NAMESPACES))
     all = set(settings)
-    documented = set(line.strip()[len(directive):].strip()
-                        for line in _input()
-                            if line.strip().startswith(directive))
+    documented = set(
+        line.strip()[len(directive):].strip() for line in _input()
+        if line.strip().startswith(directive)
+    )
     return [setting for setting in all ^ documented
-                if not is_ignored(setting, settings[setting])]
+            if not is_ignored(setting, settings[setting])]
 
 
 if __name__ == '__main__':
     sep = '\n  * '
     missing = find_undocumented_settings()
     if missing:
-        print('Error: found undocumented settings:{0}{1}'.format(
-                sep, sep.join(sorted(missing))), file=stderr)
+        print(
+            'Error: found undocumented settings:{0}{1}'.format(
+                sep, sep.join(sorted(missing))),
+            file=stderr,
+        )
         exit(1)
     print('OK: Configuration reference complete :-)')
     exit(0)
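
The reformatting above is behavior-preserving: "all ^ documented" is a set symmetric difference, so the script reports settings that exist without a ".. setting::" directive as well as directives that no longer correspond to a real setting. A tiny worked example with hypothetical setting names:

    defined = {'BROKER_URL', 'CELERYD_POOL', 'CELERY_NEW_UNDOCUMENTED'}
    documented = {'BROKER_URL', 'CELERYD_POOL', 'CELERY_REMOVED_SETTING'}
    print(defined ^ documented)
    # {'CELERY_NEW_UNDOCUMENTED', 'CELERY_REMOVED_SETTING'}  (order may vary)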

+ 26 - 23
funtests/benchmarks/bench_worker.py

@@ -25,27 +25,28 @@ BROKER_TRANSPORT = os.environ.get('BROKER', 'librabbitmq')
 if hasattr(sys, 'pypy_version_info'):
     BROKER_TRANSPORT = 'pyamqp'
 
-celery = Celery(__name__)
-celery.conf.update(BROKER_TRANSPORT=BROKER_TRANSPORT,
-                   BROKER_POOL_LIMIT=10,
-                   CELERYD_POOL='solo',
-                   CELERYD_PREFETCH_MULTIPLIER=0,
-                   CELERY_DISABLE_RATE_LIMITS=True,
-                   CELERY_DEFAULT_DELIVERY_MODE=1,
-                   CELERY_QUEUES = {
-                       'bench.worker': {
-                           'exchange': 'bench.worker',
-                           'routing_key': 'bench.worker',
-                           'no_ack': True,
-                           'exchange_durable': False,
-                           'queue_durable': False,
-                           'auto_delete': True,
-                        }
-                   },
-                   CELERY_TASK_SERIALIZER='json',
-                   CELERY_DEFAULT_QUEUE='bench.worker',
-                   CELERY_BACKEND=None,
-                   )#CELERY_MESSAGE_COMPRESSION='zlib')
+celery = Celery('bench_worker')
+celery.conf.update(
+    BROKER_TRANSPORT=BROKER_TRANSPORT,
+    BROKER_POOL_LIMIT=10,
+    CELERYD_POOL='solo',
+    CELERYD_PREFETCH_MULTIPLIER=0,
+    CELERY_DISABLE_RATE_LIMITS=True,
+    CELERY_DEFAULT_DELIVERY_MODE=1,
+    CELERY_QUEUES={
+        'bench.worker': {
+            'exchange': 'bench.worker',
+            'routing_key': 'bench.worker',
+            'no_ack': True,
+            'exchange_durable': False,
+            'queue_durable': False,
+            'auto_delete': True,
+        }
+    },
+    CELERY_TASK_SERIALIZER='json',
+    CELERY_DEFAULT_QUEUE='bench.worker',
+    CELERY_BACKEND=None,
+)
 
 
 def tdiff(then):
@@ -65,7 +66,8 @@ def it(_, n):
         total = tdiff(it.time_start)
         print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
         print('-- process {0} tasks: {1}s total, {2} tasks/s} '.format(
-                n, total, n / (total + .0)))
+            n, total, n / (total + .0),
+        ))
         sys.exit()
     it.cur += 1
 
@@ -100,7 +102,8 @@ def main(argv=sys.argv):
     n = DEFAULT_ITS
     if len(argv) < 2:
         print('Usage: {0} [apply|work|both] [n=20k]'.format(
-                os.path.basename(argv[0])))
+            os.path.basename(argv[0]),
+        ))
         return sys.exit(1)
     try:
         try:

+ 0 - 37
funtests/benchmarks/req.py

@@ -1,37 +0,0 @@
-from celery import current_app, task, uuid
-from celery.five import Queue, range
-from celery.worker.consumer import Consumer
-from celery.worker.job import Request
-from celery.concurrency.solo import TaskPool
-from celery.app.amqp import TASK_BARE
-from time import time
-from librabbitmq import Message
-import socket
-import sys
-
-@task(accept_magic_kwargs=False)
-def T():
-    pass
-
-tid = uuid()
-P = TaskPool()
-hostname = socket.gethostname()
-task = {'task': T.name, 'args': (), 'kwargs': {}, 'id': tid, 'flags': 0}
-app = current_app._get_current_object()
-
-def on_task(req):
-    req.execute_using_pool(P)
-
-def on_ack(*a): pass
-
-
-m = Message(None, {}, {}, task)
-
-x = Consumer(on_task, hostname=hostname, app=app)
-x.update_strategies()
-name = T.name
-ts = time()
-for i in range(100000):
-    x.strategies[name](m, m.body, on_ack)
-print(time() - ts)
-

+ 0 - 96
funtests/benchmarks/reqi.py

@@ -1,96 +0,0 @@
-from celery import current_app, task, uuid
-from celery.five import Queue, range
-from celery.worker.consumer import Consumer
-#from celery.worker.job import Request
-from celery.app.task import Context
-from celery.concurrency.solo import TaskPool
-from celery.app.amqp import TASK_BARE
-from time import time
-from librabbitmq import Message
-from celery.utils.functional import noop
-from celery.worker.job import NEEDS_KWDICT
-from celery.datastructures import AttributeDict
-import socket
-import sys
-
-@task(accept_magic_kwargs=False)
-def T():
-    pass
-
-class Request(object):
-    #__slots__ = ('app', 'name', 'id', 'args', 'kwargs',
-    #             'on_ack', 'delivery_info', 'hostname',
-    #             'eventer', 'connection_errors',
-    #             'task', 'eta', 'expires', 'flags',
-    #             'request_dict', 'acknowledged',
-    #             'worker_pid', 'started',
-    #             '_already_revoked', '_terminate_on_ack', '_tzlocal')
-    eta = None
-    started = False
-    acknowledged = _already_revoked = False
-    worker_pid = _terminate_on_ack = None
-    _tzlocal = None
-    expires = None
-    delivery_info = {}
-    flags = 0
-    args = ()
-
-    def __init__(self, body, on_ack=noop,
-            hostname=None, eventer=None, app=None,
-            connection_errors=None, request_dict=None,
-            delivery_info=None, task=None, Context=Context, **opts):
-        self.app = app
-        self.name = body['task']
-        self.id = body['id']
-        self.args = body['args']
-        try:
-            self.kwargs = body['kwargs']
-            if NEEDS_KWDICT:
-                self.kwargs = kwdict(self.kwargs)
-        except KeyError:
-            self.kwargs = {}
-        try:
-            self.flags = body['flags']
-        except KeyError:
-            pass
-        self.on_ack = on_ack
-        self.hostname = hostname
-        self.eventer = eventer
-        self.connection_errors = connection_errors or ()
-        self.task = task or self.app._tasks[self.name]
-        if 'eta' in body:
-            eta = body['eta']
-            tz = tz_utc if utc else self.tzlocal
-            self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
-        if 'expires' in body:
-            expires = body['expires']
-            tz = tz_utc if utc else self.tzlocal
-            self.expires = tz_to_local(maybe_iso8601(expires),
-                                       self.tzlocal, tz)
-        if delivery_info:
-            self.delivery_info = {
-                'exchange': delivery_info.get('exchange'),
-                'routing_key': delivery_info.get('routing_key'),
-            }
-
-        self.request_dict = AttributeDict(
-                {'called_directly': False,
-                 'callbacks': [],
-                 'errbacks': [],
-                 'chord': None}, **body)
-
-
-
-
-tid = uuid()
-hostname = socket.gethostname()
-task = {'task': T.name, 'args': (), 'kwargs': {}, 'id': tid, 'flags': 0}
-app = current_app._get_current_object()
-
-m = Message(None, {}, {}, task)
-
-ts = time()
-for i in range(1000000):
-    x = Request(task, hostname=hostname, app=app, task=task)
-print(time() - ts)
-

+ 0 - 36
funtests/benchmarks/timer.py

@@ -1,36 +0,0 @@
-# -*- coding: utf-8 -*-
-from __future__ import absolute_import
-
-import sys
-
-from time import sleep
-from celery.five import range
-from celery.utils import timer2 as timer
-
-def noop(*args, **kwargs):
-    return
-
-
-def insert(s, n=100000):
-    for i in range(n):
-        s.apply_after(1 + (i and i / 10.0), noop, (i, ))
-
-
-def slurp(s, n=100000):
-    i = 0
-    it = iter(s)
-    while i < n:
-        delay, entry = next(it)
-        if entry:
-            i += 1
-            s.apply_entry(entry)
-        #else:
-            #if delay:
-            #    sleep(delay)
-
-if __name__ == '__main__':
-    s = timer.Schedule()
-    insert(s)
-    if '--insert-only' not in sys.argv:
-        slurp(s)
-

+ 0 - 44
funtests/benchmarks/trace.py

@@ -1,44 +0,0 @@
-from celery import current_app, task, uuid
-from celery.five import Queue, range
-from celery.worker.consumer import Consumer
-from celery.worker.job import Request
-from celery.concurrency.solo import TaskPool
-from celery.app.amqp import TASK_BARE
-from time import time
-from librabbitmq import Message
-import socket
-import sys
-
-@task(accept_magic_kwargs=False)
-def T():
-    pass
-
-tid = uuid()
-P = TaskPool()
-hostname = socket.gethostname()
-task = {'task': T.name, 'args': (), 'kwargs': {}, 'id': tid, 'flags': 0}
-app = current_app._get_current_object()
-
-def on_task(req):
-    req.execute_using_pool(P)
-
-def on_ack(*a): pass
-
-
-m = Message(None, {}, {}, task)
-
-x = Consumer(on_task, hostname=hostname, app=app)
-x.update_strategies()
-name = T.name
-ts = time()
-from celery.datastructures import AttributeDict
-from celery.app.trace import trace_task_ret
-request = AttributeDict(
-                {'called_directly': False,
-                 'callbacks': [],
-                 'errbacks': [],
-                 'chord': None}, **task)
-for i in range(100000):
-    trace_task_ret(T, tid, (), {}, request)
-print(time() - ts)
-

+ 4 - 3
funtests/setup.py

@@ -7,15 +7,16 @@ try:
 except ImportError:
     from ez_setup import use_setuptools
     use_setuptools()
-    from setuptools import setup
-    from setuptools.command.install import install
+    from setuptools import setup  # noqa
+    from setuptools.command.install import install  # noqa
 
 import os
 import sys
 
 sys.path.insert(0, os.getcwd())
 sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
-import suite
+import suite  # noqa
+
 
 class no_install(install):
 

+ 0 - 24
funtests/stress/testbuf.py

@@ -1,24 +0,0 @@
-from __future__ import absolute_import
-
-import sys
-import time
-
-from celery.result import ResultSet
-
-from stress.app import app, sleeping
-
-
-
-def testbuf(padbytes=0, megabytes=0):
-    padding = float(padbytes) + 2 ** 20 * float(megabytes)
-    results = []
-    print('> padding: %r' % (padding, ))
-
-    for i in range(8 * 4):
-        results.append(sleeping.delay(1, kw='x' * int(padding)))
-        time.sleep(0.01)
-
-    res = ResultSet(results)
-    print(res.join())
-
-testbuf(*sys.argv[1:])

+ 7 - 8
funtests/suite/test_basic.py

@@ -1,12 +1,11 @@
 import operator
 import os
 import sys
-import time
 
 # funtest config
 sys.path.insert(0, os.getcwd())
 sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
-import suite
+import suite  # noqa
 
 from celery.five import range
 from celery.tests.utils import unittest
@@ -22,13 +21,13 @@ class test_basic(WorkerCase):
     def test_roundtrip_simple_task(self):
         publisher = tasks.add.get_publisher()
         results = [(tasks.add.apply_async(i, publisher=publisher), i)
-                        for i in zip(range(100), range(100))]
+                   for i in zip(range(100), range(100))]
         for result, i in results:
             self.assertEqual(result.get(timeout=10), operator.add(*i))
 
     def test_dump_active(self, sleep=1):
         r1 = tasks.sleeptask.delay(sleep)
-        r2 = tasks.sleeptask.delay(sleep)
+        tasks.sleeptask.delay(sleep)
         self.ensure_accepted(r1.id)
         active = self.inspect().active(safe=True)
         self.assertTrue(active)
@@ -39,9 +38,9 @@ class test_basic(WorkerCase):
 
     def test_dump_reserved(self, sleep=1):
         r1 = tasks.sleeptask.delay(sleep)
-        r2 = tasks.sleeptask.delay(sleep)
-        r3 = tasks.sleeptask.delay(sleep)
-        r4 = tasks.sleeptask.delay(sleep)
+        tasks.sleeptask.delay(sleep)
+        tasks.sleeptask.delay(sleep)
+        tasks.sleeptask.delay(sleep)
         self.ensure_accepted(r1.id)
         reserved = self.inspect().reserved(safe=True)
         self.assertTrue(reserved)
@@ -51,7 +50,7 @@ class test_basic(WorkerCase):
 
     def test_dump_schedule(self, countdown=1):
         r1 = tasks.add.apply_async((2, 2), countdown=countdown)
-        r2 = tasks.add.apply_async((2, 2), countdown=countdown)
+        tasks.add.apply_async((2, 2), countdown=countdown)
         self.ensure_scheduled(r1.id, interval=0.1)
         schedule = self.inspect().scheduled(safe=True)
         self.assertTrue(schedule)

+ 6 - 5
funtests/suite/test_leak.py

@@ -15,7 +15,7 @@ from celery import current_app
 from celery.five import range
 from celery.tests.utils import unittest
 
-import suite
+import suite  # noqa
 
 GET_RSIZE = '/bin/ps -p {pid} -o rss='
 QUICKTEST = int(os.environ.get('QUICKTEST', 0))
@@ -40,8 +40,9 @@ class LeakFunCase(unittest.TestCase):
     def get_rsize(self, cmd=GET_RSIZE):
         try:
             return int(subprocess.Popen(
-                        shlex.split(cmd.format(pid=os.getpid())),
-                            stdout=subprocess.PIPE).communicate()[0].strip())
+                shlex.split(cmd.format(pid=os.getpid())),
+                stdout=subprocess.PIPE).communicate()[0].strip()
+            )
         except OSError as exc:
             raise SkipTest(
                 'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))
@@ -73,7 +74,7 @@ class LeakFunCase(unittest.TestCase):
                     first = after
                 if self.debug:
                     print('{0!r} {1}: before/after: {2}/{3}'.format(
-                            fun, i, before, after))
+                          fun, i, before, after))
                 else:
                     sys.stderr.write('.')
                 sizes.add(self.appx(after))
@@ -102,7 +103,7 @@ class test_leaks(LeakFunCase):
         try:
             pool_limit = self.app.conf.BROKER_POOL_LIMIT
         except AttributeError:
-            return self.assertFreed(self.iterations, foo.delay)
+            return self.assertFreed(self.iterations, task1.delay)
 
         self.app.conf.BROKER_POOL_LIMIT = None
         try:

+ 5 - 4
pavement.py

@@ -1,12 +1,12 @@
 import sys
-from paver.easy import *
-from paver import doctools
-from paver.setuputils import setup
+from paver.easy import task, sh, cmdopts, path, needs, options, Bunch
+from paver import doctools  # noqa
+from paver.setuputils import setup  # noqa
 
 PYCOMPILE_CACHES = ['*.pyc', '*$py.class']
 
 options(
-        sphinx=Bunch(builddir='.build'),
+    sphinx=Bunch(builddir='.build'),
 )
 
 
@@ -103,6 +103,7 @@ def bump(options):
             celery/__init__.py docs/includes/introduction.txt \
             --before-commit='paver readme'")
 
+
 @task
 @cmdopts([
     ('coverage', 'c', 'Enable coverage'),

+ 1 - 0
setup.py

@@ -145,6 +145,7 @@ py_version = sys.version_info
 def strip_comments(l):
     return l.split('#', 1)[0].strip()
 
+
 def reqs(*f):
     return [
         r for r in (