__init__.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. import time
  2. import socket
  3. import threading
  4. from collections import deque
  5. from itertools import count
  6. from kombu.entity import Exchange, Queue
  7. from kombu.messaging import Consumer, Producer
  8. from celery.app import app_or_default
  9. from celery.utils import gen_unique_id
  10. event_exchange = Exchange("celeryev", type="topic")
  11. def create_event(type, fields):
  12. std = {"type": type,
  13. "timestamp": fields.get("timestamp") or time.time()}
  14. return dict(fields, **std)
  15. def Event(type, **fields):
  16. """Create an event.
  17. An event is a dictionary, the only required field is the type.
  18. """
  19. return create_event(type, fields)
  20. class EventDispatcher(object):
  21. """Send events as messages.
  22. :param connection: Connection to the broker.
  23. :keyword hostname: Hostname to identify ourselves as,
  24. by default uses the hostname returned by :func:`socket.gethostname`.
  25. :keyword enabled: Set to :const:`False` to not actually publish any events,
  26. making :meth:`send` a noop operation.
  27. :keyword channel: Can be used instead of `connection` to specify
  28. an exact channel to use when sending events.
  29. :keyword buffer_while_offline: If enabled events will be buffered
  30. while the connection is down. :meth:`flush` must be called
  31. as soon as the connection is re-established.
  32. You need to :meth:`close` this after use.
  33. """
  34. def __init__(self, connection=None, hostname=None, enabled=True,
  35. channel=None, buffer_while_offline=True, app=None):
  36. self.app = app_or_default(app)
  37. self.connection = connection
  38. self.channel = channel
  39. self.hostname = hostname or socket.gethostname()
  40. self.enabled = enabled
  41. self.buffer_while_offline = buffer_while_offline
  42. self._lock = threading.Lock()
  43. self.publisher = None
  44. self._outbound_buffer = deque()
  45. if self.enabled:
  46. self.enable()
  47. def enable(self):
  48. conf = self.app.conf
  49. self.enabled = True
  50. channel = self.channel or self.connection.channel()
  51. self.publisher = Producer(channel,
  52. exchange=event_exchange,
  53. serializer=conf.CELERY_EVENT_SERIALIZER)
  54. def disable(self):
  55. self.enabled = False
  56. if self.publisher is not None:
  57. if not self.channel: # close auto channel.
  58. self.publisher.channel.close()
  59. self.publisher = None
  60. def send(self, type, **fields):
  61. """Send event.
  62. :param type: Kind of event.
  63. :keyword \*\*fields: Event arguments.
  64. """
  65. if not self.enabled:
  66. return
  67. self._lock.acquire()
  68. event = Event(type, hostname=self.hostname, **fields)
  69. try:
  70. try:
  71. self.publisher.publish(event,
  72. routing_key=type.replace("-", "."))
  73. except Exception, exc:
  74. if not self.buffer_while_offline:
  75. raise
  76. self._outbound_buffer.append((type, fields, exc))
  77. finally:
  78. self._lock.release()
  79. def flush(self):
  80. while self._outbound_buffer:
  81. type, fields, _ = self._outbound_buffer.popleft()
  82. self.send(type, **fields)
  83. def close(self):
  84. """Close the event dispatcher."""
  85. self._lock.locked() and self._lock.release()
  86. self.publisher and self.publisher.channel.close()
  87. class EventReceiver(object):
  88. """Capture events.
  89. :param connection: Connection to the broker.
  90. :keyword handlers: Event handlers.
  91. :attr:`handlers` is a dict of event types and their handlers,
  92. the special handler `"*"` captures all events that doesn't have a
  93. handler.
  94. """
  95. handlers = {}
  96. def __init__(self, connection, handlers=None, routing_key="#",
  97. node_id=None, app=None):
  98. self.app = app_or_default(app)
  99. self.connection = connection
  100. if handlers is not None:
  101. self.handlers = handlers
  102. self.routing_key = routing_key
  103. self.node_id = node_id or gen_unique_id()
  104. self.queue = Queue("%s.%s" % ("celeryev", self.node_id),
  105. exchange=event_exchange,
  106. routing_key=self.routing_key,
  107. auto_delete=True,
  108. durable=False)
  109. def process(self, type, event):
  110. """Process the received event by dispatching it to the appropriate
  111. handler."""
  112. handler = self.handlers.get(type) or self.handlers.get("*")
  113. handler and handler(event)
  114. def consumer(self):
  115. """Create event consumer.
  116. .. warning::
  117. This creates a new channel that needs to be closed
  118. by calling `consumer.channel.close()`.
  119. """
  120. consumer = Consumer(self.connection.channel(),
  121. queues=[self.queue],
  122. no_ack=True)
  123. consumer.register_callback(self._receive)
  124. return consumer
  125. def itercapture(self, limit=None, timeout=None, wakeup=True):
  126. consumer = self.consumer()
  127. consumer.consume()
  128. if wakeup:
  129. self.wakeup_workers(channel=consumer.channel)
  130. yield consumer
  131. try:
  132. self.drain_events(limit=limit, timeout=timeout)
  133. finally:
  134. consumer.cancel()
  135. consumer.channel.close()
  136. def capture(self, limit=None, timeout=None, wakeup=True):
  137. """Open up a consumer capturing events.
  138. This has to run in the main process, and it will never
  139. stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
  140. """
  141. list(self.itercapture(limit=limit,
  142. timeout=timeout,
  143. wakeup=wakeup))
  144. def wakeup_workers(self, channel=None):
  145. self.app.control.broadcast("heartbeat",
  146. connection=self.connection,
  147. channel=channel)
  148. def drain_events(self, limit=None, timeout=None):
  149. for iteration in count(0):
  150. if limit and iteration >= limit:
  151. break
  152. try:
  153. self.connection.drain_events(timeout=timeout)
  154. except socket.timeout:
  155. if timeout:
  156. raise
  157. except socket.error:
  158. pass
  159. def _receive(self, message_data, message):
  160. type = message_data.pop("type").lower()
  161. self.process(type, create_event(type, message_data))
  162. class Events(object):
  163. def __init__(self, app=None):
  164. self.app = app
  165. def Receiver(self, connection, handlers=None, routing_key="#",
  166. node_id=None):
  167. return EventReceiver(connection,
  168. handlers=handlers,
  169. routing_key=routing_key,
  170. node_id=node_id,
  171. app=self.app)
  172. def Dispatcher(self, connection=None, hostname=None, enabled=True,
  173. channel=None, buffer_while_offline=True):
  174. return EventDispatcher(connection,
  175. hostname=hostname,
  176. enabled=enabled,
  177. channel=channel,
  178. app=self.app)
  179. def State(self):
  180. from celery.events.state import State as _State
  181. return _State()