Browse Source

Yes! Yes! Got eventlet support working (but some small bugs to fix still)

Ask Solem 14 years ago
parent
commit
6d255867ae
3 changed files with 146 additions and 2 deletions
  1. 120 0
      celery/concurrency/evlet.py
  2. 25 1
      celery/worker/consumer.py
  3. 1 1
      celery/worker/job.py

+ 120 - 0
celery/concurrency/evlet.py

@@ -0,0 +1,120 @@
+
+import threading
+
+from eventlet import GreenPile, GreenPool
+from eventlet import hubs
+from eventlet import spawn_n
+from eventlet.greenthread import sleep
+from Queue import Empty, Queue as LightQueue
+
+from celery import log
+from celery.utils.functional import partial
+from celery.datastructures import ExceptionInfo
+
+RUN = 0x1
+CLOSE = 0x2
+TERMINATE = 0x3
+
+
+accept_lock = threading.Lock()
+
+
+def do_work(target, args=(), kwargs={}, callback=None,
+        accept_callback=None, method_queue=None):
+    method_queue.delegate(accept_callback)
+    callback(target(*args, **kwargs))
+
+
+class Waiter(threading.Thread):
+
+    def __init__(self, inqueue, limit):
+        self.inqueue = inqueue
+        self.limit = limit
+        self._state = None
+        threading.Thread.__init__(self)
+
+    def run(self):
+        hubs.use_hub()
+        pool = GreenPool(self.limit)
+        pile = GreenPile(pool)
+        self._state = RUN
+        inqueue = self.inqueue
+
+        def get_from_queue_forever():
+            while self._state == RUN:
+                try:
+                    print("+INQUEUE GET")
+                    m = inqueue.get_nowait()
+                    print("-INQUEUE GET")
+                except Empty:
+                    sleep(0.3)
+                else:
+                    print("+SPAWN")
+                    pile.spawn(*m)
+                    print("-SPAWN")
+
+        def wait_for_pile_forever():
+            while self._state == RUN:
+                print("+CALLING PILE NEXT")
+                pile.next()
+                print("-CALLING PILE NEXT")
+
+        spawn_n(get_from_queue_forever)
+        spawn_n(wait_for_pile_forever)
+
+        while self._state == RUN:
+            sleep(0.1)
+
+
+
+class TaskPool(object):
+    _state = None
+
+    def __init__(self, limit, logger=None, **kwargs):
+        self.limit = limit
+        self.logger = logger or log.get_default_logger()
+        self._pool = None
+
+    def start(self):
+        self._state = RUN
+        self._out = LightQueue()
+        self._waiter = Waiter(self._out, self.limit)
+        self._waiter.start()
+
+    def stop(self):
+        self._state = CLOSE
+        self._pool.pool.waitall()
+        self._waiter._state = TERMINATE
+        self._state = TERMINATE
+
+    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
+            errbacks=None, accept_callback=None, **compat):
+        if self._state != RUN:
+            return
+        args = args or []
+        kwargs = kwargs or {}
+        callbacks = callbacks or []
+        errbacks = errbacks or []
+
+        on_ready = partial(self.on_ready, callbacks, errbacks)
+
+        self.logger.debug("GreenPile: Spawn %s (args:%s kwargs:%s)" % (
+            target, args, kwargs))
+
+        print("+OUTQUEUE.PUT")
+        self._out.put((do_work, target, args, kwargs,
+                      on_ready, accept_callback, self.method_queue))
+        print("-OUTQUEUE.PUT")
+
+
+    def on_ready(self, callbacks, errbacks, ret_value):
+        """What to do when a worker task is ready and its return value has
+        been collected."""
+
+        if isinstance(ret_value, ExceptionInfo):
+            if isinstance(ret_value.exception, (
+                    SystemExit, KeyboardInterrupt)): # pragma: no cover
+                raise ret_value.exception
+            [errback(ret_value) for errback in errbacks]
+        else:
+            [callback(ret_value) for callback in callbacks]

+ 25 - 1
celery/worker/consumer.py

@@ -74,6 +74,8 @@ import socket
 import sys
 import warnings
 
+from Queue import Empty, Queue
+
 from celery.app import app_or_default
 from celery.datastructures import AttributeDict, SharedCounter
 from celery.exceptions import NotRegistered
@@ -88,6 +90,22 @@ RUN = 0x1
 CLOSE = 0x2
 
 
+class MethodQueue(Queue):
+
+    def delegate(self, fun, *args, **kwargs):
+        self.put_nowait((fun, args, kwargs))
+
+    def drain(self):
+        while 1:
+            try:
+                m = self.get_nowait()
+            except Empty:
+                break
+            else:
+                method, margs, mkwargs = m
+                method(*margs, **mkwargs)
+
+
 class QoS(object):
     """Quality of Service for Channel.
 
@@ -211,6 +229,8 @@ class Consumer(object):
         self.event_dispatcher = None
         self.heart = None
         self.pool = pool
+        self.method_queue = MethodQueue()
+        self.pool.method_queue = self.method_queue; # FIXME Ahg ahg ahg.
         pidbox_state = AttributeDict(app=self.app,
                                      logger=logger,
                                      hostname=self.hostname,
@@ -440,7 +460,11 @@ class Consumer(object):
 
     def _mainloop(self):
         while 1:
-            yield self.connection.drain_events()
+            self.method_queue.drain()
+            try:
+                yield self.connection.drain_events(timeout=0.1)
+            except socket.timeout:
+                pass
 
     def _open_connection(self):
         """Open connection.  May retry opening the connection if configuration

+ 1 - 1
celery/worker/job.py

@@ -430,7 +430,7 @@ class TaskRequest(object):
         if self.task.acks_late:
             self.acknowledge()
 
-        runtime = time.time() - self.time_start
+        runtime = self.time_start and (time.time() - self.time_start) or 0
         self.send_event("task-succeeded", uuid=self.task_id,
                         result=repr(ret_value), runtime=runtime)