瀏覽代碼

Compatible with recent kombu changes

Ask Solem 11 年之前
父節點
當前提交
1c062d4c92
共有 3 個文件被更改,包括 11 次插入3 次删除
  1. 1 0
      celery/tests/worker/test_hub.py
  2. 4 1
      celery/worker/components.py
  3. 6 2
      celery/worker/loops.py

+ 1 - 0
celery/tests/worker/test_hub.py

@@ -143,6 +143,7 @@ class test_Hub(Case):
 
         poller = hub.poller
         hub.stop()
+        hub.close()
         poller.close.assert_called_with()
 
     def test_fire_timers(self):

+ 4 - 1
celery/worker/components.py

@@ -71,7 +71,10 @@ class Hub(bootsteps.StartStopStep):
         pass
 
     def stop(self, w):
-        pass
+        w.hub.close()
+
+    def terminate(self, w):
+        w.hub.close()
 
     def _patch_thread_primitives(self, w):
         # make clock use dummy lock

+ 6 - 2
celery/worker/loops.py

@@ -48,7 +48,11 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
     # maxtasksperchild will mess up metrics.
     if not obj.restart_count and not obj.pool.did_start_ok():
         raise WorkerLostError('Could not start worker processes')
-    loop = hub._loop(propagate=errors)
+
+    # FIXME: Use loop.run_forever
+    # Tried and works, but no time to test properly before release.
+    hub.propagate_errors = errors
+    loop = hub.create_loop()
 
     try:
         while blueprint.state == RUN and obj.connection:
@@ -67,7 +71,7 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
             try:
                 next(loop)
             except StopIteration:
-                loop = hub._loop(propagate=errors)
+                loop = hub.create_loop()
     finally:
         try:
             hub.close()