Ask Solem %!s(int64=11) %!d(string=hai) anos
pai
achega
8997c26786

+ 0 - 2
celery/app/base.py

@@ -9,12 +9,10 @@
 from __future__ import absolute_import
 
 import os
-import sys
 import threading
 import warnings
 
 from collections import defaultdict, deque
-from contextlib import contextmanager
 from copy import deepcopy
 from operator import attrgetter
 

+ 2 - 2
celery/app/log.py

@@ -176,8 +176,8 @@ class Logging(object):
             formatter=TaskFormatter, **kwargs
         )
         logger.setLevel(loglevel)
-        logger.propagate = int(propagate)    # this is an int for some reason.
-                                             # better to not question why.
+        # this is an int for some reason, better not question why.
+        logger.propagate = int(propagate)
         signals.after_setup_task_logger.send(
             sender=None, logger=logger,
             loglevel=loglevel, logfile=logfile,

+ 1 - 5
celery/concurrency/asynpool.py

@@ -603,7 +603,7 @@ class AsynPool(_pool.Pool):
         active_writers = self._active_writers
         busy_workers = self._busy_workers
         diff = all_inqueues.difference
-        add_reader, add_writer = hub.add_reader, hub.add_writer
+        add_writer = hub.add_writer
         hub_add, hub_remove = hub.add, hub.remove
         mark_write_fd_as_active = active_writes.add
         mark_write_gen_as_active = active_writers.add
@@ -646,8 +646,6 @@ class AsynPool(_pool.Pool):
 
             def on_poll_start():
                 if outbound and len(busy_workers) < len(all_inqueues):
-                    #print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
-                    #                              len(active_writes)))
                     inactive = diff(active_writes)
                     [hub_add(fd, None, WRITE | ERR, consolidate=True)
                      for fd in inactive]
@@ -1155,8 +1153,6 @@ class AsynPool(_pool.Pool):
                     self._queues[self.create_process_queues()] = None
             except ValueError:
                 pass
-                # Not in queue map, make sure sockets are closed.
-                #self.destroy_queues((proc.inq, proc.outq, proc.synq))
             assert len(self._queues) == before
 
     def destroy_queues(self, queues, proc):

+ 5 - 5
celery/five.py

@@ -28,7 +28,7 @@ except ImportError:  # pragma: no cover
     def Counter():  # noqa
         return defaultdict(int)
 
-############## py3k #########################################################
+# ############# py3k #########################################################
 import sys
 PY3 = sys.version_info[0] == 3
 
@@ -144,17 +144,17 @@ def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])):
     return _clone_with_metaclass
 
 
-############## collections.OrderedDict ######################################
+# ############# collections.OrderedDict ######################################
 # was moved to kombu
 from kombu.utils.compat import OrderedDict  # noqa
 
-############## threading.TIMEOUT_MAX #######################################
+# ############# threading.TIMEOUT_MAX ########################################
 try:
     from threading import TIMEOUT_MAX as THREAD_TIMEOUT_MAX
 except ImportError:
     THREAD_TIMEOUT_MAX = 1e10  # noqa
 
-############## format(int, ',d') ##########################
+# ############# format(int, ',d') ############################################
 
 if sys.version_info >= (2, 7):  # pragma: no cover
     def format_d(i):
@@ -169,7 +169,7 @@ else:  # pragma: no cover
         return s + ','.join(reversed(groups))
 
 
-############## Module Generation ##########################
+# ############# Module Generation ############################################
 
 # Utilities to dynamically
 # recreate modules, either for lazy loading or

+ 2 - 2
celery/loaders/base.py

@@ -190,12 +190,12 @@ class BaseLoader(object):
             """Parse a single configuration definition from
             the command-line."""
 
-            ## find key/value
+            # ## find key/value
             # ns.key=value|ns_key=value (case insensitive)
             key, value = arg.split('=', 1)
             key = key.upper().replace('.', '_')
 
-            ## find namespace.
+            # ## find namespace.
             # .key=value|_key=value expands to default namespace.
             if key[0] == '_':
                 ns, key = namespace, key[1:]

+ 1 - 1
celery/task/base.py

@@ -38,7 +38,7 @@ class Task(BaseTask):
     __bound__ = False
     __v2_compat__ = True
 
-    #- Deprecated compat. attributes -:
+    # - Deprecated compat. attributes -:
 
     queue = None
     routing_key = None

+ 10 - 6
celery/tests/backends/test_amqp.py

@@ -271,12 +271,16 @@ class test_AMQPBackend(AppCase):
             tids.append(tid)
 
         res = list(b.get_many(tids, timeout=1))
-        expected_results = [(tid, {'status': states.SUCCESS,
-                                   'result': i,
-                                   'traceback': None,
-                                   'task_id': tid,
-                                   'children': None})
-                            for i, tid in enumerate(tids)]
+        expected_results = [
+            (task_id, {
+                'status': states.SUCCESS,
+                'result': i,
+                'traceback': None,
+                'task_id': tid,
+                'children': None,
+            })
+            for i, task_id in enumerate(tids)
+        ]
         self.assertEqual(sorted(res), sorted(expected_results))
         self.assertDictEqual(b._cache[res[0][0]], res[0][1])
         cached_res = list(b.get_many(tids, timeout=1))

+ 9 - 3
celery/tests/bin/test_worker.py

@@ -206,7 +206,10 @@ class test_Worker(WorkerAppCase):
         # test when there are too few output lines
         # to draft the ascii art onto
         prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
-        self.assertTrue(worker.startup_info())
+        try:
+            self.assertTrue(worker.startup_info())
+        finally:
+            cd.ARTLINES = prev
 
     @disable_stdouts
     def test_run(self):
@@ -322,8 +325,11 @@ class test_Worker(WorkerAppCase):
             app=self.app, redirect_stdouts=False, no_color=True,
         )
         prev, self.app.log.setup = self.app.log.setup, Mock()
-        worker.setup_logging()
-        self.assertFalse(self.app.log.setup.call_args[1]['colorize'])
+        try:
+            worker.setup_logging()
+            self.assertFalse(self.app.log.setup.call_args[1]['colorize'])
+        finally:
+            self.app.log.setup = prev
 
     @disable_stdouts
     def test_startup_info_pool_is_str(self):

+ 1 - 1
celery/tests/case.py

@@ -235,7 +235,7 @@ def _is_magic_module(m):
 
     # pyflakes refuses to accept 'noqa' for this isinstance.
     cls, modtype = m.__class__, types.ModuleType
-    return (not cls is modtype and (
+    return (cls is not modtype and (
         '__getattr__' in vars(m.__class__) or
         '__getattribute__' in vars(m.__class__)))
 

+ 2 - 2
celery/tests/compat_modules/test_sets.py

@@ -134,8 +134,8 @@ class test_subtask(SetsCase):
         s = self.MockTask.subtask(
             (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
         )
-        s.args = list(s.args)                   # tuples are not preserved
-                                                # but this doesn't matter.
+        # tuples are not preserved, but this doesn't matter.
+        s.args = list(s.args)
         self.assertEqual(s, self.subtask(anyjson.loads(anyjson.dumps(s))))
 
     def test_repr(self):

+ 3 - 1
celery/utils/objects.py

@@ -74,7 +74,9 @@ class FallbackContext(object):
     def __enter__(self):
         if self.provided is not None:
             return self.provided
-        context = self._context = self.fallback(*self.fb_args, **self.fb_kwargs).__enter__()
+        context = self._context = self.fallback(
+            *self.fb_args, **self.fb_kwargs
+        ).__enter__()
         return context
 
     def __exit__(self, *exc_info):

+ 1 - 1
celery/worker/consumer.py

@@ -270,7 +270,7 @@ class Consumer(object):
             self.on_task_request(request)
 
     def start(self):
-        blueprint, loop = self.blueprint, self.loop
+        blueprint = self.blueprint
         while blueprint.state != CLOSE:
             self.restart_count += 1
             maybe_shutdown()

+ 3 - 1
celery/worker/control.py

@@ -367,7 +367,9 @@ def active_queues(state):
 
 
 def _wanted_config_key(key):
-    return isinstance(key, string_t) and key.isupper() and not key.startswith('__')
+    return (isinstance(key, string_t) and
+            key.isupper() and
+            not key.startswith('__'))
 
 
 @Panel.register

+ 5 - 3
celery/worker/job.py

@@ -540,9 +540,11 @@ class Request(object):
                 'worker_pid': self.worker_pid}
 
     def __str__(self):
-        return '{0.name}[{0.id}]{1}{2}'.format(self,
-               ' eta:[{0}]'.format(self.eta) if self.eta else '',
-               ' expires:[{0}]'.format(self.expires) if self.expires else '')
+        return '{0.name}[{0.id}]{1}{2}'.format(
+            self,
+            ' eta:[{0}]'.format(self.eta) if self.eta else '',
+            ' expires:[{0}]'.format(self.expires) if self.expires else '',
+        )
     shortinfo = __str__
 
     def __repr__(self):

+ 3 - 2
celery/worker/pidbox.py

@@ -31,8 +31,9 @@ class Pidbox(object):
         self._forward_clock = self.c.app.clock.forward
 
     def on_message(self, body, message):
-        self._forward_clock()  # just increase clock as clients usually don't
-                               # have a valid clock to adjust with.
+        # just increase clock as clients usually don't
+        # have a valid clock to adjust with.
+        self._forward_clock()
         try:
             self.node.handle_message(body, message)
         except KeyError as exc: