Sfoglia il codice sorgente

Fix for --autoscale

Ask Solem 12 anni fa
parent
commit
4f7c239eb8
2 ha cambiato i file con 60 aggiunte e 12 eliminazioni
  1. 23 6
      celery/concurrency/processes.py
  2. 37 6
      funtests/stress/stress.py

+ 23 - 6
celery/concurrency/processes.py

@@ -148,9 +148,9 @@ class ResultHandler(_pool.ResultHandler):
 
     def __init__(self, *args, **kwargs):
         self.fileno_to_outq = kwargs.pop('fileno_to_outq')
-        self.on_worker_alive = kwargs.pop('on_worker_alive')
+        self.on_process_alive = kwargs.pop('on_process_alive')
         super(ResultHandler, self).__init__(*args, **kwargs)
-        self.state_handlers[WORKER_UP] = self.on_worker_alive
+        self.state_handlers[WORKER_UP] = self.on_process_alive
 
     def _process_result(self):
         fileno_to_outq = self.fileno_to_outq
@@ -258,6 +258,23 @@ class AsynPool(_pool.Pool):
         return next(q for q, owner in items(self._queues)
                     if owner is None)
 
+    def on_grow(self, n):
+        self._queues.update(
+            dict((self.create_process_queues(), None)
+                for _ in range(self._processes - len(self._queues)))
+        )
+
+    def on_shrink(self, n):
+        queues = self._queues
+        for i in range(n):
+            if len(queues) > self._processes:
+                try:
+                    queues.pop(next(
+                        q for q, owner in items(queues) if owner is None
+                    ), None)
+                except StopIteration:
+                    break
+
     def create_process_queues(self):
         inq, outq, synq = _SimpleQueue(), _SimpleQueue(), None
         inq._writer.setblocking(0)
@@ -266,7 +283,7 @@ class AsynPool(_pool.Pool):
             synq._writer.setblocking(0)
         return inq, outq, synq
 
-    def on_worker_alive(self, pid):
+    def on_process_alive(self, pid):
         try:
             proc = next(w for w in self._pool if w.pid == pid)
         except StopIteration:
@@ -299,7 +316,7 @@ class AsynPool(_pool.Pool):
     def create_result_handler(self):
         return super(AsynPool, self).create_result_handler(
             fileno_to_outq=self._fileno_to_outq,
-            on_worker_alive=self.on_worker_alive,
+            on_process_alive=self.on_process_alive,
         )
 
     def _process_register_queues(self, proc, queues):
@@ -338,7 +355,7 @@ class AsynPool(_pool.Pool):
                         if not sock.closed:
                             sock.close()
                             #os.close(sock.fileno())
-            self._queues.pop((proc.inq, proc.outq, proc.synq))
+            self._queues.pop((proc.inq, proc.outq, proc.synq), None)
             self._queues[self.create_process_queues()] = None
             self.on_inqueue_close(proc.inqW_fd)
 
@@ -567,9 +584,9 @@ class TaskPool(BasePool):
                 except KeyError:
                     put_message(job)
                     raise StopIteration()
-                send_offset = proc.inq._writer.send_offset
                 # job result keeps track of what process the job is sent to.
                 job._write_to = proc
+                send_offset = proc.inq._writer.send_offset
 
                 Hw = Bw = 0
                 while Hw < 4:

+ 37 - 6
funtests/stress/stress.py

@@ -8,7 +8,7 @@ import sys
 from time import time, sleep
 
 from celery import Celery, group
-from celery.exceptions import TimeoutError
+from celery.exceptions import TimeoutError, SoftTimeLimitExceeded
 from celery.five import range
 from celery.utils.debug import blockdetection
 
@@ -38,6 +38,11 @@ celery.conf.update(
 )
 
 
+@celery.task
+def _marker(s, sep='-'):
+    print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
+
+
 @celery.task
 def add(x, y):
     return x + y
@@ -65,6 +70,14 @@ def sleeping(i):
     sleep(i)
 
 
+@celery.task
+def sleeping_ignore_limits(i):
+    try:
+        sleep(i)
+    except SoftTimeLimitExceeded:
+        sleep(i)
+
+
 @celery.task
 def segfault():
     import ctypes
@@ -72,6 +85,11 @@ def segfault():
     assert False, 'should not get here'
 
 
+def marker(s, sep='-'):
+    print('{0}{1}'.format(sep, s))
+    _marker.delay(s, sep)
+
+
 class Stresstests(object):
 
     def __init__(self, app, block_timeout=30 * 60):
@@ -80,14 +98,18 @@ class Stresstests(object):
         self.block_timeout = block_timeout
 
     def run(self, n=50):
-        tests = [#self.manyshort,
+        marker('Stresstest suite start', '+')
+        tests = [self.manyshort,
                  self.termbysig,
                  self.bigtasks,
                  self.smalltasks,
                  self.revoketermfast,
+                 self.timelimits,
+                 self.timelimits_soft,
                  self.revoketermslow]
         for test in tests:
             self.runtest(test, n)
+        marker('Stresstest suite end', '+')
 
     def manyshort(self):
         self.join(group(add.s(i, i) for i in xrange(1000))())
@@ -97,7 +119,7 @@ class Stresstests(object):
             t = time()
             i = 0
             failed = False
-            print('-%s(%s)' % (fun.__name__, n))
+            marker('{0}({1})'.format(fun.__name__, n))
             try:
                 for i in range(n):
                     print(i)
@@ -117,9 +139,18 @@ class Stresstests(object):
     def termbysegfault(self):
         self._evil_groupmember(segfault)
 
-    def _evil_groupmember(self, evil_t):
-        g1 = group(add.s(2, 2), evil_t.s(), add.s(4, 4), add.s(8, 8))
-        g2 = group(add.s(3, 3), add.s(5, 5), evil_t.s(), add.s(7, 7))
+    def timelimits(self):
+        self._evil_groupmember(sleeping, 2, timeout=1)
+
+    def timelimits_soft(self):
+        self._evil_groupmember(sleeping_ignore_limits, 2,
+                               soft_timeout=1, timeout=1.1)
+
+    def _evil_groupmember(self, evil_t, *eargs, **opts):
+        g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
+                   add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
+        g2 = group(add.s(3, 3).set(**opts), add.s(5, 5).set(**opts),
+                   evil_t.s(*eargs).set(**opts), add.s(7, 7).set(**opts))
         self.join(g1(), timeout=10)
         self.join(g2(), timeout=10)