dumper.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.dumper
  4. ~~~~~~~~~~~~~~~~~~~~
  5. This is a simple program that dumps events to the console
  6. as they happen. Think of it like a `tcpdump` for Celery events.
  7. """
  8. from __future__ import absolute_import
  9. import sys
  10. from datetime import datetime
  11. from celery.app import app_or_default
  12. from celery.datastructures import LRUCache
  13. TASK_NAMES = LRUCache(limit=0xFFF)
  14. HUMAN_TYPES = {'worker-offline': 'shutdown',
  15. 'worker-online': 'started',
  16. 'worker-heartbeat': 'heartbeat'}
  17. def humanize_type(type):
  18. try:
  19. return HUMAN_TYPES[type.lower()]
  20. except KeyError:
  21. return type.lower().replace('-', ' ')
  22. def say(msg, out=sys.stdout):
  23. out.write(msg + '\n')
  24. class Dumper(object):
  25. def __init__(self, out=sys.stdout):
  26. self.out = out
  27. def say(self, msg):
  28. say(msg, out=self.out)
  29. def on_event(self, event):
  30. timestamp = datetime.utcfromtimestamp(event.pop('timestamp'))
  31. type = event.pop('type').lower()
  32. hostname = event.pop('hostname')
  33. if type.startswith('task-'):
  34. uuid = event.pop('uuid')
  35. if type in ('task-received', 'task-sent'):
  36. task = TASK_NAMES[uuid] = '%s(%s) args=%s kwargs=%s' % (
  37. event.pop('name'), uuid,
  38. event.pop('args'),
  39. event.pop('kwargs'))
  40. else:
  41. task = TASK_NAMES.get(uuid, '')
  42. return self.format_task_event(hostname, timestamp,
  43. type, task, event)
  44. fields = ', '.join('%s=%s' % (key, event[key])
  45. for key in sorted(event.keys()))
  46. sep = fields and ':' or ''
  47. self.say('%s [%s] %s%s %s' % (hostname, timestamp,
  48. humanize_type(type), sep, fields))
  49. def format_task_event(self, hostname, timestamp, type, task, event):
  50. fields = ', '.join('%s=%s' % (key, event[key])
  51. for key in sorted(event.keys()))
  52. sep = fields and ':' or ''
  53. self.say('%s [%s] %s%s %s %s' % (hostname, timestamp,
  54. humanize_type(type), sep, task, fields))
  55. def evdump(app=None, out=sys.stdout):
  56. app = app_or_default(app)
  57. dumper = Dumper(out=out)
  58. dumper.say('-> evdump: starting capture...')
  59. conn = app.broker_connection()
  60. recv = app.events.Receiver(conn, handlers={'*': dumper.on_event})
  61. try:
  62. recv.capture()
  63. except (KeyboardInterrupt, SystemExit):
  64. conn and conn.close()
  65. if __name__ == '__main__': # pragma: no cover
  66. evdump()