Explorar o código

events.State.alive_workers now returns generator instead of list

Ask Solem %!s(int64=8) %!d(string=hai) anos
pai
achega
08651316f5

+ 1 - 1
celery/events/state.py

@@ -620,7 +620,7 @@ class State(object):
 
     def alive_workers(self):
         """Return a list of (seemingly) alive workers."""
-        return [w for w in values(self.workers) if w.alive]
+        return (w for w in values(self.workers) if w.alive)
 
     def __repr__(self):
         return R_STATE.format(self)

+ 5 - 5
celery/tests/events/test_state.py

@@ -430,10 +430,10 @@ class test_State(AppCase):
     def test_worker_online_offline(self):
         r = ev_worker_online_offline(State())
         next(r)
-        self.assertTrue(r.state.alive_workers())
+        self.assertTrue(list(r.state.alive_workers()))
         self.assertTrue(r.state.workers['utest1'].alive)
         r.play()
-        self.assertFalse(r.state.alive_workers())
+        self.assertFalse(list(r.state.alive_workers()))
         self.assertFalse(r.state.workers['utest1'].alive)
 
     def test_itertasks(self):
@@ -444,10 +444,10 @@ class test_State(AppCase):
     def test_worker_heartbeat_expire(self):
         r = ev_worker_heartbeats(State())
         next(r)
-        self.assertFalse(r.state.alive_workers())
+        self.assertFalse(list(r.state.alive_workers()))
         self.assertFalse(r.state.workers['utest1'].alive)
         r.play()
-        self.assertTrue(r.state.alive_workers())
+        self.assertTrue(list(r.state.alive_workers()))
         self.assertTrue(r.state.workers['utest1'].alive)
 
     def test_task_states(self):
@@ -596,7 +596,7 @@ class test_State(AppCase):
     def test_alive_workers(self):
         r = ev_snapshot(State())
         r.play()
-        self.assertEqual(len(r.state.alive_workers()), 3)
+        self.assertEqual(len(list(r.state.alive_workers())), 3)
 
     def test_tasks_by_worker(self):
         r = ev_snapshot(State())

+ 1 - 1
celery/worker/consumer/gossip.py

@@ -107,7 +107,7 @@ class Gossip(bootsteps.ConsumerStep):
             replies = self.consensus_replies[id]
         except KeyError:
             return  # not for us
-        alive_workers = self.state.alive_workers()
+        alive_workers = set(self.state.alive_workers())
         replies.append(event['hostname'])
 
         if len(replies) >= len(alive_workers):

+ 1 - 1
docs/userguide/extending.rst

@@ -415,7 +415,7 @@ Attributes
                 self.last_size = None
 
             def on_cluster_size_change(self, worker):
-                cluster_size = len(self.c.gossip.state.alive_workers())
+                cluster_size = len(list(self.c.gossip.state.alive_workers()))
                 if cluster_size != self.last_size:
                     for task in self.tasks:
                         task.rate_limit = 1.0 / cluster_size