__init__.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import time
  4. import socket
  5. import threading
  6. from collections import deque
  7. from contextlib import contextmanager
  8. from itertools import count
  9. from kombu.entity import Exchange, Queue
  10. from kombu.messaging import Consumer, Producer
  11. from ..app import app_or_default
  12. from ..utils import uuid
  13. event_exchange = Exchange("celeryev", type="topic")
  14. def Event(type, _fields=None, **fields):
  15. """Create an event.
  16. An event is a dictionary, the only required field is ``type``.
  17. """
  18. event = dict(_fields or {}, type=type, **fields)
  19. if "timestamp" not in event:
  20. event["timestamp"] = time.time()
  21. return event
  22. class EventDispatcher(object):
  23. """Send events as messages.
  24. :param connection: Connection to the broker.
  25. :keyword hostname: Hostname to identify ourselves as,
  26. by default uses the hostname returned by :func:`socket.gethostname`.
  27. :keyword enabled: Set to :const:`False` to not actually publish any events,
  28. making :meth:`send` a noop operation.
  29. :keyword channel: Can be used instead of `connection` to specify
  30. an exact channel to use when sending events.
  31. :keyword buffer_while_offline: If enabled events will be buffered
  32. while the connection is down. :meth:`flush` must be called
  33. as soon as the connection is re-established.
  34. You need to :meth:`close` this after use.
  35. """
  36. def __init__(self, connection=None, hostname=None, enabled=True,
  37. channel=None, buffer_while_offline=True, app=None,
  38. serializer=None):
  39. self.app = app_or_default(app)
  40. self.connection = connection
  41. self.channel = channel
  42. self.hostname = hostname or socket.gethostname()
  43. self.buffer_while_offline = buffer_while_offline
  44. self.mutex = threading.Lock()
  45. self.publisher = None
  46. self._outbound_buffer = deque()
  47. self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
  48. self.enabled = enabled
  49. if self.enabled:
  50. self.enable()
  51. def __enter__(self):
  52. return self
  53. def __exit__(self, *exc_info):
  54. self.close()
  55. def enable(self):
  56. self.publisher = Producer(self.channel or self.connection.channel(),
  57. exchange=event_exchange,
  58. serializer=self.serializer)
  59. self.enabled = True
  60. def disable(self):
  61. if self.enabled:
  62. self.enabled = False
  63. self.close()
  64. def send(self, type, **fields):
  65. """Send event.
  66. :param type: Kind of event.
  67. :keyword \*\*fields: Event arguments.
  68. """
  69. if self.enabled:
  70. with self.mutex:
  71. event = Event(type, hostname=self.hostname,
  72. clock=self.app.clock.forward(), **fields)
  73. try:
  74. self.publisher.publish(event,
  75. routing_key=type.replace("-", "."))
  76. except Exception, exc:
  77. if not self.buffer_while_offline:
  78. raise
  79. self._outbound_buffer.append((type, fields, exc))
  80. def flush(self):
  81. while self._outbound_buffer:
  82. try:
  83. type, fields, _ = self._outbound_buffer.popleft()
  84. except IndexError:
  85. return
  86. self.send(type, **fields)
  87. def copy_buffer(self, other):
  88. self._outbound_buffer = other._outbound_buffer
  89. def close(self):
  90. """Close the event dispatcher."""
  91. self.mutex.locked() and self.mutex.release()
  92. if self.publisher is not None:
  93. if not self.channel: # close auto channel.
  94. self.publisher.channel.close()
  95. self.publisher = None
  96. class EventReceiver(object):
  97. """Capture events.
  98. :param connection: Connection to the broker.
  99. :keyword handlers: Event handlers.
  100. :attr:`handlers` is a dict of event types and their handlers,
  101. the special handler `"*"` captures all events that doesn't have a
  102. handler.
  103. """
  104. handlers = {}
  105. def __init__(self, connection, handlers=None, routing_key="#",
  106. node_id=None, app=None):
  107. self.app = app_or_default(app)
  108. self.connection = connection
  109. if handlers is not None:
  110. self.handlers = handlers
  111. self.routing_key = routing_key
  112. self.node_id = node_id or uuid()
  113. self.queue = Queue("%s.%s" % ("celeryev", self.node_id),
  114. exchange=event_exchange,
  115. routing_key=self.routing_key,
  116. auto_delete=True,
  117. durable=False)
  118. def process(self, type, event):
  119. """Process the received event by dispatching it to the appropriate
  120. handler."""
  121. handler = self.handlers.get(type) or self.handlers.get("*")
  122. handler and handler(event)
  123. @contextmanager
  124. def consumer(self):
  125. """Create event consumer.
  126. .. warning::
  127. This creates a new channel that needs to be closed
  128. by calling `consumer.channel.close()`.
  129. """
  130. consumer = Consumer(self.connection.channel(),
  131. queues=[self.queue], no_ack=True)
  132. consumer.register_callback(self._receive)
  133. with consumer:
  134. yield consumer
  135. consumer.channel.close()
  136. def itercapture(self, limit=None, timeout=None, wakeup=True):
  137. with self.consumer() as consumer:
  138. if wakeup:
  139. self.wakeup_workers(channel=consumer.channel)
  140. yield consumer
  141. self.drain_events(limit=limit, timeout=timeout)
  142. def capture(self, limit=None, timeout=None, wakeup=True):
  143. """Open up a consumer capturing events.
  144. This has to run in the main process, and it will never
  145. stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
  146. """
  147. list(self.itercapture(limit=limit, timeout=timeout, wakeup=wakeup))
  148. def wakeup_workers(self, channel=None):
  149. self.app.control.broadcast("heartbeat",
  150. connection=self.connection,
  151. channel=channel)
  152. def drain_events(self, limit=None, timeout=None):
  153. for iteration in count(0):
  154. if limit and iteration >= limit:
  155. break
  156. try:
  157. self.connection.drain_events(timeout=timeout)
  158. except socket.timeout:
  159. if timeout:
  160. raise
  161. except socket.error:
  162. pass
  163. def _receive(self, body, message):
  164. type = body.pop("type").lower()
  165. clock = body.get("clock")
  166. if clock:
  167. self.app.clock.adjust(clock)
  168. self.process(type, Event(type, body))
  169. class Events(object):
  170. def __init__(self, app=None):
  171. self.app = app
  172. def Receiver(self, connection, handlers=None, routing_key="#",
  173. node_id=None):
  174. return EventReceiver(connection,
  175. handlers=handlers,
  176. routing_key=routing_key,
  177. node_id=node_id,
  178. app=self.app)
  179. def Dispatcher(self, connection=None, hostname=None, enabled=True,
  180. channel=None, buffer_while_offline=True):
  181. return EventDispatcher(connection,
  182. hostname=hostname,
  183. enabled=enabled,
  184. channel=channel,
  185. app=self.app)
  186. def State(self):
  187. from .state import State as _State
  188. return _State()
  189. @contextmanager
  190. def default_dispatcher(self, hostname=None, enabled=True,
  191. buffer_while_offline=False):
  192. with self.app.amqp.publisher_pool.acquire(block=True) as pub:
  193. with self.Dispatcher(pub.connection, hostname, enabled,
  194. pub.channel, buffer_while_offline) as d:
  195. yield d