Parcourir la source

PEP8ify + pyflakes

Ask Solem il y a 14 ans
Parent
commit
57dc4f4abb

+ 0 - 1
celery/app/amqp.py

@@ -243,7 +243,6 @@ class PublisherPool(Resource):
         return maybe_promise(publisher)
 
 
-
 class AMQP(object):
     BrokerConnection = BrokerConnection
     Publisher = messaging.Publisher

+ 1 - 1
celery/apps/worker.py

@@ -13,7 +13,7 @@ from celery import platforms
 from celery import signals
 from celery.app import app_or_default
 from celery.exceptions import ImproperlyConfigured, SystemTerminate
-from celery.utils import get_full_cls_name, LOG_LEVELS, isatty, cry
+from celery.utils import get_full_cls_name, LOG_LEVELS, cry
 from celery.utils import term
 from celery.worker import WorkController
 

+ 1 - 2
celery/backends/amqp.py

@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 import socket
 import time
-import warnings
 
 from datetime import timedelta
 
@@ -98,7 +97,7 @@ class AMQPBackend(BaseDictBackend):
                 connection._result_producer_chan.connection is not None:
             channel = connection._result_producer_chan
         else:
-            channel = connection_result_producer_chan = connection.channel()
+            channel = connection._result_producer_chan = connection.channel()
 
         try:
             self._create_producer(task_id, channel).publish(meta)

+ 1 - 1
celery/datastructures.py

@@ -16,7 +16,7 @@ import traceback
 from itertools import chain
 from Queue import Queue, Empty as QueueEmpty
 
-from celery.utils.compat import OrderedDict, UserList
+from celery.utils.compat import OrderedDict
 
 
 class AttributeDictMixin(object):

+ 5 - 4
celery/events/cursesmon.py

@@ -296,7 +296,7 @@ class CursesMonitor(object):
     def display_task_row(self, lineno, task):
         state_color = self.state_colors.get(task.state)
         attr = curses.A_NORMAL
-        if task.uuid== self.selected_task:
+        if task.uuid == self.selected_task:
             attr = curses.A_STANDOUT
         timestamp = datetime.fromtimestamp(
                         task.timestamp or time.time())
@@ -307,12 +307,12 @@ class CursesMonitor(object):
         self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)
 
         if state_color:
-            self.win.addstr(lineno, len(line) - STATE_WIDTH + BORDER_SPACING - 1,
+            self.win.addstr(lineno,
+                            len(line) - STATE_WIDTH + BORDER_SPACING - 1,
                             task.state, state_color | attr)
         if task.ready:
             task.visited = time.time()
 
-
     def draw(self):
         win = self.win
         self.handle_keypress()
@@ -391,7 +391,8 @@ class CursesMonitor(object):
 
         # Help
         self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
-        self.safe_add_str(my - 2, x + len(self.help_title), self.help, curses.A_DIM)
+        self.safe_add_str(my - 2, x + len(self.help_title), self.help,
+                          curses.A_DIM)
         win.refresh()
 
     def safe_add_str(self, y, x, string, *args, **kwargs):

+ 2 - 21
celery/result.py

@@ -378,25 +378,6 @@ class TaskSetResult(object):
                                         interval=interval))
         return results
 
-
-
-
-        while True:
-            for position, pending_result in enumerate(self.subtasks):
-                state = pending_result.state
-                if state in states.READY_STATES:
-                    if propagate and state in states.PROPAGATE_STATES:
-                        raise pending_result.result
-                    results[position] = pending_result.result
-            if results.full():
-                # Make list copy, so the returned type is not a position
-                # queue.
-                return list(results)
-            else:
-                if (timeout is not None and
-                        time.time() >= time_start + timeout):
-                    raise TimeoutError("join operation timed out.")
-
     def iter_native(self, timeout=None):
         backend = self.subtasks[0].backend
         ids = [subtask.task_id for subtask in self.subtasks]
@@ -462,7 +443,8 @@ class EagerResult(BaseAsyncResult):
         self._traceback = traceback
 
     def __reduce__(self):
-        return (self.__class__, (self.task_id, self._result, self._state,  self._traceback))
+        return (self.__class__, (self.task_id, self._result,
+                                 self._state, self._traceback))
 
     def successful(self):
         """Returns :const:`True` if the task executed without failure."""
@@ -506,4 +488,3 @@ class EagerResult(BaseAsyncResult):
     def status(self):
         """The tasks status (alias to :attr:`state`)."""
         return self._state
-

+ 0 - 1
celery/tests/__init__.py

@@ -29,7 +29,6 @@ def teardown():
                 remaining_threads))
 
 
-
 def find_distribution_modules(name=__name__, file=__file__):
     current_dist_depth = len(name.split(".")) - 1
     current_dist = os.path.join(os.path.dirname(file),

+ 1 - 2
celery/tests/test_app.py

@@ -207,11 +207,10 @@ class test_App(unittest.TestCase):
         self.assertIn("bar_exchange", amqp._exchanges_declared)
 
 
-
 class test_BaseApp(unittest.TestCase):
 
     def test_on_init(self):
-        app = BaseApp()
+        BaseApp()
 
 
 class test_defaults(unittest.TestCase):

+ 0 - 2
celery/tests/test_backends/test_amqp.py

@@ -222,7 +222,6 @@ class test_AMQPBackend(unittest.TestCase):
         self.assertRaises(socket.timeout, list,
                           b.get_many(uuids, timeout=0.01))
 
-
     def test_test_get_many_raises_outer_block(self):
 
         class Backend(AMQPBackend):
@@ -243,7 +242,6 @@ class test_AMQPBackend(unittest.TestCase):
         b = Backend()
         self.assertRaises(KeyError, b.get_many(["id1"]).next)
 
-
     def test_no_expires(self):
         b = self.create_backend(expires=None)
         app = app_or_default()

+ 1 - 2
celery/tests/test_bin/test_celerybeat.py

@@ -12,6 +12,7 @@ from celery.utils.compat import defaultdict
 
 from celery.tests.utils import unittest
 
+
 class MockedShelveModule(object):
     shelves = defaultdict(lambda: {})
 
@@ -101,7 +102,6 @@ class test_Beat(unittest.TestCase):
         MockService.in_sync = False
 
     def test_setup_logging(self):
-        import sys
         b = beatapp.Beat()
         b.redirect_stdouts = False
         b.setup_logging()
@@ -128,7 +128,6 @@ class test_Beat(unittest.TestCase):
     def test_use_pidfile(self, stdout, stderr):
         from celery import platforms
 
-
         class create_pidlock(object):
             instance = [None]
 

+ 1 - 4
celery/tests/test_bin/test_celeryd.py

@@ -114,7 +114,6 @@ class test_Worker(unittest.TestCase):
         worker.init_loader()
         worker.run()
 
-
     @disable_stdouts
     def test_purge_messages(self):
         self.Worker().purge_messages()
@@ -194,7 +193,6 @@ class test_Worker(unittest.TestCase):
     def test_use_pidfile(self):
         from celery import platforms
 
-
         class create_pidlock(object):
             instance = [None]
 
@@ -228,9 +226,8 @@ class test_Worker(unittest.TestCase):
         self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
 
     def test_redirect_stdouts_already_handled(self):
-        from celery import signals
-
         logging_setup = [False]
+
         def on_logging_setup(**kwargs):
             logging_setup[0] = True
 

+ 0 - 3
celery/tests/test_bin/test_celeryev.py

@@ -1,8 +1,5 @@
-import sys
-
 from celery.app import app_or_default
 from celery.bin import celeryev
-from celery.utils.functional import wraps
 
 from celery.tests.utils import unittest
 from celery.tests.utils import patch

+ 8 - 9
celery/tests/test_concurrency_base.py

@@ -19,10 +19,10 @@ class test_BasePool(unittest.TestCase):
 
             return callback
 
-        res = apply_target(gen_callback("target", 42),
-                           args=(8, 16),
-                           callback=gen_callback("callback"),
-                           accept_callback=gen_callback("accept_callback"))
+        apply_target(gen_callback("target", 42),
+                     args=(8, 16),
+                     callback=gen_callback("callback"),
+                     accept_callback=gen_callback("accept_callback"))
 
         self.assertDictEqual(scratch,
                              {"accept_callback": (0, ()),
@@ -31,10 +31,10 @@ class test_BasePool(unittest.TestCase):
 
         # No accept callback
         scratch.clear()
-        res2 = apply_target(gen_callback("target", 42),
-                            args=(8, 16),
-                            callback=gen_callback("callback"),
-                            accept_callback=None)
+        apply_target(gen_callback("target", 42),
+                     args=(8, 16),
+                     callback=gen_callback("callback"),
+                     accept_callback=None)
         self.assertDictEqual(scratch,
                               {"target": (3, (8, 16)),
                                "callback": (4, (42, ))})
@@ -56,4 +56,3 @@ class test_BasePool(unittest.TestCase):
         self.assertFalse(p.active)
         p._state = p.RUN
         self.assertTrue(p.active)
-

+ 0 - 2
celery/tests/test_concurrency_evlet.py

@@ -4,7 +4,6 @@ import sys
 from nose import SkipTest
 
 from celery.tests.utils import unittest
-from celery.tests.utils import patch
 
 
 class EventletCase(unittest.TestCase):
@@ -17,7 +16,6 @@ class EventletCase(unittest.TestCase):
                 "eventlet not installed, skipping related tests.")
 
 
-
 class test_eventlet_patch(EventletCase):
 
     def test_is_patched(self):

+ 2 - 4
celery/tests/test_events.py

@@ -77,7 +77,6 @@ class TestEventDispatcher(unittest.TestCase):
             self.assertEqual(dispatcher.publisher.serializer,
                             self.app.conf.CELERY_EVENT_SERIALIZER)
 
-
             created_channel = dispatcher.publisher.channel
             dispatcher.disable()
             dispatcher.disable()  # Disable with no active publisher
@@ -155,6 +154,7 @@ class TestEventReceiver(unittest.TestCase):
         channel = connection.channel()
         try:
             events_received = [0]
+
             def handler(event):
                 events_received[0] += 1
 
@@ -168,7 +168,7 @@ class TestEventReceiver(unittest.TestCase):
             for ev in evs:
                 producer.send(ev)
             it = r.itercapture(limit=4, wakeup=True)
-            consumer = it.next()
+            it.next()  # skip consumer (see itercapture)
             list(it)
             self.assertEqual(events_received[0], 4)
         finally:
@@ -176,13 +176,11 @@ class TestEventReceiver(unittest.TestCase):
             connection.close()
 
 
-
 class test_misc(unittest.TestCase):
 
     def setUp(self):
         self.app = app_or_default()
 
-
     def test_State(self):
         state = self.app.events.State()
         self.assertDictEqual(dict(state.workers), {})

+ 0 - 2
celery/tests/test_events_snapshot.py

@@ -95,7 +95,6 @@ class test_Polaroid(unittest.TestCase):
         self.assertEqual(shutter_signal_sent[0], 1)
 
 
-
 class test_evcam(unittest.TestCase):
 
     class MockReceiver(object):
@@ -110,7 +109,6 @@ class test_evcam(unittest.TestCase):
         def Receiver(self, *args, **kwargs):
             return test_evcam.MockReceiver()
 
-
     def setUp(self):
         self.app = app_or_default()
         self.app.events = self.MockEvents()

+ 0 - 4
celery/tests/test_loaders.py

@@ -157,8 +157,6 @@ class TestLoaderBase(unittest.TestCase):
                          ["broker.port=foobar"])
 
 
-
-
 class TestDefaultLoader(unittest.TestCase):
 
     def test_wanted_module_item(self):
@@ -257,5 +255,3 @@ class test_AppLoader(unittest.TestCase):
         sys.modules.pop("subprocess", None)
         self.loader.on_worker_init()
         self.assertIn("subprocess", sys.modules)
-
-

+ 0 - 1
celery/tests/test_result.py

@@ -379,4 +379,3 @@ class TestEagerResult(unittest.TestCase):
     def test_revoke(self):
         res = RaisingTask.apply(args=[3, 3])
         self.assertFalse(res.revoke())
-

+ 0 - 1
celery/tests/test_states.py

@@ -28,4 +28,3 @@ class test_state_precedence(unittest.TestCase):
         self.assertTrue(state(states.REVOKED) < state("CRASHED"))
         self.assertTrue(state(states.REVOKED) <= state("CRASHED"))
         self.assertTrue(state("CRASHED") >= state(states.REVOKED))
-

+ 0 - 4
celery/tests/test_task.py

@@ -335,7 +335,6 @@ class TestCeleryTasks(unittest.TestCase):
         T1 = self.createTaskCls("T1", "c.unittest.t.t1")
         conn = T1.app.broker_connection()
         chan = conn.channel()
-        prev = T1.app.conf.CELERY_SEND_TASK_SENT_EVENT
         T1.app.conf.CELERY_SEND_TASK_SENT_EVENT = True
         dispatcher = [None]
 
@@ -354,8 +353,6 @@ class TestCeleryTasks(unittest.TestCase):
 
         self.assertTrue(dispatcher[0])
 
-
-
     def test_get_publisher(self):
         from celery.app import amqp
         old_pub = amqp.TaskPublisher
@@ -415,7 +412,6 @@ class TestCeleryTasks(unittest.TestCase):
         self.assertTrue(logger)
 
 
-
 class TestTaskSet(unittest.TestCase):
 
     @with_eager_tasks

+ 2 - 3
celery/tests/test_task_builtins.py

@@ -1,6 +1,5 @@
 import warnings
 
-from celery.app import app_or_default
 from celery.task import ping, PingTask, backend_cleanup
 from celery.tests.compat import catch_warnings
 from celery.tests.utils import unittest, execute_context
@@ -34,8 +33,8 @@ class test_deprecated(unittest.TestCase):
 
         def block(log):
             from celery.task.base import TaskSet, subtask
-            x = TaskSet()
-            y = subtask(PingTask)
+            TaskSet()
+            subtask(PingTask)
             return log[0].message, log[1].message
 
         for w in execute_context(catch_warnings(record=True), block):

+ 0 - 1
celery/tests/test_task_control.py

@@ -109,7 +109,6 @@ class test_inspect(unittest.TestCase):
         self.assertIn("cancel_consumer", MockMailbox.sent)
 
 
-
 class test_Broadcast(unittest.TestCase):
 
     def setUp(self):

+ 1 - 1
celery/tests/test_utils.py

@@ -116,7 +116,7 @@ class test_utils(unittest.TestCase):
         def fun(obj):
             return fun.value
 
-        x =  utils.cached_property(fun)
+        x = utils.cached_property(fun)
         self.assertIs(x.__get__(None), x)
         self.assertIs(x.__set__(None, None), x)
         self.assertIs(x.__delete__(None), x)

+ 0 - 3
celery/tests/test_worker.py

@@ -428,7 +428,6 @@ class test_Consumer(unittest.TestCase):
         self.assertRaises(IndexError, l.maybe_conn_error,
                 raises(IndexError("foo")))
 
-
     def test_apply_eta_task(self):
         from celery.worker import state
         l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
@@ -648,8 +647,6 @@ class test_WorkController(unittest.TestCase):
             platforms.set_mp_process_title = psetproctitle
             default_app.set_current()
 
-
-
     def test_with_rate_limits_disabled(self):
         worker = WorkController(concurrency=1, loglevel=0,
                                 disable_rate_limits=True)

+ 0 - 2
celery/tests/test_worker_autoscale.py

@@ -94,8 +94,6 @@ class test_Autoscaler(unittest.TestCase):
         self.assertTrue(x._stopped.isSet())
         self.assertTrue(x.scale_called)
 
-
-
     def test_shrink_raises_exception(self):
         x = autoscale.Autoscaler(self.pool, 10, 3, logger=logger)
         x.scale_up(3)

+ 0 - 2
celery/tests/test_worker_control.py

@@ -121,7 +121,6 @@ class test_ControlPanel(unittest.TestCase):
     def test_active(self):
         from celery.worker import state
         from celery.worker.job import TaskRequest
-        from celery.task import PingTask
 
         r = TaskRequest(PingTask.name, "do re mi", (), {})
         state.active_requests.add(r)
@@ -178,7 +177,6 @@ class test_ControlPanel(unittest.TestCase):
         panel.handle("cancel_consumer", {"queue": "MyQueue"})
         self.assertIn("MyQueue", consumer.task_consumer.cancelled)
 
-
     def test_revoked(self):
         from celery.worker import state
         state.revoked.clear()

+ 0 - 1
celery/tests/test_worker_job.py

@@ -197,7 +197,6 @@ class test_TaskRequest(unittest.TestCase):
         finally:
             mytask.ignore_result = False
 
-
     def test_send_email(self):
         app = app_or_default()
         old_mail_admins = app.mail_admins

+ 1 - 3
celery/tests/utils.py

@@ -63,7 +63,6 @@ def fallback_contextmanager(fun):
 def execute_context(context, fun):
     val = context.__enter__()
     exc_info = (None, None, None)
-    retval = None
     try:
         try:
             return fun(val)
@@ -124,6 +123,7 @@ def with_environ(env_name, env_value):
         return _patch_environ
     return _envpatched
 
+
 def sleepdeprived(module=time):
 
     def _sleepdeprived(fun):
@@ -261,5 +261,3 @@ def patch(module, name, mocked):
                 setattr(module, name, prev)
         return __patched
     return _patch
-
-

+ 3 - 3
celery/worker/control/builtins.py

@@ -2,7 +2,7 @@ import sys
 from datetime import datetime
 
 from celery.registry import tasks
-from celery.utils import timeutils, LOG_LEVELS
+from celery.utils import timeutils
 from celery.worker import state
 from celery.worker.state import revoked
 from celery.worker.control.registry import Panel
@@ -203,9 +203,9 @@ def cancel_consumer(panel, queue=None, **_):
     cset = panel.consumer.task_consumer
     cset.cancel_by_queue(queue)
     return {"ok": "no longer consuming from %s" % (queue, )}
-    
+
+
 @Panel.register
 def worker_queues(panel):
     """Returns the queues associated with each worker."""
     return dict(panel.consumer.queues.iteritems())
-

+ 7 - 0
contrib/release/flakesignore.txt

@@ -40,4 +40,11 @@ celery/utils/compat.py:(.+?): 'UserList' imported but unused
 celery/utils/compat.py:(.+?): redefinition of unused 'UserDict' from line 10
 celery/utils/compat.py:(.+?): 'UserDict' imported but unused
 celery/utils/compat.py:(.+?): redefinition of unused 'OrderedDict' from line 260
+celery/app/amqp.py:(.+?): redefinition of function 'queues' from line
+celery/backends/pyredis.py:(.+?): redefinition of function 'client' from line 82
+celery/tests/utils.py:(.+?): redefinition of unused 'builtins' from line 14
+celery/utils/compat.py:(.+?): redefinition of unused 'StringIO' from line 15
+celery/utils/compat.py:(.+?): redefinition of unused 'StringIO' from line 18
+celery/utils/compat.py:(.+?): 'StringIO' imported but unused
+celery/utils/compat.py:(.+?): redefinition of unused 'OrderedDict' from line 268