__init__.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. """
  2. celery.events
  3. =============
  4. Events are messages sent for actions happening
  5. in the worker (and clients if :setting:`CELERY_SEND_TASK_SENT_EVENT` is
  6. enabled). These events can be used for monitoring.
  7. """
  8. from __future__ import absolute_import
  9. from __future__ import with_statement
  10. import time
  11. import socket
  12. import threading
  13. from collections import deque
  14. from contextlib import contextmanager
  15. from itertools import count
  16. from kombu.entity import Exchange, Queue
  17. from kombu.messaging import Consumer, Producer
  18. from ..app import app_or_default
  19. from ..utils import uuid
  20. __all__ = ["event_exchange", "Event", "EventDispatcher", "EventReceiver"]
  21. event_exchange = Exchange("celeryev", type="topic")
  22. def Event(type, _fields=None, **fields):
  23. """Create an event.
  24. An event is a dictionary, the only required field is ``type``.
  25. """
  26. event = dict(_fields or {}, type=type, **fields)
  27. if "timestamp" not in event:
  28. event["timestamp"] = time.time()
  29. return event
  30. class EventDispatcher(object):
  31. """Send events as messages.
  32. :param connection: Connection to the broker.
  33. :keyword hostname: Hostname to identify ourselves as,
  34. by default uses the hostname returned by :func:`socket.gethostname`.
  35. :keyword enabled: Set to :const:`False` to not actually publish any events,
  36. making :meth:`send` a noop operation.
  37. :keyword channel: Can be used instead of `connection` to specify
  38. an exact channel to use when sending events.
  39. :keyword buffer_while_offline: If enabled events will be buffered
  40. while the connection is down. :meth:`flush` must be called
  41. as soon as the connection is re-established.
  42. You need to :meth:`close` this after use.
  43. """
  44. def __init__(self, connection=None, hostname=None, enabled=True,
  45. channel=None, buffer_while_offline=True, app=None,
  46. serializer=None):
  47. self.app = app_or_default(app)
  48. self.connection = connection
  49. self.channel = channel
  50. self.hostname = hostname or socket.gethostname()
  51. self.buffer_while_offline = buffer_while_offline
  52. self.mutex = threading.Lock()
  53. self.publisher = None
  54. self._outbound_buffer = deque()
  55. self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
  56. self.enabled = enabled
  57. if self.enabled:
  58. self.enable()
  59. def __enter__(self):
  60. return self
  61. def __exit__(self, *exc_info):
  62. self.close()
  63. def enable(self):
  64. self.publisher = Producer(self.channel or self.connection.channel(),
  65. exchange=event_exchange,
  66. serializer=self.serializer)
  67. self.enabled = True
  68. def disable(self):
  69. if self.enabled:
  70. self.enabled = False
  71. self.close()
  72. def send(self, type, **fields):
  73. """Send event.
  74. :param type: Kind of event.
  75. :keyword \*\*fields: Event arguments.
  76. """
  77. if self.enabled:
  78. with self.mutex:
  79. event = Event(type, hostname=self.hostname,
  80. clock=self.app.clock.forward(), **fields)
  81. try:
  82. self.publisher.publish(event,
  83. routing_key=type.replace("-", "."))
  84. except Exception, exc:
  85. if not self.buffer_while_offline:
  86. raise
  87. self._outbound_buffer.append((type, fields, exc))
  88. def flush(self):
  89. while self._outbound_buffer:
  90. try:
  91. type, fields, _ = self._outbound_buffer.popleft()
  92. except IndexError:
  93. return
  94. self.send(type, **fields)
  95. def copy_buffer(self, other):
  96. self._outbound_buffer = other._outbound_buffer
  97. def close(self):
  98. """Close the event dispatcher."""
  99. self.mutex.locked() and self.mutex.release()
  100. if self.publisher is not None:
  101. if not self.channel: # close auto channel.
  102. self.publisher.channel.close()
  103. self.publisher = None
  104. class EventReceiver(object):
  105. """Capture events.
  106. :param connection: Connection to the broker.
  107. :keyword handlers: Event handlers.
  108. :attr:`handlers` is a dict of event types and their handlers,
  109. the special handler `"*"` captures all events that doesn't have a
  110. handler.
  111. """
  112. handlers = {}
  113. def __init__(self, connection, handlers=None, routing_key="#",
  114. node_id=None, app=None):
  115. self.app = app_or_default(app)
  116. self.connection = connection
  117. if handlers is not None:
  118. self.handlers = handlers
  119. self.routing_key = routing_key
  120. self.node_id = node_id or uuid()
  121. self.queue = Queue("%s.%s" % ("celeryev", self.node_id),
  122. exchange=event_exchange,
  123. routing_key=self.routing_key,
  124. auto_delete=True,
  125. durable=False)
  126. def process(self, type, event):
  127. """Process the received event by dispatching it to the appropriate
  128. handler."""
  129. handler = self.handlers.get(type) or self.handlers.get("*")
  130. handler and handler(event)
  131. @contextmanager
  132. def consumer(self):
  133. """Create event consumer.
  134. .. warning::
  135. This creates a new channel that needs to be closed
  136. by calling `consumer.channel.close()`.
  137. """
  138. consumer = Consumer(self.connection.channel(),
  139. queues=[self.queue], no_ack=True)
  140. consumer.register_callback(self._receive)
  141. with consumer:
  142. yield consumer
  143. consumer.channel.close()
  144. def itercapture(self, limit=None, timeout=None, wakeup=True):
  145. with self.consumer() as consumer:
  146. if wakeup:
  147. self.wakeup_workers(channel=consumer.channel)
  148. yield consumer
  149. self.drain_events(limit=limit, timeout=timeout)
  150. def capture(self, limit=None, timeout=None, wakeup=True):
  151. """Open up a consumer capturing events.
  152. This has to run in the main process, and it will never
  153. stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
  154. """
  155. list(self.itercapture(limit=limit, timeout=timeout, wakeup=wakeup))
  156. def wakeup_workers(self, channel=None):
  157. self.app.control.broadcast("heartbeat",
  158. connection=self.connection,
  159. channel=channel)
  160. def drain_events(self, limit=None, timeout=None):
  161. for iteration in count(0):
  162. if limit and iteration >= limit:
  163. break
  164. try:
  165. self.connection.drain_events(timeout=timeout)
  166. except socket.timeout:
  167. if timeout:
  168. raise
  169. except socket.error:
  170. pass
  171. def _receive(self, body, message):
  172. type = body.pop("type").lower()
  173. clock = body.get("clock")
  174. if clock:
  175. self.app.clock.adjust(clock)
  176. self.process(type, Event(type, body))
  177. class Events(object):
  178. def __init__(self, app=None):
  179. self.app = app
  180. def Receiver(self, connection, handlers=None, routing_key="#",
  181. node_id=None):
  182. return EventReceiver(connection,
  183. handlers=handlers,
  184. routing_key=routing_key,
  185. node_id=node_id,
  186. app=self.app)
  187. def Dispatcher(self, connection=None, hostname=None, enabled=True,
  188. channel=None, buffer_while_offline=True):
  189. return EventDispatcher(connection,
  190. hostname=hostname,
  191. enabled=enabled,
  192. channel=channel,
  193. app=self.app)
  194. def State(self):
  195. from .state import State as _State
  196. return _State()
  197. @contextmanager
  198. def default_dispatcher(self, hostname=None, enabled=True,
  199. buffer_while_offline=False):
  200. with self.app.amqp.publisher_pool.acquire(block=True) as pub:
  201. with self.Dispatcher(pub.connection, hostname, enabled,
  202. pub.channel, buffer_while_offline) as d:
  203. yield d