dumper.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.dumper
  4. ~~~~~~~~~~~~~~~~~~~~
  5. This is a simple program that dumps events to the console
  6. as they happen. Think of it like a `tcpdump` for Celery events.
  7. """
  8. from __future__ import absolute_import
  9. import sys
  10. from datetime import datetime
  11. from celery.app import app_or_default
  12. from celery.datastructures import LRUCache
  13. from celery.utils.timeutils import humanize_seconds
  14. TASK_NAMES = LRUCache(limit=0xFFF)
  15. HUMAN_TYPES = {'worker-offline': 'shutdown',
  16. 'worker-online': 'started',
  17. 'worker-heartbeat': 'heartbeat'}
  18. CONNECTION_ERROR = """\
  19. -> Cannot connect to %s: %s.
  20. Trying again %s
  21. """
  22. def humanize_type(type):
  23. try:
  24. return HUMAN_TYPES[type.lower()]
  25. except KeyError:
  26. return type.lower().replace('-', ' ')
  27. def say(msg, out=sys.stdout):
  28. out.write(msg + '\n')
  29. class Dumper(object):
  30. def __init__(self, out=sys.stdout):
  31. self.out = out
  32. def say(self, msg):
  33. say(msg, out=self.out)
  34. def on_event(self, ev):
  35. timestamp = datetime.utcfromtimestamp(ev.pop('timestamp'))
  36. type = ev.pop('type').lower()
  37. hostname = ev.pop('hostname')
  38. if type.startswith('task-'):
  39. uuid = ev.pop('uuid')
  40. if type in ('task-received', 'task-sent'):
  41. task = TASK_NAMES[uuid] = '%s(%s) args=%s kwargs=%s' % (
  42. ev.pop('name'), uuid,
  43. ev.pop('args'),
  44. ev.pop('kwargs'))
  45. else:
  46. task = TASK_NAMES.get(uuid, '')
  47. return self.format_task_event(hostname, timestamp,
  48. type, task, ev)
  49. fields = ', '.join('%s=%s' % (key, ev[key]) for key in sorted(ev))
  50. sep = fields and ':' or ''
  51. self.say('%s [%s] %s%s %s' % (hostname, timestamp,
  52. humanize_type(type), sep, fields))
  53. def format_task_event(self, hostname, timestamp, type, task, ev):
  54. fields = ', '.join('%s=%s' % (key, ev[key]) for key in sorted(ev))
  55. sep = fields and ':' or ''
  56. self.say('%s [%s] %s%s %s %s' % (
  57. hostname, timestamp, humanize_type(type), sep, task, fields,
  58. ))
  59. def evdump(app=None, out=sys.stdout):
  60. app = app_or_default(app)
  61. dumper = Dumper(out=out)
  62. dumper.say('-> evdump: starting capture...')
  63. conn = app.connection()
  64. def _error_handler(exc, interval):
  65. dumper.say(CONNECTION_ERROR % (
  66. conn.as_uri(), exc, humanize_seconds(interval, 'in', ' ')
  67. ))
  68. while 1:
  69. try:
  70. conn = conn.clone()
  71. conn.ensure_connection(_error_handler)
  72. recv = app.events.Receiver(conn, handlers={'*': dumper.on_event})
  73. recv.capture()
  74. except (KeyboardInterrupt, SystemExit):
  75. return conn and conn.close()
  76. except conn.connection_errors + conn.channel_errors:
  77. dumper.say('-> Connection lost, attempting reconnect')
  78. if __name__ == '__main__': # pragma: no cover
  79. evdump()