dispatcher.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. """Event dispatcher sends events."""
  2. from __future__ import absolute_import, unicode_literals
  3. import os
  4. import threading
  5. import time
  6. from collections import defaultdict, deque
  7. from kombu import Producer
  8. from celery.app import app_or_default
  9. from celery.five import items
  10. from celery.utils.nodenames import anon_nodename
  11. from celery.utils.time import utcoffset
  12. from .event import Event, get_exchange, group_from
  13. __all__ = ['EventDispatcher']
  14. class EventDispatcher(object):
  15. """Dispatches event messages.
  16. Arguments:
  17. connection (kombu.Connection): Connection to the broker.
  18. hostname (str): Hostname to identify ourselves as,
  19. by default uses the hostname returned by
  20. :func:`~celery.utils.anon_nodename`.
  21. groups (Sequence[str]): List of groups to send events for.
  22. :meth:`send` will ignore send requests to groups not in this list.
  23. If this is :const:`None`, all events will be sent.
  24. Example groups include ``"task"`` and ``"worker"``.
  25. enabled (bool): Set to :const:`False` to not actually publish any
  26. events, making :meth:`send` a no-op.
  27. channel (kombu.Channel): Can be used instead of `connection` to specify
  28. an exact channel to use when sending events.
  29. buffer_while_offline (bool): If enabled events will be buffered
  30. while the connection is down. :meth:`flush` must be called
  31. as soon as the connection is re-established.
  32. Note:
  33. You need to :meth:`close` this after use.
  34. """
  35. DISABLED_TRANSPORTS = {'sql'}
  36. app = None
  37. # set of callbacks to be called when :meth:`enabled`.
  38. on_enabled = None
  39. # set of callbacks to be called when :meth:`disabled`.
  40. on_disabled = None
  41. def __init__(self, connection=None, hostname=None, enabled=True,
  42. channel=None, buffer_while_offline=True, app=None,
  43. serializer=None, groups=None, delivery_mode=1,
  44. buffer_group=None, buffer_limit=24, on_send_buffered=None):
  45. self.app = app_or_default(app or self.app)
  46. self.connection = connection
  47. self.channel = channel
  48. self.hostname = hostname or anon_nodename()
  49. self.buffer_while_offline = buffer_while_offline
  50. self.buffer_group = buffer_group or frozenset()
  51. self.buffer_limit = buffer_limit
  52. self.on_send_buffered = on_send_buffered
  53. self._group_buffer = defaultdict(list)
  54. self.mutex = threading.Lock()
  55. self.producer = None
  56. self._outbound_buffer = deque()
  57. self.serializer = serializer or self.app.conf.event_serializer
  58. self.on_enabled = set()
  59. self.on_disabled = set()
  60. self.groups = set(groups or [])
  61. self.tzoffset = [-time.timezone, -time.altzone]
  62. self.clock = self.app.clock
  63. self.delivery_mode = delivery_mode
  64. if not connection and channel:
  65. self.connection = channel.connection.client
  66. self.enabled = enabled
  67. conninfo = self.connection or self.app.connection_for_write()
  68. self.exchange = get_exchange(conninfo)
  69. if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
  70. self.enabled = False
  71. if self.enabled:
  72. self.enable()
  73. self.headers = {'hostname': self.hostname}
  74. self.pid = os.getpid()
  75. def __enter__(self):
  76. return self
  77. def __exit__(self, *exc_info):
  78. self.close()
  79. def enable(self):
  80. self.producer = Producer(self.channel or self.connection,
  81. exchange=self.exchange,
  82. serializer=self.serializer,
  83. auto_declare=False)
  84. self.enabled = True
  85. for callback in self.on_enabled:
  86. callback()
  87. def disable(self):
  88. if self.enabled:
  89. self.enabled = False
  90. self.close()
  91. for callback in self.on_disabled:
  92. callback()
  93. def publish(self, type, fields, producer,
  94. blind=False, Event=Event, **kwargs):
  95. """Publish event using custom :class:`~kombu.Producer`.
  96. Arguments:
  97. type (str): Event type name, with group separated by dash (`-`).
  98. fields: Dictionary of event fields, must be json serializable.
  99. producer (kombu.Producer): Producer instance to use:
  100. only the ``publish`` method will be called.
  101. retry (bool): Retry in the event of connection failure.
  102. retry_policy (Mapping): Map of custom retry policy options.
  103. See :meth:`~kombu.Connection.ensure`.
  104. blind (bool): Don't set logical clock value (also don't forward
  105. the internal logical clock).
  106. Event (Callable): Event type used to create event.
  107. Defaults to :func:`Event`.
  108. utcoffset (Callable): Function returning the current
  109. utc offset in hours.
  110. """
  111. clock = None if blind else self.clock.forward()
  112. event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
  113. pid=self.pid, clock=clock, **fields)
  114. with self.mutex:
  115. return self._publish(event, producer,
  116. routing_key=type.replace('-', '.'), **kwargs)
  117. def _publish(self, event, producer, routing_key, retry=False,
  118. retry_policy=None, utcoffset=utcoffset):
  119. exchange = self.exchange
  120. try:
  121. producer.publish(
  122. event,
  123. routing_key=routing_key,
  124. exchange=exchange.name,
  125. retry=retry,
  126. retry_policy=retry_policy,
  127. declare=[exchange],
  128. serializer=self.serializer,
  129. headers=self.headers,
  130. delivery_mode=self.delivery_mode,
  131. )
  132. except Exception as exc: # pylint: disable=broad-except
  133. if not self.buffer_while_offline:
  134. raise
  135. self._outbound_buffer.append((event, routing_key, exc))
  136. def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
  137. retry_policy=None, Event=Event, **fields):
  138. """Send event.
  139. Arguments:
  140. type (str): Event type name, with group separated by dash (`-`).
  141. retry (bool): Retry in the event of connection failure.
  142. retry_policy (Mapping): Map of custom retry policy options.
  143. See :meth:`~kombu.Connection.ensure`.
  144. blind (bool): Don't set logical clock value (also don't forward
  145. the internal logical clock).
  146. Event (Callable): Event type used to create event,
  147. defaults to :func:`Event`.
  148. utcoffset (Callable): unction returning the current utc offset
  149. in hours.
  150. **fields (Any): Event fields -- must be json serializable.
  151. """
  152. if self.enabled:
  153. groups, group = self.groups, group_from(type)
  154. if groups and group not in groups:
  155. return
  156. if group in self.buffer_group:
  157. clock = self.clock.forward()
  158. event = Event(type, hostname=self.hostname,
  159. utcoffset=utcoffset(),
  160. pid=self.pid, clock=clock, **fields)
  161. buf = self._group_buffer[group]
  162. buf.append(event)
  163. if len(buf) >= self.buffer_limit:
  164. self.flush()
  165. elif self.on_send_buffered:
  166. self.on_send_buffered()
  167. else:
  168. return self.publish(type, fields, self.producer, blind=blind,
  169. Event=Event, retry=retry,
  170. retry_policy=retry_policy)
  171. def flush(self, errors=True, groups=True):
  172. """Flush the outbound buffer."""
  173. if errors:
  174. buf = list(self._outbound_buffer)
  175. try:
  176. with self.mutex:
  177. for event, routing_key, _ in buf:
  178. self._publish(event, self.producer, routing_key)
  179. finally:
  180. self._outbound_buffer.clear()
  181. if groups:
  182. with self.mutex:
  183. for group, events in items(self._group_buffer):
  184. self._publish(events, self.producer, '%s.multi' % group)
  185. events[:] = [] # list.clear
  186. def extend_buffer(self, other):
  187. """Copy the outbound buffer of another instance."""
  188. self._outbound_buffer.extend(other._outbound_buffer)
  189. def close(self):
  190. """Close the event dispatcher."""
  191. self.mutex.locked() and self.mutex.release()
  192. self.producer = None
  193. def _get_publisher(self):
  194. return self.producer
  195. def _set_publisher(self, producer):
  196. self.producer = producer
  197. publisher = property(_get_publisher, _set_publisher) # XXX compat