浏览代码

AsynPool: Smarter flush after reconnect

Ask Solem 11 年之前
父节点
当前提交
547a224233
共有 2 个文件被更改,包括 13 次插入8 次删除
  1. 12 7
      celery/concurrency/asynpool.py
  2. 1 1
      funtests/stress/stress/templates.py

+ 12 - 7
celery/concurrency/asynpool.py

@@ -810,8 +810,9 @@ class AsynPool(_pool.Pool):
                             except KeyError:
                             except KeyError:
                                 pass
                                 pass
                             else:
                             else:
-                                if job._write_to.exitcode is None:
-                                    self._flush_writer(gen)
+                                job_proc = job._write_to
+                                if job_proc.exitcode is None:
+                                    self._flush_writer(job_proc.inq, gen)
                     # workers may have exited in the meantime.
                     # workers may have exited in the meantime.
                     self.maintain_pool()
                     self.maintain_pool()
                     sleep(next(intervals))  # don't busyloop
                     sleep(next(intervals))  # don't busyloop
@@ -821,12 +822,16 @@ class AsynPool(_pool.Pool):
             self._active_writes.clear()
             self._active_writes.clear()
             self._busy_workers.clear()
             self._busy_workers.clear()
 
 
-    def _flush_writer(self, writer):
+    def _flush_writer(self, inq, writer):
+        fds = set([inq._writer])
         try:
         try:
-            list(writer)
-        except (OSError, IOError) as exc:
-            if get_errno(exc) != errno.EBADF:
-                raise
+            while fds:
+                _, writable, again = _select(writers=fds, timeout=0.5)
+                if not again and writable:
+                    try:
+                        next(writer)
+                    except (StopIteration, OSError, IOError, EOFError):
+                        break
         finally:
         finally:
             self._active_writers.discard(writer)
             self._active_writers.discard(writer)
 
 

+ 1 - 1
funtests/stress/stress/templates.py

@@ -58,7 +58,7 @@ class default(object):
     ]
     ]
     BROKER_URL = os.environ.get('CSTRESS_BROKER', 'amqp://')
     BROKER_URL = os.environ.get('CSTRESS_BROKER', 'amqp://')
     CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_BACKEND', 'rpc://')
     CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_BACKEND', 'rpc://')
-    CELERYD_PREFETCH_MULTIPLIER = int(os.environ.get('CSTRESS_PREFETCH', 1))
+    CELERYD_PREFETCH_MULTIPLIER = int(os.environ.get('CSTRESS_PREFETCH', 10))
     CELERY_TASK_PUBLISH_RETRY_POLICY = {
     CELERY_TASK_PUBLISH_RETRY_POLICY = {
         'max_retries': 100,
         'max_retries': 100,
         'interval_max': 2,
         'interval_max': 2,