__init__.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
# -*- coding: utf-8 -*-
"""
celery.events
~~~~~~~~~~~~~

Events are messages sent for actions happening
in the worker (and clients if :setting:`CELERY_SEND_TASK_SENT_EVENT`
is enabled), used for monitoring purposes.

:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
  11. from __future__ import absolute_import
  12. from __future__ import with_statement
  13. import time
  14. import socket
  15. import threading
  16. from collections import deque
  17. from contextlib import contextmanager
  18. from copy import copy
  19. from kombu.common import eventloop
  20. from kombu.entity import Exchange, Queue
  21. from kombu.messaging import Consumer, Producer
  22. from kombu.utils import cached_property
  23. from celery.app import app_or_default
  24. from celery.utils import uuid
# Module-level topic exchange named "celeryev".  :func:`get_exchange`
# never hands this object out directly; it returns a copy so the exchange
# type can be adjusted per connection without touching this one.
event_exchange = Exchange("celeryev", type="topic")
  26. def get_exchange(conn):
  27. ex = copy(event_exchange)
  28. if "redis" in type(conn.transport).__module__:
  29. # quick hack for #436
  30. ex.type = "fanout"
  31. return ex
  32. def Event(type, _fields=None, **fields):
  33. """Create an event.
  34. An event is a dictionary, the only required field is ``type``.
  35. """
  36. event = dict(_fields or {}, type=type, **fields)
  37. if "timestamp" not in event:
  38. event["timestamp"] = time.time()
  39. return event
  40. class EventDispatcher(object):
  41. """Send events as messages.
  42. :param connection: Connection to the broker.
  43. :keyword hostname: Hostname to identify ourselves as,
  44. by default uses the hostname returned by :func:`socket.gethostname`.
  45. :keyword enabled: Set to :const:`False` to not actually publish any events,
  46. making :meth:`send` a noop operation.
  47. :keyword channel: Can be used instead of `connection` to specify
  48. an exact channel to use when sending events.
  49. :keyword buffer_while_offline: If enabled events will be buffered
  50. while the connection is down. :meth:`flush` must be called
  51. as soon as the connection is re-established.
  52. You need to :meth:`close` this after use.
  53. """
  54. def __init__(self, connection=None, hostname=None, enabled=True,
  55. channel=None, buffer_while_offline=True, app=None,
  56. serializer=None):
  57. self.app = app_or_default(app)
  58. self.connection = connection
  59. self.channel = channel
  60. self.hostname = hostname or socket.gethostname()
  61. self.buffer_while_offline = buffer_while_offline
  62. self.mutex = threading.Lock()
  63. self.publisher = None
  64. self._outbound_buffer = deque()
  65. self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
  66. self.on_enabled = set()
  67. self.on_disabled = set()
  68. self.enabled = enabled
  69. if not connection and channel:
  70. self.connection = channel.connection.client
  71. if self.enabled:
  72. self.enable()
  73. def __enter__(self):
  74. return self
  75. def __exit__(self, *exc_info):
  76. self.close()
  77. def get_exchange(self):
  78. if self.connection:
  79. return get_exchange(self.connection)
  80. else:
  81. return get_exchange(self.channel.connection.client)
  82. def enable(self):
  83. self.publisher = Producer(self.channel or self.connection,
  84. exchange=self.get_exchange(),
  85. serializer=self.serializer)
  86. self.enabled = True
  87. for callback in self.on_enabled:
  88. callback()
  89. def disable(self):
  90. if self.enabled:
  91. self.enabled = False
  92. self.close()
  93. for callback in self.on_disabled:
  94. callback()
  95. def send(self, type, **fields):
  96. """Send event.
  97. :param type: Kind of event.
  98. :keyword \*\*fields: Event arguments.
  99. """
  100. if self.enabled:
  101. with self.mutex:
  102. event = Event(type, hostname=self.hostname,
  103. clock=self.app.clock.forward(), **fields)
  104. try:
  105. self.publisher.publish(event,
  106. routing_key=type.replace("-", "."))
  107. except Exception, exc:
  108. if not self.buffer_while_offline:
  109. raise
  110. self._outbound_buffer.append((type, fields, exc))
  111. def flush(self):
  112. while self._outbound_buffer:
  113. try:
  114. type, fields, _ = self._outbound_buffer.popleft()
  115. except IndexError:
  116. return
  117. self.send(type, **fields)
  118. def copy_buffer(self, other):
  119. self._outbound_buffer = other._outbound_buffer
  120. def close(self):
  121. """Close the event dispatcher."""
  122. self.mutex.locked() and self.mutex.release()
  123. self.publisher = None
  124. class EventReceiver(object):
  125. """Capture events.
  126. :param connection: Connection to the broker.
  127. :keyword handlers: Event handlers.
  128. :attr:`handlers` is a dict of event types and their handlers,
  129. the special handler `"*"` captures all events that doesn't have a
  130. handler.
  131. """
  132. handlers = {}
  133. def __init__(self, connection, handlers=None, routing_key="#",
  134. node_id=None, app=None, queue_prefix="celeryev"):
  135. self.app = app_or_default(app)
  136. self.connection = connection
  137. if handlers is not None:
  138. self.handlers = handlers
  139. self.routing_key = routing_key
  140. self.node_id = node_id or uuid()
  141. self.queue_prefix = queue_prefix
  142. self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
  143. exchange=self.get_exchange(),
  144. routing_key=self.routing_key,
  145. auto_delete=True,
  146. durable=False)
  147. def get_exchange(self):
  148. return get_exchange(self.connection)
  149. def process(self, type, event):
  150. """Process the received event by dispatching it to the appropriate
  151. handler."""
  152. handler = self.handlers.get(type) or self.handlers.get("*")
  153. handler and handler(event)
  154. @contextmanager
  155. def consumer(self, wakeup=True):
  156. """Create event consumer."""
  157. consumer = Consumer(self.connection,
  158. queues=[self.queue], no_ack=True)
  159. consumer.register_callback(self._receive)
  160. with consumer:
  161. if wakeup:
  162. self.wakeup_workers(channel=consumer.channel)
  163. yield consumer
  164. def itercapture(self, limit=None, timeout=None, wakeup=True):
  165. with self.consumer(wakeup=wakeup) as consumer:
  166. yield consumer
  167. self.drain_events(limit=limit, timeout=timeout)
  168. def capture(self, limit=None, timeout=None, wakeup=True):
  169. """Open up a consumer capturing events.
  170. This has to run in the main process, and it will never
  171. stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
  172. """
  173. list(self.itercapture(limit=limit, timeout=timeout, wakeup=wakeup))
  174. def wakeup_workers(self, channel=None):
  175. self.app.control.broadcast("heartbeat",
  176. connection=self.connection,
  177. channel=channel)
  178. def drain_events(self, **kwargs):
  179. for _ in eventloop(self.connection, **kwargs):
  180. pass
  181. def _receive(self, body, message):
  182. type = body.pop("type").lower()
  183. clock = body.get("clock")
  184. if clock:
  185. self.app.clock.adjust(clock)
  186. self.process(type, Event(type, body))
  187. class Events(object):
  188. def __init__(self, app=None):
  189. self.app = app
  190. @cached_property
  191. def Receiver(self):
  192. return self.app.subclass_with_self(EventReceiver,
  193. reverse="events.Receiver")
  194. @cached_property
  195. def Dispatcher(self):
  196. return self.app.subclass_with_self(EventDispatcher,
  197. reverse="events.Dispatcher")
  198. @cached_property
  199. def State(self):
  200. return self.app.subclass_with_self("celery.events.state:State",
  201. reverse="events.State")
  202. @contextmanager
  203. def default_dispatcher(self, hostname=None, enabled=True,
  204. buffer_while_offline=False):
  205. with self.app.amqp.publisher_pool.acquire(block=True) as pub:
  206. with self.Dispatcher(pub.connection, hostname, enabled,
  207. pub.channel, buffer_while_offline) as d:
  208. yield d