fbi.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import socket
  3. import sys
  4. from contextlib import contextmanager
  5. from celery import states
  6. class FBI(object):
  7. def __init__(self, app):
  8. self.app = app
  9. self.receiver = None
  10. self.state = self.app.events.State()
  11. self.connection = None
  12. self.enabled = False
  13. def enable(self, enabled):
  14. self.enabled = enabled
  15. @contextmanager
  16. def investigation(self):
  17. if self.enabled:
  18. with self.app.connection() as conn:
  19. receiver = self.app.events.Receiver(
  20. conn, handlers={'*': self.state.event},
  21. )
  22. with receiver.consumer_context() as (conn, _, _):
  23. self.connection = conn
  24. try:
  25. yield self
  26. finally:
  27. self.ffwd()
  28. else:
  29. yield
  30. def ffwd(self):
  31. while 1:
  32. try:
  33. self.connection.drain_events(timeout=1)
  34. except socket.error:
  35. break
  36. def state_of(self, tid):
  37. try:
  38. task = self.state.tasks[tid]
  39. except KeyError:
  40. return 'No events for {0}'.format(tid)
  41. if task.state in states.READY_STATES:
  42. return 'Task {0.uuid} completed with {0.state}'.format(task)
  43. elif task.state in states.UNREADY_STATES:
  44. return 'Task {0.uuid} waiting in {0.state} state'.format(task)
  45. else:
  46. return 'Task {0.uuid} in other state {0.state}'.format(task)
  47. def query(self, ids):
  48. return self.app.control.inspect().query_task(id)
  49. def diag(self, ids, file=sys.stderr):
  50. if self.enabled:
  51. self.ffwd()
  52. for tid in ids:
  53. print(self.state_of(tid), file=file)