Procházet zdrojové kódy

celeryev should restablish connection when lost. Closes #574

Ask Solem před 13 roky
rodič
revize
658e01ece3
2 změnil soubory, kde provedl 32 přidání a 34 odebrání
  1. 10 27
      celery/events/__init__.py
  2. 22 7
      celery/events/cursesmon.py

+ 10 - 27
celery/events/__init__.py

@@ -22,6 +22,7 @@ from collections import deque
 from contextlib import contextmanager
 from itertools import count
 
+from kombu.common import eventloop
 from kombu.entity import Exchange, Queue
 from kombu.messaging import Consumer, Producer
 
@@ -178,29 +179,19 @@ class EventReceiver(object):
         handler and handler(event)
 
     @contextmanager
-    def consumer(self):
-        """Create event consumer.
-
-        .. warning::
-
-            This creates a new channel that needs to be closed
-            by calling `consumer.channel.close()`.
-
-        """
-        consumer = Consumer(self.connection.channel(),
+    def consumer(self, wakeup=True):
+        """Create event consumer."""
+        consumer = Consumer(self.connection,
                             queues=[self.queue], no_ack=True)
         consumer.register_callback(self._receive)
         with consumer:
-            yield consumer
-        consumer.channel.close()
-
-    def itercapture(self, limit=None, timeout=None, wakeup=True):
-        with self.consumer() as consumer:
             if wakeup:
                 self.wakeup_workers(channel=consumer.channel)
-
             yield consumer
 
+    def itercapture(self, limit=None, timeout=None, wakeup=True):
+        with self.consumer(wakeup=wakeup) as consumer:
+            yield consumer
             self.drain_events(limit=limit, timeout=timeout)
 
     def capture(self, limit=None, timeout=None, wakeup=True):
@@ -217,17 +208,9 @@ class EventReceiver(object):
                                    connection=self.connection,
                                    channel=channel)
 
-    def drain_events(self, limit=None, timeout=None):
-        for iteration in count(0):
-            if limit and iteration >= limit:
-                break
-            try:
-                self.connection.drain_events(timeout=timeout)
-            except socket.timeout:
-                if timeout:
-                    raise
-            except socket.error:
-                pass
+    def drain_events(self, **kwargs):
+        for _ in eventloop(self.connection, **kwargs):
+            pass
 
     def _receive(self, body, message):
         type = body.pop("type").lower()

+ 22 - 7
celery/events/cursesmon.py

@@ -471,27 +471,42 @@ class DisplayThread(threading.Thread):
             self.display.nap()
 
 
+def capture_events(app, state, display):
+
+    def on_connection_error(exc, interval):
+        sys.stderr.write("Connection Error: %r. Retry in %ss." % (
+            exc, interval))
+
+    while 1:
+        sys.stderr.write("-> evtop: starting capture...\n")
+        with app.broker_connection() as conn:
+            try:
+                conn.ensure_connection(on_connection_error,
+                                       app.conf.BROKER_CONNECTION_MAX_RETRIES)
+                recv = app.events.Receiver(conn, handlers={"*": state.event})
+                display.resetscreen()
+                display.init_screen()
+                with recv.consumer():
+                    recv.drain_events(timeout=1, ignore_timeouts=True)
+            except (conn.connection_errors, conn.channel_errors), exc:
+                sys.stderr.write("Connection lost: %r" % (exc, ))
+
+
 def evtop(app=None):
-    sys.stderr.write("-> evtop: starting capture...\n")
     app = app_or_default(app)
     state = app.events.State()
-    conn = app.broker_connection()
-    recv = app.events.Receiver(conn, handlers={"*": state.event})
-    capture = recv.itercapture()
-    capture.next()
     display = CursesMonitor(state, app=app)
     display.init_screen()
     refresher = DisplayThread(display)
     refresher.start()
     try:
-        capture.next()
+        capture_events(app, state, display)
     except Exception:
         refresher.shutdown = True
         refresher.join()
         display.resetscreen()
         raise
     except (KeyboardInterrupt, SystemExit):
-        conn and conn.close()
         refresher.shutdown = True
         refresher.join()
         display.resetscreen()