pidbox.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. from __future__ import absolute_import
  2. import socket
  3. import threading
  4. from kombu.common import ignore_errors
  5. from kombu.utils.encoding import safe_str
  6. from celery.datastructures import AttributeDict
  7. from celery.utils.functional import pass1
  8. from celery.utils.log import get_logger
  9. from . import control
  10. __all__ = ['Pidbox', 'gPidbox']
  11. logger = get_logger(__name__)
  12. debug, error, info = logger.debug, logger.error, logger.info
  13. class Pidbox(object):
  14. consumer = None
  15. def __init__(self, c):
  16. self.c = c
  17. self.hostname = c.hostname
  18. self.node = c.app.control.mailbox.Node(
  19. safe_str(c.hostname),
  20. handlers=control.Panel.data,
  21. state=AttributeDict(
  22. app=c.app,
  23. hostname=c.hostname,
  24. consumer=c,
  25. tset=pass1 if c.controller.use_eventloop else set),
  26. )
  27. self._forward_clock = self.c.app.clock.forward
  28. def on_message(self, body, message):
  29. # just increase clock as clients usually don't
  30. # have a valid clock to adjust with.
  31. self._forward_clock()
  32. try:
  33. self.node.handle_message(body, message)
  34. except KeyError as exc:
  35. error('No such control command: %s', exc)
  36. except Exception as exc:
  37. error('Control command error: %r', exc, exc_info=True)
  38. self.reset()
  39. def start(self, c):
  40. self.node.channel = c.connection.channel()
  41. self.consumer = self.node.listen(callback=self.on_message)
  42. self.consumer.on_decode_error = c.on_decode_error
  43. def on_stop(self):
  44. pass
  45. def stop(self, c):
  46. self.on_stop()
  47. self.consumer = self._close_channel(c)
  48. def reset(self):
  49. """Sets up the process mailbox."""
  50. self.stop(self.c)
  51. self.start(self.c)
  52. def _close_channel(self, c):
  53. if self.node and self.node.channel:
  54. ignore_errors(c, self.node.channel.close)
  55. def shutdown(self, c):
  56. self.on_stop()
  57. if self.consumer:
  58. debug('Cancelling broadcast consumer...')
  59. ignore_errors(c, self.consumer.cancel)
  60. self.stop(self.c)
  61. class gPidbox(Pidbox):
  62. _node_shutdown = None
  63. _node_stopped = None
  64. _resets = 0
  65. def start(self, c):
  66. c.pool.spawn_n(self.loop, c)
  67. def on_stop(self):
  68. if self._node_stopped:
  69. self._node_shutdown.set()
  70. debug('Waiting for broadcast thread to shutdown...')
  71. self._node_stopped.wait()
  72. self._node_stopped = self._node_shutdown = None
  73. def reset(self):
  74. self._resets += 1
  75. def _do_reset(self, c, connection):
  76. self._close_channel(c)
  77. self.node.channel = connection.channel()
  78. self.consumer = self.node.listen(callback=self.on_message)
  79. self.consumer.consume()
  80. def loop(self, c):
  81. resets = [self._resets]
  82. shutdown = self._node_shutdown = threading.Event()
  83. stopped = self._node_stopped = threading.Event()
  84. try:
  85. with c.connect() as connection:
  86. info('pidbox: Connected to %s.', connection.as_uri())
  87. self._do_reset(c, connection)
  88. while not shutdown.is_set() and c.connection:
  89. if resets[0] < self._resets:
  90. resets[0] += 1
  91. self._do_reset(c, connection)
  92. try:
  93. connection.drain_events(timeout=1.0)
  94. except socket.timeout:
  95. pass
  96. finally:
  97. stopped.set()