Pārlūkot izejas kodu

amqplib: Consumer is now using an epoll/kqueue/select event loop

If the connection has a valid .eventmap attribute (which the amqplib
transport has), the celeryd consumer will use epoll/kqueue/select to
pop messages off the queue, and also replace the multiprocessing pool's
ResultHandler thread with an evented version.

Depends on:

    - ask/billiard@7adfb4264b75f6f63ad41581e9c043de3e66713d

    - ask/kombu@59a92919d9678ba31d7085af5c287b82d56cdf07

Closes #644
Ask Solem 13 gadi atpakaļ
vecāks
revīzija
5f08d979d0

+ 12 - 1
celery/concurrency/base.py

@@ -73,7 +73,7 @@ class BasePool(object):
                 "%s does not implement restart" % (self.__class__, ))
                 "%s does not implement restart" % (self.__class__, ))
 
 
     def stop(self):
     def stop(self):
-        self._state = self.CLOSE
+        self.close()
         self.on_stop()
         self.on_stop()
         self._state = self.TERMINATE
         self._state = self.TERMINATE
 
 
@@ -85,6 +85,13 @@ class BasePool(object):
         self.on_start()
         self.on_start()
         self._state = self.RUN
         self._state = self.RUN
 
 
+    def close(self):
+        self._state = self.CLOSE
+        self.on_close()
+
+    def on_close(self):
+        pass
+
     def apply_async(self, target, args=[], kwargs={}, **options):
     def apply_async(self, target, args=[], kwargs={}, **options):
         """Equivalent of the :func:`apply` built-in function.
         """Equivalent of the :func:`apply` built-in function.
 
 
@@ -114,3 +121,7 @@ class BasePool(object):
     @property
     @property
     def num_processes(self):
     def num_processes(self):
         return self.limit
         return self.limit
+
+    @property
+    def eventmap(self):
+        return {}

+ 8 - 0
celery/concurrency/processes/__init__.py

@@ -81,6 +81,10 @@ class TaskPool(BasePool):
             self._pool.terminate()
             self._pool.terminate()
             self._pool = None
             self._pool = None
 
 
+    def on_close(self):
+        if self._pool is not None and self._pool._state == RUN:
+            self._pool.close()
+
     def terminate_job(self, pid, signal=None):
     def terminate_job(self, pid, signal=None):
         _kill(pid, signal or _signal.SIGTERM)
         _kill(pid, signal or _signal.SIGTERM)
 
 
@@ -103,3 +107,7 @@ class TaskPool(BasePool):
     @property
     @property
     def num_processes(self):
     def num_processes(self):
         return self._pool._processes
         return self._pool._processes
+
+    @property
+    def eventmap(self):
+        return self._pool.eventmap

+ 14 - 12
celery/utils/threads.py

@@ -50,20 +50,22 @@ class bgThread(Thread):
 
 
     def run(self):
     def run(self):
         shutdown = self._is_shutdown
         shutdown = self._is_shutdown
-        while not shutdown.is_set():
-            try:
-                self.body()
-            except Exception, exc:
-                self.on_crash("%r crashed: %r", self.name, exc)
-                # exiting by normal means does not work here, so force exit.
-                os._exit(1)
         try:
         try:
+            while not shutdown.is_set():
+                try:
+                    self.body()
+                except Exception, exc:
+                    self.on_crash("%r crashed: %r", self.name, exc)
+                    # exiting by normal means does not work here, so force exit.
+                    os._exit(1)
+            try:
+                self._is_stopped.set()
+            except TypeError:  # pragma: no cover
+                # we lost the race at interpreter shutdown,
+                # so gc collected built-in modules.
+                pass
+        finally:
             self._is_stopped.set()
             self._is_stopped.set()
-        except TypeError:  # pragma: no cover
-            # we lost the race at interpreter shutdown,
-            # so gc collected built-in modules.
-            pass
-        self._is_stopped.set()
 
 
     def stop(self):
     def stop(self):
         """Graceful shutdown."""
         """Graceful shutdown."""

+ 21 - 6
celery/worker/__init__.py

@@ -21,6 +21,7 @@ import sys
 import traceback
 import traceback
 
 
 from billiard import forking_enable
 from billiard import forking_enable
+from kombu.syn import detect_environment
 from kombu.utils.finalize import Finalize
 from kombu.utils.finalize import Finalize
 
 
 from celery import concurrency as _concurrency
 from celery import concurrency as _concurrency
@@ -87,16 +88,19 @@ class Pool(abstract.StartStopComponent):
         w.no_execv = no_execv
         w.no_execv = no_execv
         if w.autoscale:
         if w.autoscale:
             w.max_concurrency, w.min_concurrency = w.autoscale
             w.max_concurrency, w.min_concurrency = w.autoscale
+        w.use_eventloop = (detect_environment() == "default" and
+                           w.app.broker_connection().eventmap)
 
 
     def create(self, w):
     def create(self, w):
         forking_enable(w.no_execv or not w.force_execv)
         forking_enable(w.no_execv or not w.force_execv)
         pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
         pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
-                                initargs=(w.app, w.hostname),
-                                maxtasksperchild=w.max_tasks_per_child,
-                                timeout=w.task_time_limit,
-                                soft_timeout=w.task_soft_time_limit,
-                                putlocks=w.pool_putlocks,
-                                lost_worker_timeout=w.worker_lost_wait)
+                            initargs=(w.app, w.hostname),
+                            maxtasksperchild=w.max_tasks_per_child,
+                            timeout=w.task_time_limit,
+                            soft_timeout=w.task_soft_time_limit,
+                            putlocks=w.pool_putlocks,
+                            lost_worker_timeout=w.worker_lost_wait,
+                            start_result_thread=not w.use_eventloop)
         return pool
         return pool
 
 
 
 
@@ -274,13 +278,21 @@ class WorkController(configurated):
             self.stop()
             self.stop()
             raise exc
             raise exc
 
 
+    def signal_consumer_close(self):
+        try:
+            self.consumer.close()
+        except AttributeError:
+            pass
+
     def stop(self, in_sighandler=False):
     def stop(self, in_sighandler=False):
         """Graceful shutdown of the worker server."""
         """Graceful shutdown of the worker server."""
+        self.signal_consumer_close()
         if not in_sighandler or self.pool.signal_safe:
         if not in_sighandler or self.pool.signal_safe:
             self._shutdown(warm=True)
             self._shutdown(warm=True)
 
 
     def terminate(self, in_sighandler=False):
     def terminate(self, in_sighandler=False):
         """Not so graceful shutdown of the worker server."""
         """Not so graceful shutdown of the worker server."""
+        self.signal_consumer_close()
         if not in_sighandler or self.pool.signal_safe:
         if not in_sighandler or self.pool.signal_safe:
             self._shutdown(warm=False)
             self._shutdown(warm=False)
 
 
@@ -290,6 +302,9 @@ class WorkController(configurated):
         if self._state in (self.CLOSE, self.TERMINATE):
         if self._state in (self.CLOSE, self.TERMINATE):
             return
             return
 
 
+        if self.pool:
+            self.pool.close()
+
         if self._state != self.RUN or self._running != len(self.components):
         if self._state != self.RUN or self._running != len(self.components):
             # Not fully started, can safely exit.
             # Not fully started, can safely exit.
             self._state = self.TERMINATE
             self._state = self.TERMINATE

+ 48 - 7
celery/worker/consumer.py

@@ -84,7 +84,7 @@ from kombu.utils.encoding import safe_repr
 
 
 from celery.app import app_or_default
 from celery.app import app_or_default
 from celery.datastructures import AttributeDict
 from celery.datastructures import AttributeDict
-from celery.exceptions import InvalidTaskError
+from celery.exceptions import InvalidTaskError, SystemTerminate
 from celery.utils import timer2
 from celery.utils import timer2
 from celery.utils.functional import noop
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.utils.log import get_logger
@@ -93,6 +93,7 @@ from . import state
 from .abstract import StartStopComponent
 from .abstract import StartStopComponent
 from .control import Panel
 from .control import Panel
 from .heartbeat import Heart
 from .heartbeat import Heart
+from .hub import Hub
 
 
 RUN = 0x1
 RUN = 0x1
 CLOSE = 0x2
 CLOSE = 0x2
@@ -164,7 +165,8 @@ class Component(StartStopComponent):
                 pool=w.pool,
                 pool=w.pool,
                 priority_timer=w.priority_timer,
                 priority_timer=w.priority_timer,
                 app=w.app,
                 app=w.app,
-                controller=w)
+                controller=w,
+                use_eventloop=w.use_eventloop)
         return c
         return c
 
 
 
 
@@ -295,10 +297,15 @@ class Consumer(object):
     # Consumer state, can be RUN or CLOSE.
     # Consumer state, can be RUN or CLOSE.
     _state = None
     _state = None
 
 
+    #: If true then pool results and broker messages will be
+    #: handled in an event loop.
+    use_eventloop = False
+
     def __init__(self, ready_queue, eta_schedule,
     def __init__(self, ready_queue, eta_schedule,
             init_callback=noop, send_events=False, hostname=None,
             init_callback=noop, send_events=False, hostname=None,
             initial_prefetch_count=2, pool=None, app=None,
             initial_prefetch_count=2, pool=None, app=None,
-            priority_timer=None, controller=None, **kwargs):
+            priority_timer=None, controller=None, use_eventloop=False,
+            **kwargs):
         self.app = app_or_default(app)
         self.app = app_or_default(app)
         self.connection = None
         self.connection = None
         self.task_consumer = None
         self.task_consumer = None
@@ -314,6 +321,7 @@ class Consumer(object):
         self.heart = None
         self.heart = None
         self.pool = pool
         self.pool = pool
         self.priority_timer = priority_timer or timer2.default_timer
         self.priority_timer = priority_timer or timer2.default_timer
+        self.use_eventloop = use_eventloop
         pidbox_state = AttributeDict(app=self.app,
         pidbox_state = AttributeDict(app=self.app,
                                      hostname=self.hostname,
                                      hostname=self.hostname,
                                      listener=self,     # pre 2.2
                                      listener=self,     # pre 2.2
@@ -352,22 +360,52 @@ class Consumer(object):
                 error(RETRY_CONNECTION, exc_info=True)
                 error(RETRY_CONNECTION, exc_info=True)
 
 
     def consume_messages(self):
     def consume_messages(self):
-        """Consume messages forever (or until an exception is raised)."""
-        debug("Starting message consumer...")
         self.task_consumer.consume()
         self.task_consumer.consume()
         debug("Ready to accept tasks!")
         debug("Ready to accept tasks!")
 
 
+        # evented version
+        if self.use_eventloop:
+            return self._eventloop()
+
         while self._state != CLOSE and self.connection:
         while self._state != CLOSE and self.connection:
+            if state.should_stop:
+                raise SystemExit()
+            elif state.should_terminate:
+                raise SystemTerminate()
             if self.qos.prev != self.qos.value:     # pragma: no cover
             if self.qos.prev != self.qos.value:     # pragma: no cover
                 self.qos.update()
                 self.qos.update()
             try:
             try:
-                self.connection.drain_events(timeout=1)
+                self.connection.drain_events(timeout=1.0)
             except socket.timeout:
             except socket.timeout:
                 pass
                 pass
             except socket.error:
             except socket.error:
                 if self._state != CLOSE:            # pragma: no cover
                 if self._state != CLOSE:            # pragma: no cover
                     raise
                     raise
 
 
+    def _eventloop(self):
+        """Consume messages forever (or until an exception is raised)."""
+        with Hub() as hub:
+            hub.update(self.connection.eventmap,
+                       self.pool.eventmap)
+            fdmap = hub.fdmap
+            poll = hub.poller.poll
+
+            while self._state != CLOSE and self.connection:
+                if state.should_stop:
+                    raise SystemExit()
+                elif state.should_terminate:
+                    raise SystemTerminate()
+                if not fdmap:
+                    return
+                if self.qos.prev != self.qos.value:     # pragma: no cover
+                    self.qos.update()
+                for fileno, event in poll(1.0) or ():
+                    try:
+                        fdmap[fileno]()
+                    except socket.error:
+                        if self._state != CLOSE:        # pragma: no cover
+                            raise
+
     def on_task(self, task):
     def on_task(self, task):
         """Handle received task.
         """Handle received task.
 
 
@@ -665,10 +703,13 @@ class Consumer(object):
         """
         """
         # Notifies other threads that this instance can't be used
         # Notifies other threads that this instance can't be used
         # anymore.
         # anymore.
-        self._state = CLOSE
+        self.close()
         debug("Stopping consumers...")
         debug("Stopping consumers...")
         self.stop_consumers(close_connection=False)
         self.stop_consumers(close_connection=False)
 
 
+    def close(self):
+        self._state = CLOSE
+
     @property
     @property
     def info(self):
     def info(self):
         """Returns information about this consumer instance
         """Returns information about this consumer instance

+ 49 - 0
celery/worker/hub.py

@@ -0,0 +1,49 @@
from __future__ import absolute_import

import errno
import socket

from time import sleep

from kombu.utils.eventio import poll, POLL_READ, POLL_ERR
+
+
+class Hub(object):
+    eventflags = POLL_READ | POLL_ERR
+
+    def __init__(self):
+        self.fdmap = {}
+        self.poller = poll()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc_info):
+        return self.close()
+
+    def add(self, f, callback, flags=None):
+        flags = self.eventflags if flags is None else flags
+        fd = f.fileno()
+        self.poller.register(fd, flags)
+        self.fdmap[fd] = callback
+
+    def update(self, *maps):
+        [self.add(*x) for row in maps for x in row.iteritems()]
+
+    def remove(self, fd):
+        try:
+            self.poller.unregister(fd)
+        except (KeyError, OSError):
+            pass
+
+    def tick(self, timeout=1.0):
+        if not self.fdmap:
+            return sleep(0.1)
+        for fileno, event in self.poller.poll(timeout) or ():
+            try:
+                self.fdmap[fileno]()
+            except socket.timeout:
+                pass
+            except socket.error, exc:
+                if exc.errno != errno.EAGAIN:
+                    raise
+
+    def close(self):
+        [self.remove(fd) for fd in self.fdmap.keys()]