瀏覽代碼

[async result] Work in progress on Async result backend

Ask Solem 9 年之前
父節點
當前提交
3ef4f0cdb9
共有 7 個文件被更改,包括 352 次插入和 264 次刪除
  1. +35 −173
      celery/backends/amqp.py
  2. +245 −43
      celery/backends/base.py
  3. +3 −0
      celery/backends/rpc.py
  4. +9 −4
      celery/canvas.py
  5. +16 −24
      celery/result.py
  6. +1 −1
      funtests/stress/stress/suite.py
  7. +43 −19
      funtests/stress/t.py

+ 35 - 173
celery/backends/amqp.py

@@ -10,21 +10,16 @@
 """
 from __future__ import absolute_import
 
-import socket
-
-from collections import deque
-from operator import itemgetter
-
 from kombu import Exchange, Queue, Producer, Consumer
+from kombu.utils import register_after_fork
 
 from celery import states
-from celery.exceptions import TimeoutError
-from celery.five import range, monotonic
+from celery.five import range
 from celery.utils.functional import dictfilter
 from celery.utils.log import get_logger
 from celery.utils.timeutils import maybe_s_to_ms
 
-from .base import BaseBackend
+from .base import AsyncBackendMixin, Backend, BaseResultConsumer
 
 __all__ = ['BacklogLimitExceeded', 'AMQPBackend']
 
@@ -42,78 +37,45 @@ def repair_uuid(s):
     return '%s-%s-%s-%s-%s' % (s[:8], s[8:12], s[12:16], s[16:20], s[20:])
 
 
+def _on_after_fork_cleanup_backend(backend):
+    backend._after_fork()
+
+
 class NoCacheQueue(Queue):
     can_cache_declaration = False
 
 
-class ResultConsumer(object):
+class ResultConsumer(BaseResultConsumer):
     Consumer = Consumer
 
-    def __init__(self, backend, app, accept, pending_results):
-        self.backend = backend
-        self.app = app
-        self.accept = accept
-        self._pending_results = pending_results
+    def __init__(self, *args, **kwargs):
+        super(ResultConsumer, self).__init__(*args, **kwargs)
+        self._connection = None
         self._consumer = None
-        self._conn = None
-        self.on_message = None
-        self.bucket = None
-
-    def consume(self, task_id, timeout=None, no_ack=True, on_interval=None):
-        wait = self.drain_events
-        with self.app.pool.acquire_channel(block=True) as (conn, channel):
-            binding = self.backend._create_binding(task_id)
-            with self.Consumer(channel, binding,
-                               no_ack=no_ack, accept=self.accept) as consumer:
-                while 1:
-                    try:
-                        return wait(
-                            conn, consumer, timeout, on_interval)[task_id]
-                    except KeyError:
-                        continue
-
-    def wait_for_pending(self, result,
-                         callback=None, propagate=True, **kwargs):
-        for _ in self._wait_for_pending(result, **kwargs):
-            pass
-        return result.maybe_throw(callback=callback, propagate=propagate)
-
-    def _wait_for_pending(self, result, timeout=None, interval=0.5,
-                 no_ack=True, on_interval=None, callback=None,
-                 on_message=None, propagate=True):
-        prev_on_m, self.on_message = self.on_message, on_message
-        try:
-            for _ in self.drain_events_until(
-                    result.on_ready, timeout=timeout,
-                    on_interval=on_interval):
-                yield
-        except socket.timeout:
-            raise TimeoutError('The operation timed out.')
-        finally:
-            self.on_message = prev_on_m
-
-    def collect_for_pending(self, result, bucket=None, **kwargs):
-        prev_bucket, self.bucket = self.bucket, bucket
-        try:
-            for _ in self._wait_for_pending(result, **kwargs):
-                yield
-        finally:
-            self.bucket = prev_bucket
 
     def start(self, initial_queue, no_ack=True):
-        self._conn = self.app.connection()
+        self._connection = self.app.connection()
         self._consumer = self.Consumer(
-            self._conn.default_channel, [initial_queue],
+            self._connection.default_channel, [initial_queue],
             callbacks=[self.on_state_change], no_ack=no_ack,
             accept=self.accept)
         self._consumer.consume()
 
+    def drain_events(self, timeout=None):
+        return self._connection.drain_events(timeout=timeout)
+
     def stop(self):
         try:
             self._consumer.cancel()
         finally:
             self._connection.close()
 
+    def on_after_fork(self):
+        self._consumer = None
+        if self._connection is not None:
+            self._connection.collect()
+            self._connection = None
+
     def consume_from(self, queue):
         if self._consumer is None:
             return self.start(queue)
@@ -122,40 +84,10 @@ class ResultConsumer(object):
             self._consumer.consume()
 
     def cancel_for(self, queue):
-        self._consumer.cancel_by_queue(queue)
-
-    def on_state_change(self, meta, message):
-        if self.on_message:
-            self.on_message(meta)
-        if meta['status'] in states.READY_STATES:
-            try:
-                result = self._pending_results[meta['task_id']]
-            except KeyError:
-                return
-            result._maybe_set_cache(meta)
-            if self.bucket is not None:
-                self.bucket.append(result)
-
-    def drain_events_until(self, p, timeout=None, on_interval=None,
-                           monotonic=monotonic, wait=None):
-        wait = wait or self._conn.drain_events
-        time_start = monotonic()
-
-        while 1:
-            # Total time spent may exceed a single call to wait()
-            if timeout and monotonic() - time_start >= timeout:
-                raise socket.timeout()
-            try:
-                yield wait(timeout=1)
-            except socket.timeout:
-                pass
-            if on_interval:
-                on_interval()
-            if p.ready:  # got event on the wanted channel.
-                break
-
-
-class AMQPBackend(BaseBackend):
+        self._consumer.cancel_by_queue(queue.name)
+
+
+class AMQPBackend(Backend, AsyncBackendMixin):
     """Publishes results by sending messages."""
     Exchange = Exchange
     Queue = NoCacheQueue
@@ -195,6 +127,15 @@ class AMQPBackend(BaseBackend):
         })
         self.result_consumer = self.ResultConsumer(
             self, self.app, self.accept, self._pending_results)
+        if register_after_fork is not None:
+            register_after_fork(self, _on_after_fork_cleanup_backend)
+
+    def _after_fork(self):
+        self._pending_results.clear()
+        self.result_consumer._after_fork()
+
+    def on_result_fulfilled(self, result):
+        self.result_consumer.cancel_for(self._create_binding(result.id))
 
     def _create_exchange(self, name, type='direct', delivery_mode=2):
         return self.Exchange(name=name,
@@ -285,85 +226,6 @@ class AMQPBackend(BaseBackend):
                     return {'status': states.PENDING, 'result': None}
     poll = get_task_meta  # XXX compat
 
-    def wait_for_pending(self, result, timeout=None, interval=0.5,
-                 no_ack=True, on_interval=None, on_message=None,
-                 callback=None, propagate=True):
-        return self.result_consumer.wait_for_pending(
-            result, timeout=timeout, interval=interval,
-            no_ack=no_ack, on_interval=on_interval,
-            callback=callback, on_message=on_message, propagate=propagate,
-        )
-
-    def collect_for_pending(self, result, bucket=None, timeout=None,
-                            interval=0.5, no_ack=True, on_interval=None,
-                            on_message=None, callback=None, propagate=True):
-        return self.result_consumer.collect_for_pending(
-            result, bucket=bucket, timeout=timeout, interval=interval,
-            no_ack=no_ack, on_interval=on_interval,
-            callback=callback, on_message=on_message, propagate=propagate,
-        )
-
-    def add_pending_result(self, result):
-        if result.id not in self._pending_results:
-            self._pending_results[result.id] = result
-            self.result_consumer.consume_from(self._create_binding(result.id))
-
-    def remove_pending_result(self, result):
-        self._pending_results.pop(result.id, None)
-        # XXX cancel queue after result consumed
-
-    def _many_bindings(self, ids):
-        return [self._create_binding(task_id) for task_id in ids]
-
-    def xxx_get_many(self, task_ids, timeout=None, no_ack=True,
-                 on_message=None, on_interval=None,
-                 now=monotonic, getfields=itemgetter('status', 'task_id'),
-                 READY_STATES=states.READY_STATES,
-                 PROPAGATE_STATES=states.PROPAGATE_STATES, **kwargs):
-        with self.app.pool.acquire_channel(block=True) as (conn, channel):
-            ids = set(task_ids)
-            cached_ids = set()
-            mark_cached = cached_ids.add
-            for task_id in ids:
-                try:
-                    cached = self._cache[task_id]
-                except KeyError:
-                    pass
-                else:
-                    if cached['status'] in READY_STATES:
-                        yield task_id, cached
-                        mark_cached(task_id)
-            ids.difference_update(cached_ids)
-            results = deque()
-            push_result = results.append
-            push_cache = self._cache.__setitem__
-            decode_result = self.meta_from_decoded
-
-            def _on_message(message):
-                body = decode_result(message.decode())
-                if on_message is not None:
-                    on_message(body)
-                state, uid = getfields(body)
-                if state in READY_STATES:
-                    push_result(body) \
-                        if uid in task_ids else push_cache(uid, body)
-
-            bindings = self._many_bindings(task_ids)
-            with self.Consumer(channel, bindings, on_message=_on_message,
-                               accept=self.accept, no_ack=no_ack):
-                wait = conn.drain_events
-                popleft = results.popleft
-                while ids:
-                    wait(timeout=timeout)
-                    while results:
-                        state = popleft()
-                        task_id = state['task_id']
-                        ids.discard(task_id)
-                        push_cache(task_id, state)
-                        yield task_id, state
-                    if on_interval:
-                        on_interval()
-
     def reload_task_result(self, task_id):
         raise NotImplementedError(
             'reload_task_result is not supported by this backend.')

+ 245 - 43
celery/backends/base.py

@@ -13,23 +13,27 @@
 """
 from __future__ import absolute_import
 
-import time
+import socket
 import sys
+import time
 
+from collections import deque
 from datetime import timedelta
+from weakref import WeakKeyDictionary
 
 from billiard.einfo import ExceptionInfo
 from kombu.serialization import (
     dumps, loads, prepare_accept_content,
     registry as serializer_registry,
 )
+from kombu.syn import detect_environment
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
 
 from celery import states
 from celery import current_app, group, maybe_signature
 from celery.app import current_task
 from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
-from celery.five import items
+from celery.five import items, monotonic
 from celery.result import (
     GroupResult, ResultBase, allow_join_result, result_from_tuple,
 )
@@ -61,7 +65,7 @@ class _nulldict(dict):
     __setitem__ = update = setdefault = ignore
 
 
-class BaseBackend(object):
+class Backend(object):
     READY_STATES = states.READY_STATES
     UNREADY_STATES = states.UNREADY_STATES
     EXCEPTION_STATES = states.EXCEPTION_STATES
@@ -222,46 +226,6 @@ class BaseBackend(object):
                      content_encoding=self.content_encoding,
                      accept=self.accept)
 
-    def wait_for_pending(self, result, timeout=None, interval=0.5,
-                         no_ack=True, on_interval=None, callback=None,
-                         propagate=True):
-        meta = self.wait_for(
-            result.id, timeout=timeout,
-            interval=interval,
-            on_interval=on_interval,
-            no_ack=no_ack,
-        )
-        if meta:
-            result._maybe_set_cache(meta)
-            return result.maybe_throw(propagate=propagate, callback=callback)
-
-    def wait_for(self, task_id,
-                 timeout=None, interval=0.5, no_ack=True, on_interval=None):
-        """Wait for task and return its result.
-
-        If the task raises an exception, this exception
-        will be re-raised by :func:`wait_for`.
-
-        If `timeout` is not :const:`None`, this raises the
-        :class:`celery.exceptions.TimeoutError` exception if the operation
-        takes longer than `timeout` seconds.
-
-        """
-
-        time_elapsed = 0.0
-
-        while 1:
-            meta = self.get_task_meta(task_id)
-            if meta['status'] in states.READY_STATES:
-                return meta
-            if on_interval:
-                on_interval()
-            # avoid hammering the CPU checking status.
-            time.sleep(interval)
-            time_elapsed += interval
-            if timeout and time_elapsed >= timeout:
-                raise TimeoutError('The operation timed out.')
-
     def prepare_expires(self, value, type=None):
         if value is None:
             value = self.app.conf.result_expires
@@ -406,9 +370,247 @@ class BaseBackend(object):
 
     def __reduce__(self, args=(), kwargs={}):
         return (unpickle_backend, (self.__class__, args, kwargs))
+
+
+class SyncBackendMixin(object):
+
+    def iter_native(self, result, timeout=None, interval=0.5, no_ack=True,
+                    on_message=None, on_interval=None):
+        results = result.results
+        if not results:
+            return iter([])
+        return self.get_many(
+            {r.id for r in results},
+            timeout=timeout, interval=interval, no_ack=no_ack,
+            on_message=on_message, on_interval=on_interval,
+        )
+
+    def wait_for_pending(self, result, timeout=None, interval=0.5,
+                         no_ack=True, on_interval=None, callback=None,
+                         propagate=True):
+        meta = self.wait_for(
+            result.id, timeout=timeout,
+            interval=interval,
+            on_interval=on_interval,
+            no_ack=no_ack,
+        )
+        if meta:
+            result._maybe_set_cache(meta)
+            return result.maybe_throw(propagate=propagate, callback=callback)
+
+    def wait_for(self, task_id,
+                 timeout=None, interval=0.5, no_ack=True, on_interval=None):
+        """Wait for task and return its result.
+
+        If the task raises an exception, this exception
+        will be re-raised by :func:`wait_for`.
+
+        If `timeout` is not :const:`None`, this raises the
+        :class:`celery.exceptions.TimeoutError` exception if the operation
+        takes longer than `timeout` seconds.
+
+        """
+
+        time_elapsed = 0.0
+
+        while 1:
+            meta = self.get_task_meta(task_id)
+            if meta['status'] in states.READY_STATES:
+                return meta
+            if on_interval:
+                on_interval()
+            # avoid hammering the CPU checking status.
+            time.sleep(interval)
+            time_elapsed += interval
+            if timeout and time_elapsed >= timeout:
+                raise TimeoutError('The operation timed out.')
+
+    def add_pending_result(self, result):
+        return result
+
+    def remove_pending_result(self, result):
+        return result
+
+
+class AsyncBackendMixin(object):
+
+    def _collect_into(self, result, bucket):
+        self.result_consumer.buckets[result] = bucket
+
+    def iter_native(self, result, timeout=None, interval=0.5, no_ack=True,
+                    on_message=None, on_interval=None):
+        results = result.results
+        if not results:
+            raise StopIteration()
+
+        bucket = deque()
+        for result in results:
+            self._collect_into(result, bucket)
+
+        for _ in self._wait_for_pending(
+                result,
+                timeout=timeout, interval=interval, no_ack=no_ack,
+                on_message=on_message, on_interval=on_interval):
+            while bucket:
+                result = bucket.popleft()
+                yield result.id, result._cache
+        while bucket:
+            result = bucket.popleft()
+            yield result.id, result._cache
+
+    def add_pending_result(self, result):
+        if result.id not in self._pending_results:
+            self._pending_results[result.id] = result
+            self.result_consumer.consume_from(self._create_binding(result.id))
+        return result
+
+    def remove_pending_result(self, result):
+        self._pending_results.pop(result.id, None)
+        self.on_result_fulfilled(result)
+        return result
+
+    def on_result_fulfilled(self, result):
+        pass
+
+    def wait_for_pending(self, result,
+                         callback=None, propagate=True, **kwargs):
+        for _ in self._wait_for_pending(result, **kwargs):
+            pass
+        return result.maybe_throw(callback=callback, propagate=propagate)
+
+    def _wait_for_pending(self, result, timeout=None, interval=0.5,
+                          no_ack=True, on_interval=None, on_message=None,
+                          callback=None, propagate=True):
+        return self.result_consumer._wait_for_pending(
+            result, timeout=timeout, interval=interval,
+            no_ack=no_ack, on_interval=on_interval,
+            callback=callback, on_message=on_message, propagate=propagate,
+        )
+
+
+class BaseBackend(Backend, SyncBackendMixin):
+    pass
 BaseDictBackend = BaseBackend  # XXX compat
 
 
+
+class Drainer(object):
+
+    def __init__(self, result_consumer):
+        self.result_consumer = result_consumer
+
+    def drain_events_until(self, p, timeout=None, on_interval=None,
+                           monotonic=monotonic, wait=None):
+        wait = wait or self.result_consumer.drain_events
+        time_start = monotonic()
+
+        while 1:
+            # Total time spent may exceed a single call to wait()
+            if timeout and monotonic() - time_start >= timeout:
+                raise socket.timeout()
+            try:
+                yield self.wait_for(p, wait, timeout=1)
+            except socket.timeout:
+                pass
+            if on_interval:
+                on_interval()
+            if p.ready:  # got event on the wanted channel.
+                break
+
+    def wait_for(self, p, wait, timeout=None):
+        wait(timeout=timeout)
+
+
+class EventletDrainer(Drainer):
+    _g = None
+    _stopped = False
+
+    def run(self):
+        while not self._stopped:
+            try:
+                print("DRAINING!!!!!!!!!!!!!!!!")
+                self.result_consumer.drain_events(timeout=10)
+            except socket.timeout:
+                pass
+
+    def start(self):
+        from eventlet import spawn
+        if self._g is None:
+            self._g = spawn(self.run)
+
+    def stop(self):
+        self._stopped = True
+
+    def wait_for(self, p, wait, timeout=None):
+        if self._g is None:
+            self.start()
+        if not p.ready:
+            time.sleep(0)
+
+
+drainers = {'default': Drainer, 'eventlet': EventletDrainer}
+
+class BaseResultConsumer(object):
+
+    def __init__(self, backend, app, accept, pending_results):
+        self.backend = backend
+        self.app = app
+        self.accept = accept
+        self._pending_results = pending_results
+        self.on_message = None
+        self.buckets = WeakKeyDictionary()
+        self.drainer = drainers[detect_environment()](self)
+
+    def drain_events(self, timeout=None):
+        raise NotImplementedError('subclass responsibility')
+
+    def _after_fork(self):
+        self.bucket.clear()
+        self.buckets = WeakKeyDictionary()
+        self.on_message = None
+        self.on_after_fork()
+
+    def on_after_fork(self):
+        pass
+
+    def drain_events_until(self, p, timeout=None, on_interval=None):
+        return self.drainer.drain_events_until(
+            p, timeout=timeout, on_interval=on_interval)
+
+    def _wait_for_pending(self, result, timeout=None, interval=0.5,
+                          no_ack=True, on_interval=None, callback=None,
+                          on_message=None, propagate=True):
+        prev_on_m, self.on_message = self.on_message, on_message
+        try:
+            for _ in self.drain_events_until(
+                    result.on_ready, timeout=timeout,
+                    on_interval=on_interval):
+                yield
+                time.sleep(0)
+        except socket.timeout:
+            raise TimeoutError('The operation timed out.')
+        finally:
+            self.on_message = prev_on_m
+
+    def on_state_change(self, meta, message):
+        if self.on_message:
+            self.on_message(meta)
+        if meta['status'] in states.READY_STATES:
+            try:
+                result = self._pending_results[meta['task_id']]
+            except KeyError:
+                return
+            result._maybe_set_cache(meta)
+            buckets = self.buckets
+            try:
+                buckets[result].append(result)
+                buckets.pop(result)
+            except KeyError:
+                pass
+        time.sleep(0)
+
+
+
 class KeyValueStoreBackend(BaseBackend):
     key_t = ensure_bytes
     task_keyprefix = 'celery-task-meta-'

+ 3 - 0
celery/backends/rpc.py

@@ -54,6 +54,9 @@ class RPCBackend(amqp.AMQPBackend):
     def on_reply_declare(self, task_id):
         pass
 
+    def on_result_fulfilled(self, result):
+        pass
+
     @property
     def binding(self):
         return self.Queue(self.oid, self.exchange, self.oid,

+ 9 - 4
celery/canvas.py

@@ -20,6 +20,7 @@ from functools import partial as _partial, reduce
 from operator import itemgetter
 from itertools import chain as _chain
 
+from amqp.promise import barrier
 from kombu.utils import cached_property, fxrange, reprcall, uuid
 
 from celery._state import current_app
@@ -730,7 +731,7 @@ class group(Signature):
                     task.args = tuple(partial_args) + tuple(task.args)
                 yield task, task.freeze(group_id=group_id, root_id=root_id)
 
-    def _apply_tasks(self, tasks, producer=None, app=None,
+    def _apply_tasks(self, tasks, producer=None, app=None, p=None,
                      add_to_parent=None, chord=None, **options):
         app = app or self.app
         with app.producer_or_acquire(producer) as producer:
@@ -738,6 +739,9 @@ class group(Signature):
                 sig.apply_async(producer=producer, add_to_parent=False,
                                 chord=sig.options.get('chord') or chord,
                                 **options)
+                if p:
+                    p.add_noincr(res)
+                    res.backend.add_pending_result(res)
                 yield res  # <-- r.parent, etc set in the frozen result.
 
     def _freeze_gid(self, options):
@@ -762,9 +766,10 @@ class group(Signature):
 
         options, group_id, root_id = self._freeze_gid(options)
         tasks = self._prepared(self.tasks, args, group_id, root_id, app)
-        result = self.app.GroupResult(
-            group_id, list(self._apply_tasks(tasks, producer, app, **options)),
-        )
+        p = barrier()
+        results = list(self._apply_tasks(tasks, producer, app, p, **options))
+        result = self.app.GroupResult(group_id, results, ready_barrier=p)
+        p.finalize()
 
         # - Special case of group(A.s() | group(B.s(), C.s()))
         # That is, group with single item that is a chain but the

+ 16 - 24
celery/result.py

@@ -432,10 +432,13 @@ class ResultSet(ResultBase):
     #: List of results in in the set.
     results = None
 
-    def __init__(self, results, app=None, **kwargs):
+    def __init__(self, results, app=None, ready_barrier=None, **kwargs):
         self._app = app
+        self._cache = None
         self.results = results
-        self.on_ready = barrier(self.results, (self,), callback=self._on_ready)
+        self._on_full = ready_barrier or barrier(self.results)
+        self._on_full.then(promise(self._on_ready))
+        self.on_ready = promise()
 
     def add(self, result):
         """Add :class:`AsyncResult` as a new member of the set.
@@ -447,8 +450,10 @@ class ResultSet(ResultBase):
             self.results.append(result)
             self.ready.add(result)
 
-    def _on_ready(self, result):
-        self.backend.remove_pending_result(result)
+    def _on_ready(self):
+        self.backend.remove_pending_result(self)
+        self._cache = [r.get() for r in self.results]
+        self.on_ready(self)
 
     def remove(self, result):
         """Remove result from the set; it must be a member.
@@ -594,6 +599,8 @@ class ResultSet(ResultBase):
         current result backend.
 
         """
+        if self._cache is not None:
+            return self._cache
         return (self.join_native if self.supports_native_join else self.join)(
             timeout=timeout, propagate=propagate,
             interval=interval, callback=callback, no_ack=no_ack,
@@ -675,9 +682,6 @@ class ResultSet(ResultBase):
         return results
 
     def then(self, callback, on_error=None):
-        for result in self.results:
-            self.backend.add_pending_result(result)
-            result.on_ready.then(self.on_ready)
         return self.on_ready.then(callback, on_error)
 
     def iter_native(self, timeout=None, interval=0.5, no_ack=True,
@@ -693,23 +697,11 @@ class ResultSet(ResultBase):
         result backends.
 
         """
-        results = self.results
-        if not results:
-            raise StopIteration()
-        ids = set()
-        for result in self.results:
-            self.backend.add_pending_result(result)
-            ids.add(result.id)
-        bucket = deque()
-        for _ in  self.backend.collect_for_pending(
-                self,
-                bucket=bucket,
-                timeout=timeout, interval=interval, no_ack=no_ack,
-                on_message=on_message, on_interval=on_interval):
-            while bucket:
-                result = bucket.popleft()
-                if result.id in ids:
-                    yield result.id, result._cache
+        return self.backend.iter_native(
+            self,
+            timeout=timeout, interval=interval, no_ack=no_ack,
+            on_message=on_message, on_interval=on_interval,
+        )
 
     def join_native(self, timeout=None, propagate=True,
                     interval=0.5, callback=None, no_ack=True,

+ 1 - 1
funtests/stress/stress/suite.py

@@ -201,7 +201,7 @@ class BaseSuite(object):
                     self.speaker.beep()
                     raise
                 finally:
-                    print('{0} {1} iterations in {2}s'.format(
+                    print('{0} {1} iterations in {2}'.format(
                         'failed after' if failed else 'completed',
                         i + 1, humanize_seconds(monotonic() - elapsed),
                     ))

+ 43 - 19
funtests/stress/t.py

@@ -1,30 +1,54 @@
+#import eventlet
+#eventlet.monkey_patch()
+
 from celery import group
 import socket
 from stress.app import add, raising
 
 def on_ready(result):
-    print('RESULT: %r' % (result,))
+    print('RESULT: %r' % (result.get(),))
+
+finished = [0]
 
 def test():
-    group(add.s(i, i) for i in range(10)).delay().then(on_ready)
-
-    p = group(add.s(i, i) for i in range(10)).delay()
-    print(p.get(timeout=5))
-
-    p = add.delay(2, 2)
-    print(p.get(timeout=5))
-    p = add.delay(2, 2)
-    print(p.get(timeout=5))
-    p = add.delay(2, 2)
-    print(p.get(timeout=5))
-    p = add.delay(2, 2)
-    print(p.get(timeout=5))
-    p = raising.delay()
+    #group(add.s(i, i) for i in range(1000)).delay().then(on_ready)
+
+    p = group(add.s(i, i) for i in range(1000)).delay()
+    x = p.get(timeout=5)
+    y = p.get(timeout=5)
     try:
-        print(p.get(timeout=5))
-    except Exception as exc:
-        print('raised: %r' % (exc),)
+        assert x == y
+    except AssertionError:
+        print('-' * 64)
+        print('X: %r' % (x,))
+        print('Y: %r' % (y,))
+        raise
+    assert not any(m is None for m in x)
+    assert not any(m is None for m in y)
+
+    #p = add.delay(2, 2)
+    #print(p.get(timeout=5))
+    #p = add.delay(2, 2)
+    #print(p.get(timeout=5))
+    #p = add.delay(2, 2)
+    #print(p.get(timeout=5))
+    #p = add.delay(2, 2)
+    #print(p.get(timeout=5))
+    #p = raising.delay()
+    #try:
+    #    print(p.get(timeout=5))
+    #except Exception as exc:
+    #    print('raised: %r' % (exc),)
+    finished[0] += 1
 
 
-for i in range(100):
+for i in range(10):
     test()
+
+
+#for i in range(2):
+#    eventlet.spawn(test)
+
+#while finished[0] < 100:
+#    import time
+#    time.sleep(0)