瀏覽代碼

PEP8ify + pyflakes + tests passing

Ask Solem 14 年之前
父節點
當前提交
7fd0af74d9

+ 1 - 1
celery/concurrency/processes/pool.py

@@ -88,6 +88,7 @@ class LaxBoundedSemaphore(threading._Semaphore):
 # Exceptions
 # Exceptions
 #
 #
 
 
+
 class MaybeEncodingError(Exception):
 class MaybeEncodingError(Exception):
     """Wraps unpickleable object."""
     """Wraps unpickleable object."""
 
 
@@ -116,7 +117,6 @@ def soft_timeout_sighandler(signum, frame):
 #
 #
 
 
 
 
-
 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
     pid = os.getpid()
     pid = os.getpid()
     assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
     assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)

+ 0 - 2
celery/loaders/base.py

@@ -20,7 +20,6 @@ Please set this variable and make it point to
 a configuration module.""")
 a configuration module.""")
 
 
 
 
-
 class BaseLoader(object):
 class BaseLoader(object):
     """The base class for loaders.
     """The base class for loaders.
 
 
@@ -101,7 +100,6 @@ class BaseLoader(object):
         self._conf = obj
         self._conf = obj
         return True
         return True
 
 
-
     def cmdline_config_parser(self, args, namespace="celery",
     def cmdline_config_parser(self, args, namespace="celery",
                 re_type=re.compile(r"\((\w+)\)"),
                 re_type=re.compile(r"\((\w+)\)"),
                 extra_types={"json": deserialize},
                 extra_types={"json": deserialize},

+ 3 - 3
celery/tests/test_worker/test_worker.py

@@ -13,6 +13,7 @@ from celery.concurrency.base import BasePool
 from celery.exceptions import SystemTerminate
 from celery.exceptions import SystemTerminate
 from celery.task import task as task_dec
 from celery.task import task as task_dec
 from celery.task import periodic_task as periodic_task_dec
 from celery.task import periodic_task as periodic_task_dec
+from celery.utils import timer2
 from celery.utils import gen_unique_id
 from celery.utils import gen_unique_id
 from celery.worker import WorkController
 from celery.worker import WorkController
 from celery.worker.buckets import FastQueue
 from celery.worker.buckets import FastQueue
@@ -369,14 +370,13 @@ class test_Consumer(unittest.TestCase):
         l.event_dispatcher = MockEventDispatcher()
         l.event_dispatcher = MockEventDispatcher()
         l.pidbox_node = MockNode()
         l.pidbox_node = MockNode()
 
 
-        from celery.worker import consumer
-        prev, consumer.to_timestamp = consumer.to_timestamp, to_timestamp
+        prev, timer2.to_timestamp = timer2.to_timestamp, to_timestamp
         try:
         try:
             l.receive_message(m.decode(), m)
             l.receive_message(m.decode(), m)
             self.assertTrue(m.acknowledged)
             self.assertTrue(m.acknowledged)
             self.assertTrue(called[0])
             self.assertTrue(called[0])
         finally:
         finally:
-            consumer.to_timestamp = prev
+            timer2.to_timestamp = prev
 
 
     def test_receive_message_InvalidTaskError(self):
     def test_receive_message_InvalidTaskError(self):
         logger = MockLogger()
         logger = MockLogger()

+ 22 - 66
celery/tests/test_worker/test_worker_heartbeat.py

@@ -25,84 +25,40 @@ class MockDispatcherRaising(object):
             raise Exception("foo")
             raise Exception("foo")
 
 
 
 
-class MockHeart(Heart):
-    _alive = True
-    _joined = False
+class MockTimer(object):
 
 
-    def isAlive(self):
-        return self._alive
+    def apply_interval(self, msecs, fun, args=(), kwargs={}):
 
 
-    def join(self, timeout=None):
-        self._joined = True
+        class entry(tuple):
+            cancelled = False
+
+            def cancel(self):
+                self.cancelled = True
+
+        return entry((msecs, fun, args, kwargs))
 
 
 
 
 class TestHeart(unittest.TestCase):
 class TestHeart(unittest.TestCase):
 
 
     def test_stop(self):
     def test_stop(self):
+        timer = MockTimer()
         eventer = MockDispatcher()
         eventer = MockDispatcher()
-        h = MockHeart(eventer, interval=1)
-        h._state = "RUN"
+        h = Heart(timer, eventer, interval=1)
+        h.start()
+        self.assertTrue(h.tref)
         h.stop()
         h.stop()
-        self.assertTrue(h._joined)
-
-        h2 = MockHeart(eventer, interval=1)
-        h2._alive = False
-        h2._state = "RUN"
-        h2.stop()
-        self.assertFalse(h2._joined)
-
-    def test_time_raises_TypeError(self):
-        from celery.worker import heartbeat
-
-        def raises_TypeError(exc):
-            raise TypeError("x")
-
-        prev_time, heartbeat.time = heartbeat.time, raises_TypeError
-        try:
-            eventer = MockDispatcher()
-            heart = Heart(eventer, interval=0.1)
-            heart.run()
-            self.assertIn("worker-online", eventer.sent)
-            self.assertNotIn("worker-heartbeat", eventer.sent)
-
-        finally:
-            heartbeat.time = prev_time
+        self.assertIsNone(h.tref)
 
 
     @sleepdeprived
     @sleepdeprived
     def test_run_manages_cycle(self):
     def test_run_manages_cycle(self):
         eventer = MockDispatcher()
         eventer = MockDispatcher()
-        heart = Heart(eventer, interval=0.1)
+        heart = Heart(MockTimer(), eventer, interval=0.1)
         eventer.heart = heart
         eventer.heart = heart
-        heart.run()
-        self.assertEqual(heart._state, "RUN")
-        self.assertTrue(heart._shutdown.isSet())
-        heart._shutdown.clear()
-        heart._stopped.clear()
-        eventer.next_iter = 0
-        heart.run()
-
-    def test_run(self):
-        eventer = MockDispatcher()
-
-        heart = Heart(eventer, interval=1)
-        heart._shutdown.set()
-        heart.run()
-        self.assertEqual(heart._state, "RUN")
-        self.assertIn("worker-online", eventer.sent)
-        self.assertIn("worker-heartbeat", eventer.sent)
-        self.assertIn("worker-offline", eventer.sent)
-
-        heart.stop()
+        heart.start()
+        msecs, fun, args, kwargs = tref = heart.tref
+        self.assertEqual(msecs, 0.1 * 1000)
+        self.assertEqual(tref.fun, eventer.send)
+        self.assertTrue(tref.args)
+        self.assertTrue(tref.kwargs)
         heart.stop()
         heart.stop()
-        self.assertEqual(heart._state, "CLOSE")
-
-        heart = Heart(eventer, interval=0.00001)
-        heart._shutdown.set()
-        for i in range(10):
-            heart.run()
-
-    def test_run_exception(self):
-        eventer = MockDispatcherRaising()
-        heart = Heart(eventer, interval=1)
-        heart._shutdown.set()
-        self.assertRaises(Exception, heart.run)
+        self.assertTrue(tref.cancelled)

+ 7 - 5
celery/utils/compat.py

@@ -505,18 +505,19 @@ else:
             (A file has changed if its device or inode have changed.)
             (A file has changed if its device or inode have changed.)
             If it has changed, the old file stream is closed, and the file
             If it has changed, the old file stream is closed, and the file
             opened to get a new stream.
             opened to get a new stream.
-        
+
             This handler is not appropriate for use under Windows, because
             This handler is not appropriate for use under Windows, because
             under Windows open files cannot be moved or renamed - logging
             under Windows open files cannot be moved or renamed - logging
             opens the files with exclusive locks - and so there is no need
             opens the files with exclusive locks - and so there is no need
             for such a handler. Furthermore, ST_INO is not supported under
             for such a handler. Furthermore, ST_INO is not supported under
             Windows; stat always returns zero for this value.
             Windows; stat always returns zero for this value.
-        
+
             This handler is based on a suggestion and patch by Chad J.
             This handler is based on a suggestion and patch by Chad J.
             Schroeder.
             Schroeder.
             """
             """
             def __init__(self, filename, mode='a', encoding=None, delay=0):
             def __init__(self, filename, mode='a', encoding=None, delay=0):
-                logging.FileHandler.__init__(self, filename, mode, encoding, delay)
+                logging.FileHandler.__init__(self, filename, mode,
+                                             encoding, delay)
                 if not os.path.exists(self.baseFilename):
                 if not os.path.exists(self.baseFilename):
                     self.dev, self.ino = -1, -1
                     self.dev, self.ino = -1, -1
                 else:
                 else:
@@ -526,7 +527,7 @@ else:
             def emit(self, record):
             def emit(self, record):
                 """
                 """
                 Emit a record.
                 Emit a record.
-        
+
                 First check if the underlying file has changed, and if it
                 First check if the underlying file has changed, and if it
                 has, close the old stream and reopen the file to get the
                 has, close the old stream and reopen the file to get the
                 current stream.
                 current stream.
@@ -536,7 +537,8 @@ else:
                     changed = 1
                     changed = 1
                 else:
                 else:
                     stat = os.stat(self.baseFilename)
                     stat = os.stat(self.baseFilename)
-                    changed = (stat[ST_DEV] != self.dev) or (stat[ST_INO] != self.ino)
+                    changed = ((stat[ST_DEV] != self.dev) or
+                               (stat[ST_INO] != self.ino))
                 if changed and self.stream is not None:
                 if changed and self.stream is not None:
                     self.stream.flush()
                     self.stream.flush()
                     self.stream.close()
                     self.stream.close()

+ 3 - 2
celery/worker/consumer.py

@@ -157,8 +157,9 @@ class QoS(object):
         if pcount != self.prev:
         if pcount != self.prev:
             new_value = pcount
             new_value = pcount
             if pcount > PREFETCH_COUNT_MAX:
             if pcount > PREFETCH_COUNT_MAX:
-                self.logger.warning("QoS: Disabled: prefetch_count exceeds %r" % (
-                    PREFETCH_COUNT_MAX, ))
+                self.logger.warning(
+                    "QoS: Disabled: prefetch_count exceeds %r" % (
+                        PREFETCH_COUNT_MAX, ))
                 new_value = 0
                 new_value = 0
             self.logger.debug("basic.qos: prefetch_count->%s" % new_value)
             self.logger.debug("basic.qos: prefetch_count->%s" % new_value)
             self.consumer.qos(prefetch_count=new_value)
             self.consumer.qos(prefetch_count=new_value)

+ 0 - 1
celery/worker/heartbeat.py

@@ -1,5 +1,4 @@
 from celery.worker.state import SOFTWARE_INFO
 from celery.worker.state import SOFTWARE_INFO
-from celery.utils import timer2
 
 
 
 
 class Heart(object):
 class Heart(object):

+ 1 - 0
contrib/release/flakesignore.txt

@@ -73,3 +73,4 @@ celery/beat.py:16: redefinition of unused 'multiprocessing' from line 14
 celery/worker/state.py:62: redefinition of function 'task_reserved' from line 36
 celery/worker/state.py:62: redefinition of function 'task_reserved' from line 36
 celery/worker/state.py:68: redefinition of function 'task_ready' from line 47
 celery/worker/state.py:68: redefinition of function 'task_ready' from line 47
 celery/beat.py:11: redefinition of unused 'multiprocessing' from line 9
 celery/beat.py:11: redefinition of unused 'multiprocessing' from line 9
+celery/utils/compat.py:498: redefinition of unused 'WatchedFileHandler' from line 496