Ask Solem 12 лет назад
Родитель
Сommit
b8c2341ae5
2 измененных файлов с 4 добавлено и 4 удалено
  1. 3 3
      celery/concurrency/processes.py
  2. 1 1
      funtests/stress/stress.py

+ 3 - 3
celery/concurrency/processes.py

@@ -37,7 +37,7 @@ from celery import platforms
 from celery import signals
 from celery._state import set_default_app
 from celery.concurrency.base import BasePool
-from celery.five import items
+from celery.five import items, values
 from celery.task import trace
 from celery.utils.log import get_logger
 from celery.worker.hub import READ, WRITE, ERR
@@ -642,9 +642,9 @@ class TaskPool(BasePool):
 
     def flush(self):
         # cancel all tasks that have not been accepted to that NACK is sent.
-        for job in values(self._pool._pool):
+        for job in values(self._pool._cache):
             if not job._accepted:
-                job.cancel()
+                job._cancel()
 
         # clear the outgoing buffer as the tasks will be redelivered by
         # the broker anyway.

+ 1 - 1
funtests/stress/stress.py

@@ -30,7 +30,7 @@ BIG = 'x' * 2 ** 20 * 8
 SMALL = 'e' * 1024
 
 celery = Celery(
-    'stress', broker='amqp://', backend='redis://',
+    'stress', broker='pyamqp://', backend='redis://',
     set_as_current=False,
 )