|
@@ -2,6 +2,7 @@ from __future__ import absolute_import, print_function, unicode_literals
|
|
|
|
|
|
import platform
|
|
|
import random
|
|
|
+import socket
|
|
|
|
|
|
from collections import namedtuple
|
|
|
from itertools import count
|
|
@@ -130,7 +131,7 @@ class Suite(object):
|
|
|
)
|
|
|
|
|
|
def manyshort(self):
|
|
|
- self.join(group(add.s(i, i) for i in range(1000))(), propagate=True)
|
|
|
+ self.join(group(add.s(i, i) for i in range(1000))(), timeout=10, propagate=True)
|
|
|
|
|
|
def runtest(self, fun, n=50, index=0, repeats=1):
|
|
|
with blockdetection(self.block_timeout):
|
|
@@ -217,10 +218,16 @@ class Suite(object):
|
|
|
return [res.id for res in r if res.id not in res.backend._cache]
|
|
|
|
|
|
def join(self, r, propagate=False, **kwargs):
|
|
|
+ received = [0]
|
|
|
+
|
|
|
+ def on_result(task_id, value):
|
|
|
+ received[0] += 1
|
|
|
+
|
|
|
while 1:
|
|
|
try:
|
|
|
- return r.get(propagate=propagate, **kwargs)
|
|
|
- except TimeoutError as exc:
|
|
|
+ return r.get(callback=on_result, propagate=propagate, **kwargs)
|
|
|
+ except (socket.timeout, TimeoutError) as exc:
|
|
|
+ print('RECEIVED {0}/{0}'.format(received[0], len(r)))
|
|
|
marker(
|
|
|
'Still waiting for {0!r}: {1!r}'.format(
|
|
|
self.missing_results(r), exc), '!')
|