瀏覽代碼

explicitly drain events when gossip/heartbeat will not - fix for #1847

kudos for @sabw8217 who's fix i copied verbatim.
Bryan Helmig 9 年之前
父節點
當前提交
0308ce626b
共有 2 個文件被更改,包括 9 次插入0 次删除
  1. 4 0
      celery/tests/worker/test_loops.py
  2. 5 0
      celery/worker/loops.py

+ 4 - 0
celery/tests/worker/test_loops.py

@@ -120,6 +120,10 @@ class test_asynloop(AppCase):
             return x + y
         self.add = add
 
+    def test_drain_after_consume(self):
+        x, _ = get_task_callback(self.app)
+        x.connection.drain_events.assert_called_with()
+
     def test_setup_heartbeat(self):
         x = X(self.app, heartbeat=10)
         x.hub.call_repeatedly = Mock(name='x.hub.call_repeatedly()')

+ 5 - 0
celery/worker/loops.py

@@ -47,6 +47,11 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
     if not obj.restart_count and not obj.pool.did_start_ok():
         raise WorkerLostError('Could not start worker processes')
 
+    # consumer.consume() may have prefetched up to our
+    # limit - drain an event so we are in a clean state
+    # prior to starting our event loop.
+    connection.drain_events()
+
     # FIXME: Use loop.run_forever
     # Tried and works, but no time to test properly before release.
     hub.propagate_errors = errors