|
@@ -15,6 +15,7 @@ from datetime import datetime
|
|
|
|
|
|
from celery.app import app_or_default
|
|
|
from celery.datastructures import LRUCache
|
|
|
+from celery.utils.timeutils import humanize_seconds
|
|
|
|
|
|
|
|
|
TASK_NAMES = LRUCache(limit=0xFFF)
|
|
@@ -23,6 +24,11 @@ HUMAN_TYPES = {'worker-offline': 'shutdown',
|
|
|
'worker-online': 'started',
|
|
|
'worker-heartbeat': 'heartbeat'}
|
|
|
|
|
|
+CONNECTION_ERROR = """\
|
|
|
+-> Cannot connect to %s: %s.
|
|
|
+Trying again %s
|
|
|
+"""
|
|
|
+
|
|
|
|
|
|
def humanize_type(type):
|
|
|
try:
|
|
@@ -76,11 +82,22 @@ def evdump(app=None, out=sys.stdout):
|
|
|
dumper = Dumper(out=out)
|
|
|
dumper.say('-> evdump: starting capture...')
|
|
|
conn = app.connection()
|
|
|
- recv = app.events.Receiver(conn, handlers={'*': dumper.on_event})
|
|
|
- try:
|
|
|
- recv.capture()
|
|
|
- except (KeyboardInterrupt, SystemExit):
|
|
|
- conn and conn.close()
|
|
|
+
|
|
|
+ def _error_handler(exc, interval):
|
|
|
+ dumper.say(CONNECTION_ERROR % (
|
|
|
+ conn.as_uri(), exc, humanize_seconds(interval, 'in', ' ')
|
|
|
+ ))
|
|
|
+
|
|
|
+ while 1:
|
|
|
+ try:
|
|
|
+ conn = conn.clone()
|
|
|
+ conn.ensure_connection(_error_handler)
|
|
|
+ recv = app.events.Receiver(conn, handlers={'*': dumper.on_event})
|
|
|
+ recv.capture()
|
|
|
+ except (KeyboardInterrupt, SystemExit):
|
|
|
+ return conn and conn.close()
|
|
|
+ except conn.connection_errors + conn.channel_errors as exc:
|
|
|
+ dumper.say('-> Connection lost, attempting reconnect')
|
|
|
|
|
|
if __name__ == '__main__': # pragma: no cover
|
|
|
evdump()
|