
[async result] Cleanup branch for merge

Ask Solem committed 9 years ago
parent commit 7a47ddb1c7
4 changed files with 206 additions and 235 deletions:

  1. celery/backends/amqp.py    + 4 - 2
  2. celery/backends/async.py   + 201 - 0
  3. celery/backends/base.py    + 1 - 179
  4. funtests/stress/t.py       + 0 - 54

+ 4 - 2
celery/backends/amqp.py

@@ -19,7 +19,8 @@ from celery.utils.functional import dictfilter
 from celery.utils.log import get_logger
 from celery.utils.timeutils import maybe_s_to_ms
 
-from .base import AsyncBackendMixin, Backend, BaseResultConsumer
+from . import base
+from .async import AsyncBackendMixin, BaseResultConsumer
 
 __all__ = ['BacklogLimitExceeded', 'AMQPBackend']
 
@@ -87,8 +88,9 @@ class ResultConsumer(BaseResultConsumer):
         self._consumer.cancel_by_queue(queue.name)
 
 
-class AMQPBackend(Backend, AsyncBackendMixin):
+class AMQPBackend(base.Backend, AsyncBackendMixin):
     """Publishes results by sending messages."""
+
     Exchange = Exchange
     Queue = NoCacheQueue
     Consumer = Consumer
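
The import change above lets amqp.py reach Backend through the base module (base.Backend) while pulling the async machinery from the new celery.backends.async module, and AMQPBackend now names its bases accordingly. A minimal, self-contained sketch of that composition pattern (DemoBackend and both stub classes are hypothetical stand-ins, not Celery code):

    class Backend(object):
        """Stands in for celery.backends.base.Backend (storage side)."""
        def store_result(self, task_id, result):
            print('storing %s=%r' % (task_id, result))

    class AsyncBackendMixin(object):
        """Stands in for celery.backends.async.AsyncBackendMixin."""
        def add_pending_result(self, result):
            print('waiting on %r' % (result,))
            return result

    class DemoBackend(Backend, AsyncBackendMixin):
        pass

    backend = DemoBackend()
    backend.store_result('id-1', 4)       # storage behaviour from Backend
    backend.add_pending_result('id-1')    # non-blocking waits from the mixin
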

+ 201 - 0
celery/backends/async.py

@@ -0,0 +1,201 @@
+"""
+    celery.backends.async
+    ~~~~~~~~~~~~~~~~~~~~~
+
+    Async backend support utilities.
+
+"""
+from __future__ import absolute_import, unicode_literals
+
+import socket
+import time
+
+from collections import deque
+from weakref import WeakKeyDictionary
+
+from kombu.syn import detect_environment
+
+from celery import states
+from celery.exceptions import TimeoutError
+from celery.five import monotonic
+
+drainers = {}
+
+
+def register_drainer(name):
+
+    def _inner(cls):
+        drainers[name] = cls
+        return cls
+    return _inner
+
+
+@register_drainer('default')
+class Drainer(object):
+
+    def __init__(self, result_consumer):
+        self.result_consumer = result_consumer
+
+    def drain_events_until(self, p, timeout=None, on_interval=None,
+                           monotonic=monotonic, wait=None):
+        wait = wait or self.result_consumer.drain_events
+        time_start = monotonic()
+
+        while 1:
+            # Total time spent may exceed a single call to wait()
+            if timeout and monotonic() - time_start >= timeout:
+                raise socket.timeout()
+            try:
+                yield self.wait_for(p, wait, timeout=1)
+            except socket.timeout:
+                pass
+            if on_interval:
+                on_interval()
+            if p.ready:  # got event on the wanted channel.
+                break
+
+    def wait_for(self, p, wait, timeout=None):
+        wait(timeout=timeout)
+
+
+@register_drainer('eventlet')
+class EventletDrainer(Drainer):
+    _g = None
+    _stopped = False
+
+    def run(self):
+        while not self._stopped:
+            try:
+                self.result_consumer.drain_events(timeout=10)
+            except socket.timeout:
+                pass
+
+    def start(self):
+        from eventlet import spawn
+        if self._g is None:
+            self._g = spawn(self.run)
+
+    def stop(self):
+        self._stopped = True
+
+    def wait_for(self, p, wait, timeout=None):
+        if self._g is None:
+            self.start()
+        if not p.ready:
+            time.sleep(0)
+
+
+class AsyncBackendMixin(object):
+
+    def _collect_into(self, result, bucket):
+        self.result_consumer.buckets[result] = bucket
+
+    def iter_native(self, result, timeout=None, interval=0.5, no_ack=True,
+                    on_message=None, on_interval=None):
+        results = result.results
+        if not results:
+            return  # PEP 479: don't raise StopIteration inside a generator
+
+        bucket = deque()
+        for result in results:
+            self._collect_into(result, bucket)
+
+        for _ in self._wait_for_pending(
+                result,
+                timeout=timeout, interval=interval, no_ack=no_ack,
+                on_message=on_message, on_interval=on_interval):
+            while bucket:
+                result = bucket.popleft()
+                yield result.id, result._cache
+        while bucket:
+            result = bucket.popleft()
+            yield result.id, result._cache
+
+    def add_pending_result(self, result):
+        if result.id not in self._pending_results:
+            self._pending_results[result.id] = result
+            self.result_consumer.consume_from(self._create_binding(result.id))
+        return result
+
+    def remove_pending_result(self, result):
+        self._pending_results.pop(result.id, None)
+        self.on_result_fulfilled(result)
+        return result
+
+    def on_result_fulfilled(self, result):
+        pass
+
+    def wait_for_pending(self, result,
+                         callback=None, propagate=True, **kwargs):
+        for _ in self._wait_for_pending(result, **kwargs):
+            pass
+        return result.maybe_throw(callback=callback, propagate=propagate)
+
+    def _wait_for_pending(self, result, timeout=None, interval=0.5,
+                          no_ack=True, on_interval=None, on_message=None,
+                          callback=None, propagate=True):
+        return self.result_consumer._wait_for_pending(
+            result, timeout=timeout, interval=interval,
+            no_ack=no_ack, on_interval=on_interval,
+            callback=callback, on_message=on_message, propagate=propagate,
+        )
+
+
+class BaseResultConsumer(object):
+
+    def __init__(self, backend, app, accept, pending_results):
+        self.backend = backend
+        self.app = app
+        self.accept = accept
+        self._pending_results = pending_results
+        self.on_message = None
+        self.buckets = WeakKeyDictionary()
+        self.drainer = drainers[detect_environment()](self)
+
+    def drain_events(self, timeout=None):
+        raise NotImplementedError('subclass responsibility')
+
+    def _after_fork(self):
+        self.buckets.clear()
+        self.buckets = WeakKeyDictionary()
+        self.on_message = None
+        self.on_after_fork()
+
+    def on_after_fork(self):
+        pass
+
+    def drain_events_until(self, p, timeout=None, on_interval=None):
+        return self.drainer.drain_events_until(
+            p, timeout=timeout, on_interval=on_interval)
+
+    def _wait_for_pending(self, result, timeout=None, interval=0.5,
+                          no_ack=True, on_interval=None, callback=None,
+                          on_message=None, propagate=True):
+        prev_on_m, self.on_message = self.on_message, on_message
+        try:
+            for _ in self.drain_events_until(
+                    result.on_ready, timeout=timeout,
+                    on_interval=on_interval):
+                yield
+                time.sleep(0)
+        except socket.timeout:
+            raise TimeoutError('The operation timed out.')
+        finally:
+            self.on_message = prev_on_m
+
+    def on_state_change(self, meta, message):
+        if self.on_message:
+            self.on_message(meta)
+        if meta['status'] in states.READY_STATES:
+            try:
+                result = self._pending_results[meta['task_id']]
+            except KeyError:
+                return
+            result._maybe_set_cache(meta)
+            buckets = self.buckets
+            try:
+                buckets[result].append(result)
+                buckets.pop(result)
+            except KeyError:
+                pass
+        time.sleep(0)
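
The Drainer.drain_events_until() generator above polls in one-second slices until the promise-like object p flips ready, and register_drainer() keys implementations by the result of kombu.syn.detect_environment() ('default', 'eventlet'). A runnable sketch of how a consumer drives that loop (FakePromise and FakeConsumer are hypothetical stand-ins; the celery.backends.async import path is as named in this commit, which predates async becoming a Python keyword):

    import socket

    from celery.backends.async import Drainer

    class FakePromise(object):
        ready = False          # drain_events_until() exits once this is True

    class FakeConsumer(object):

        def __init__(self, p, arrive_after=3):
            self.p, self.calls, self.arrive_after = p, 0, arrive_after

        def drain_events(self, timeout=None):
            self.calls += 1
            if self.calls >= self.arrive_after:
                self.p.ready = True       # simulate the awaited result message
            else:
                raise socket.timeout()    # nothing arrived within `timeout`

    p = FakePromise()
    drainer = Drainer(FakeConsumer(p))
    for _ in drainer.drain_events_until(p, timeout=5):
        pass                              # each pass is one wait_for() slice
    print(p.ready)                        # -> True

Note that the first two drain_events() calls time out and are silently retried; only the timeout argument bounds the total wait, raised as socket.timeout and translated to TimeoutError by _wait_for_pending().
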

+ 1 - 179
celery/backends/base.py

@@ -13,27 +13,23 @@
 """
 from __future__ import absolute_import
 
-import socket
 import sys
 import time
 
-from collections import deque
 from datetime import timedelta
-from weakref import WeakKeyDictionary
 
 from billiard.einfo import ExceptionInfo
 from kombu.serialization import (
     dumps, loads, prepare_accept_content,
     registry as serializer_registry,
 )
-from kombu.syn import detect_environment
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
 
 from celery import states
 from celery import current_app, group, maybe_signature
 from celery.app import current_task
 from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
-from celery.five import items, monotonic
+from celery.five import items
 from celery.result import (
     GroupResult, ResultBase, allow_join_result, result_from_tuple,
 )
@@ -432,185 +428,11 @@ class SyncBackendMixin(object):
         return result
 
 
-class AsyncBackendMixin(object):
-
-    def _collect_into(self, result, bucket):
-        self.result_consumer.buckets[result] = bucket
-
-    def iter_native(self, result, timeout=None, interval=0.5, no_ack=True,
-                    on_message=None, on_interval=None):
-        results = result.results
-        if not results:
-            raise StopIteration()
-
-        bucket = deque()
-        for result in results:
-            self._collect_into(result, bucket)
-
-        for _ in self._wait_for_pending(
-                result,
-                timeout=timeout, interval=interval, no_ack=no_ack,
-                on_message=on_message, on_interval=on_interval):
-            while bucket:
-                result = bucket.popleft()
-                yield result.id, result._cache
-        while bucket:
-            result = bucket.popleft()
-            yield result.id, result._cache
-
-    def add_pending_result(self, result):
-        if result.id not in self._pending_results:
-            self._pending_results[result.id] = result
-            self.result_consumer.consume_from(self._create_binding(result.id))
-        return result
-
-    def remove_pending_result(self, result):
-        self._pending_results.pop(result.id, None)
-        self.on_result_fulfilled(result)
-        return result
-
-    def on_result_fulfilled(self, result):
-        pass
-
-    def wait_for_pending(self, result,
-                         callback=None, propagate=True, **kwargs):
-        for _ in self._wait_for_pending(result, **kwargs):
-            pass
-        return result.maybe_throw(callback=callback, propagate=propagate)
-
-    def _wait_for_pending(self, result, timeout=None, interval=0.5,
-                          no_ack=True, on_interval=None, on_message=None,
-                          callback=None, propagate=True):
-        return self.result_consumer._wait_for_pending(
-            result, timeout=timeout, interval=interval,
-            no_ack=no_ack, on_interval=on_interval,
-            callback=callback, on_message=on_message, propagate=propagate,
-        )
-
-
 class BaseBackend(Backend, SyncBackendMixin):
     pass
 BaseDictBackend = BaseBackend  # XXX compat
 
 
-
-class Drainer(object):
-
-    def __init__(self, result_consumer):
-        self.result_consumer = result_consumer
-
-    def drain_events_until(self, p, timeout=None, on_interval=None,
-                           monotonic=monotonic, wait=None):
-        wait = wait or self.result_consumer.drain_events
-        time_start = monotonic()
-
-        while 1:
-            # Total time spent may exceed a single call to wait()
-            if timeout and monotonic() - time_start >= timeout:
-                raise socket.timeout()
-            try:
-                yield self.wait_for(p, wait, timeout=1)
-            except socket.timeout:
-                pass
-            if on_interval:
-                on_interval()
-            if p.ready:  # got event on the wanted channel.
-                break
-
-    def wait_for(self, p, wait, timeout=None):
-        wait(timeout=timeout)
-
-
-class EventletDrainer(Drainer):
-    _g = None
-    _stopped = False
-
-    def run(self):
-        while not self._stopped:
-            try:
-                print("DRAINING!!!!!!!!!!!!!!!!")
-                self.result_consumer.drain_events(timeout=10)
-            except socket.timeout:
-                pass
-
-    def start(self):
-        from eventlet import spawn
-        if self._g is None:
-            self._g = spawn(self.run)
-
-    def stop(self):
-        self._stopped = True
-
-    def wait_for(self, p, wait, timeout=None):
-        if self._g is None:
-            self.start()
-        if not p.ready:
-            time.sleep(0)
-
-
-drainers = {'default': Drainer, 'eventlet': EventletDrainer}
-
-class BaseResultConsumer(object):
-
-    def __init__(self, backend, app, accept, pending_results):
-        self.backend = backend
-        self.app = app
-        self.accept = accept
-        self._pending_results = pending_results
-        self.on_message = None
-        self.buckets = WeakKeyDictionary()
-        self.drainer = drainers[detect_environment()](self)
-
-    def drain_events(self, timeout=None):
-        raise NotImplementedError('subclass responsibility')
-
-    def _after_fork(self):
-        self.bucket.clear()
-        self.buckets = WeakKeyDictionary()
-        self.on_message = None
-        self.on_after_fork()
-
-    def on_after_fork(self):
-        pass
-
-    def drain_events_until(self, p, timeout=None, on_interval=None):
-        return self.drainer.drain_events_until(
-            p, timeout=timeout, on_interval=on_interval)
-
-    def _wait_for_pending(self, result, timeout=None, interval=0.5,
-                          no_ack=True, on_interval=None, callback=None,
-                          on_message=None, propagate=True):
-        prev_on_m, self.on_message = self.on_message, on_message
-        try:
-            for _ in self.drain_events_until(
-                    result.on_ready, timeout=timeout,
-                    on_interval=on_interval):
-                yield
-                time.sleep(0)
-        except socket.timeout:
-            raise TimeoutError('The operation timed out.')
-        finally:
-            self.on_message = prev_on_m
-
-    def on_state_change(self, meta, message):
-        if self.on_message:
-            self.on_message(meta)
-        if meta['status'] in states.READY_STATES:
-            try:
-                result = self._pending_results[meta['task_id']]
-            except KeyError:
-                return
-            result._maybe_set_cache(meta)
-            buckets = self.buckets
-            try:
-                buckets[result].append(result)
-                buckets.pop(result)
-            except KeyError:
-                pass
-        time.sleep(0)
-
-
-
 class KeyValueStoreBackend(BaseBackend):
     key_t = ensure_bytes
     task_keyprefix = 'celery-task-meta-'
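
After the move, base.py keeps only the synchronous pieces; the trailing context lines show KeyValueStoreBackend, whose subclasses build storage keys by prefixing the task id. A dependency-free sketch of that key scheme (get_key_for_task here is a simplified stand-in for the real method, which also coerces both parts through self.key_t, ensure_bytes by default):

    def get_key_for_task(task_id, prefix='celery-task-meta-'):
        # Simplified stand-in for KeyValueStoreBackend key construction.
        return (prefix + task_id).encode('utf-8')

    print(get_key_for_task('d3bb1f46-2f20-4e5e-8d4c-9f0f0e4d9a11'))
    # -> b'celery-task-meta-d3bb1f46-2f20-4e5e-8d4c-9f0f0e4d9a11'
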

+ 0 - 54
funtests/stress/t.py

@@ -1,54 +0,0 @@
-#import eventlet
-#eventlet.monkey_patch()
-
-from celery import group
-import socket
-from stress.app import add, raising
-
-def on_ready(result):
-    print('RESULT: %r' % (result.get(),))
-
-finished = [0]
-
-def test():
-    #group(add.s(i, i) for i in range(1000)).delay().then(on_ready)
-
-    p = group(add.s(i, i) for i in range(1000)).delay()
-    x = p.get(timeout=5)
-    y = p.get(timeout=5)
-    try:
-        assert x == y
-    except AssertionError:
-        print('-' * 64)
-        print('X: %r' % (x,))
-        print('Y: %r' % (y,))
-        raise
-    assert not any(m is None for m in x)
-    assert not any(m is None for m in y)
-
-    #p = add.delay(2, 2)
-    #print(p.get(timeout=5))
-    #p = add.delay(2, 2)
-    #print(p.get(timeout=5))
-    #p = add.delay(2, 2)
-    #print(p.get(timeout=5))
-    #p = add.delay(2, 2)
-    #print(p.get(timeout=5))
-    #p = raising.delay()
-    #try:
-    #    print(p.get(timeout=5))
-    #except Exception as exc:
-    #    print('raised: %r' % (exc),)
-    finished[0] += 1
-
-
-for i in range(10):
-    test()
-
-
-#for i in range(2):
-#    eventlet.spawn(test)
-
-#while finished[0] < 100:
-#    import time
-#    time.sleep(0)
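
The deleted funtests/stress/t.py above was a throwaway driver for the new machinery: it checked that repeated get() calls on one GroupResult return identical, fully-cached values, with a commented-out exercise of the promise-style then() callback. A condensed sketch of the same check (add is the stress-app task returning x + y; the group size is reduced from the script's 1000):

    from celery import group

    from stress.app import add

    def on_ready(result):
        print('RESULT: %r' % (result.get(),))

    p = group(add.s(i, i) for i in range(10)).delay()
    # p.then(on_ready)                  # promise-style completion callback
    first = p.get(timeout=5)
    second = p.get(timeout=5)           # second call is served from the cache
    assert first == second == [i + i for i in range(10)]
    assert not any(m is None for m in first)
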