Jelajahi Sumber

Fixes flakes after flake8 update

Ask Solem 10 tahun lalu
induk
melakukan
4e52e8f1f5

+ 2 - 2
celery/app/log.py

@@ -176,8 +176,8 @@ class Logging(object):
             formatter=TaskFormatter, **kwargs
         )
         logger.setLevel(loglevel)
-        logger.propagate = int(propagate)    # this is an int for some reason.
-                                             # better to not question why.
+        # this is an int for some reason, better to not question why.
+        logger.propagate = int(propagate)
         signals.after_setup_task_logger.send(
             sender=None, logger=logger,
             loglevel=loglevel, logfile=logfile,

+ 3 - 5
celery/concurrency/asynpool.py

@@ -595,7 +595,7 @@ class AsynPool(_pool.Pool):
         active_writers = self._active_writers
         busy_workers = self._busy_workers
         diff = all_inqueues.difference
-        add_reader, add_writer = hub.add_reader, hub.add_writer
+        add_writer = hub.add_writer
         hub_add, hub_remove = hub.add, hub.remove
         mark_write_fd_as_active = active_writes.add
         mark_write_gen_as_active = active_writers.add
@@ -638,8 +638,8 @@ class AsynPool(_pool.Pool):
 
             def on_poll_start():
                 if outbound and len(busy_workers) < len(all_inqueues):
-                    #print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
-                    #                              len(active_writes)))
+                    #  print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
+                    #                                len(active_writes)))
                     inactive = diff(active_writes)
                     [hub_add(fd, None, WRITE | ERR, consolidate=True)
                      for fd in inactive]
@@ -1134,8 +1134,6 @@ class AsynPool(_pool.Pool):
                     self._queues[self.create_process_queues()] = None
             except ValueError:
                 pass
-                # Not in queue map, make sure sockets are closed.
-                #self.destroy_queues((proc.inq, proc.outq, proc.synq))
             assert len(self._queues) == before
 
     def destroy_queues(self, queues, proc):

+ 1 - 1
celery/five.py

@@ -20,7 +20,7 @@ from amqp.five import __all__ as _all_five
 
 __all__ += _all_five
 
-############## Module Generation ##########################
+#  ############# Module Generation ##########################
 
 # Utilities to dynamically
 # recreate modules, either for lazy loading or

+ 2 - 2
celery/loaders/base.py

@@ -190,12 +190,12 @@ class BaseLoader(object):
             """Parse a single configuration definition from
             the command-line."""
 
-            ## find key/value
+            # ## find key/value
             # ns.key=value|ns_key=value (case insensitive)
             key, value = arg.split('=', 1)
             key = key.upper().replace('.', '_')
 
-            ## find namespace.
+            # ## find namespace.
             # .key=value|_key=value expands to default namespace.
             if key[0] == '_':
                 ns, key = namespace, key[1:]

+ 1 - 1
celery/task/base.py

@@ -39,7 +39,7 @@ class Task(BaseTask):
     __bound__ = False
     __v2_compat__ = True
 
-    #- Deprecated compat. attributes -:
+    # - Deprecated compat. attributes -:
 
     queue = None
     routing_key = None

+ 8 - 6
celery/tests/backends/test_amqp.py

@@ -271,12 +271,14 @@ class test_AMQPBackend(AppCase):
             tids.append(tid)
 
         res = list(b.get_many(tids, timeout=1))
-        expected_results = [(tid, {'status': states.SUCCESS,
-                                   'result': i,
-                                   'traceback': None,
-                                   'task_id': tid,
-                                   'children': None})
-                            for i, tid in enumerate(tids)]
+        expected_results = [
+            (_tid, {'status': states.SUCCESS,
+                    'result': i,
+                    'traceback': None,
+                    'task_id': _tid,
+                    'children': None})
+            for i, _tid in enumerate(tids)
+        ]
         self.assertEqual(sorted(res), sorted(expected_results))
         self.assertDictEqual(b._cache[res[0][0]], res[0][1])
         cached_res = list(b.get_many(tids, timeout=1))

+ 9 - 3
celery/tests/bin/test_worker.py

@@ -207,7 +207,10 @@ class test_Worker(WorkerAppCase):
         # test when there are too few output lines
         # to draft the ascii art onto
         prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
-        self.assertTrue(worker.startup_info())
+        try:
+            self.assertTrue(worker.startup_info())
+        finally:
+            cd.ARTLINES = prev
 
     @disable_stdouts
     def test_run(self):
@@ -323,8 +326,11 @@ class test_Worker(WorkerAppCase):
             app=self.app, redirect_stdouts=False, no_color=True,
         )
         prev, self.app.log.setup = self.app.log.setup, Mock()
-        worker.setup_logging()
-        self.assertFalse(self.app.log.setup.call_args[1]['colorize'])
+        try:
+            worker.setup_logging()
+            self.assertFalse(self.app.log.setup.call_args[1]['colorize'])
+        finally:
+            self.app.log.setup = prev
 
     @disable_stdouts
     def test_startup_info_pool_is_str(self):

+ 1 - 1
celery/tests/case.py

@@ -235,7 +235,7 @@ def _is_magic_module(m):
 
     # pyflakes refuses to accept 'noqa' for this isinstance.
     cls, modtype = m.__class__, types.ModuleType
-    return (not cls is modtype and (
+    return (cls is not modtype and (
         '__getattr__' in vars(m.__class__) or
         '__getattribute__' in vars(m.__class__)))
 

+ 2 - 2
celery/tests/compat_modules/test_sets.py

@@ -135,8 +135,8 @@ class test_subtask(SetsCase):
         s = self.MockTask.subtask(
             (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
         )
-        s.args = list(s.args)                   # tuples are not preserved
-                                                # but this doesn't matter.
+        # tuples are not preserved, but this doesn't matter.
+        s.args = list(s.args)
         self.assertEqual(s, self.subtask(json.loads(json.dumps(s))))
 
     def test_repr(self):

+ 1 - 1
celery/worker/consumer.py

@@ -263,7 +263,7 @@ class Consumer(object):
             self.on_task_request(request)
 
     def start(self):
-        blueprint, loop = self.blueprint, self.loop
+        blueprint = self.blueprint
         while blueprint.state != CLOSE:
             self.restart_count += 1
             maybe_shutdown()

+ 0 - 2
celery/worker/loops.py

@@ -26,11 +26,9 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
     """Non-blocking event loop consuming messages until connection is lost,
     or shutdown is requested."""
     update_qos = qos.update
-    readers, writers = hub.readers, hub.writers
     hbtick = connection.heartbeat_check
     errors = connection.connection_errors
     heartbeat = connection.get_heartbeat_interval()  # negotiated
-    hub_add, hub_remove = hub.add, hub.remove
 
     on_task_received = obj.create_task_handler()
 

+ 3 - 2
celery/worker/pidbox.py

@@ -31,8 +31,9 @@ class Pidbox(object):
         self._forward_clock = self.c.app.clock.forward
 
     def on_message(self, body, message):
-        self._forward_clock()  # just increase clock as clients usually don't
-                               # have a valid clock to adjust with.
+        # just increase clock as clients usually don't
+        # have a valid clock to adjust with.
+        self._forward_clock()
         try:
             self.node.handle_message(body, message)
         except KeyError as exc:

+ 5 - 3
celery/worker/request.py

@@ -388,9 +388,11 @@ class Request(object):
                 'worker_pid': self.worker_pid}
 
     def __str__(self):
-        return '{0.name}[{0.id}]{1}{2}'.format(self,
-               ' eta:[{0}]'.format(self.eta) if self.eta else '',
-               ' expires:[{0}]'.format(self.expires) if self.expires else '')
+        return '{0.name}[{0.id}]{1}{2}'.format(
+            self,
+            ' eta:[{0}]'.format(self.eta) if self.eta else '',
+            ' expires:[{0}]'.format(self.expires) if self.expires else '',
+        )
     shortinfo = __str__
 
     def __repr__(self):

+ 0 - 2
docs/_ext/applyxrefs.py

@@ -49,7 +49,6 @@ def has_target(fn):
         if not readok:
             return (True, None)
 
-    #print fn, len(lines)
     if len(lines) < 1:
         print("Not touching empty file %s." % fn)
         return (True, None)
@@ -71,7 +70,6 @@ def main(argv=None):
             files.extend([(dirpath, f) for f in filenames])
     files.sort()
     files = [os.path.join(p, fn) for p, fn in files if fn.endswith('.txt')]
-    #print files
 
     for fn in files:
         if fn in DONT_TOUCH:

+ 7 - 12
docs/conf.py

@@ -67,11 +67,6 @@ release = celery.__version__
 
 exclude_trees = ['.build']
 
-#unused_docs = [
-#    'xreftest.rst',
-#    'tutorials/otherqueues',
-#]
-
 # If true, '()' will be appended to :func: etc. cross-reference text.
 add_function_parentheses = True
 
@@ -116,7 +111,7 @@ html_sidebars = {
            'sourcelink.html', 'searchbox.html'],
 }
 
-### Issuetracker
+# ## Issuetracker
 
 github_project = 'celery/celery'
 
@@ -142,13 +137,13 @@ epub_identifier = 'celeryproject.org'
 # A unique identification for the text.
 epub_uid = 'Celery Manual, Version {0}'.format(version)
 
-# HTML files that should be inserted before the pages created by sphinx.
-# The format is a list of tuples containing the path and title.
-#epub_pre_files = []
+# ## HTML files that should be inserted before the pages created by sphinx.
+# ## The format is a list of tuples containing the path and title.
+# epub_pre_files = []
 
-# HTML files shat should be inserted after the pages created by sphinx.
-# The format is a list of tuples containing the path and title.
-#epub_post_files = []
+# ## HTML files shat should be inserted after the pages created by sphinx.
+# ## The format is a list of tuples containing the path and title.
+# epub_post_files = []
 
 # A list of files that should not be packed into the epub file.
 epub_exclude_files = ['search.html']

+ 4 - 3
examples/app/myapp.py

@@ -27,11 +27,12 @@ from celery import Celery
 app = Celery(
     'myapp',
     broker='amqp://guest@localhost//',
-    # add result backend here if needed.
-    #backend='rpc'
+    # ## add result backend here if needed.
+    # backend='rpc'
 )
 
-@app.task()
+
+@app.task
 def add(x, y):
     return x + y
 

+ 2 - 0
examples/django/proj/__init__.py

@@ -3,3 +3,5 @@ from __future__ import absolute_import
 # This will make sure the app is always imported when
 # Django starts so that shared_task will use this app.
 from .celery import app as celery_app
+
+__all__ = ['celery_app']

+ 1 - 1
examples/eventlet/celeryconfig.py

@@ -2,7 +2,7 @@ import os
 import sys
 sys.path.insert(0, os.getcwd())
 
-## Start worker with -P eventlet
+# ## Start worker with -P eventlet
 # Never use the CELERYD_POOL setting as that will patch
 # the worker too late.
 

+ 1 - 1
examples/gevent/celeryconfig.py

@@ -2,7 +2,7 @@ import os
 import sys
 sys.path.insert(0, os.getcwd())
 
-### Note: Start worker with -P gevent,
+# ## Note: Start worker with -P gevent,
 # do not use the CELERYD_POOL option.
 
 BROKER_URL = 'amqp://guest:guest@localhost:5672//'

+ 1 - 1
examples/next-steps/setup.py

@@ -15,6 +15,6 @@ setup(
     zip_safe=False,
     install_requires=[
         'celery>=3.0',
-        #'requests',
+        #  'requests',
     ],
 )

+ 3 - 2
funtests/benchmarks/bench_worker.py

@@ -48,8 +48,9 @@ def tdiff(then):
 
 @app.task(cur=0, time_start=None, queue='bench.worker', bare=True)
 def it(_, n):
-    i = it.cur  # use internal counter, as ordering can be skewed
-                # by previous runs, or the broker.
+    # use internal counter, as ordering can be skewed
+    # by previous runs, or the broker.
+    i = it.cur
     if i and not i % 5000:
         print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
         it.subt = time.time()

+ 0 - 1
funtests/stress/stress/fbi.py

@@ -64,4 +64,3 @@ class FBI(object):
             self.ffwd()
             for tid in ids:
                 print(self.state_of(tid), file=file)
-            #print(self.query(ids), file=file)

+ 2 - 3
funtests/stress/stress/templates.py

@@ -2,8 +2,6 @@ from __future__ import absolute_import
 
 import os
 
-from functools import partial
-
 from celery.five import items
 from kombu import Exchange, Queue
 from kombu.utils import symbol_by_name
@@ -121,11 +119,12 @@ class execv(default):
 
 @template()
 class sqs(default):
-    BROKER_URL='sqs://'
+    BROKER_URL = 'sqs://'
     BROKER_TRANSPORT_OPTIONS = {
         'region': os.environ.get('AWS_REGION', 'us-east-1'),
     }
 
+
 @template()
 class proto1(default):
     CELERY_TASK_PROTOCOL = 1