__init__.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events
  4. ~~~~~~~~~~~~~
  5. Events is a stream of messages sent for certain actions occurring
  6. in the worker (and clients if :setting:`CELERY_SEND_TASK_SENT_EVENT`
  7. is enabled), used for monitoring purposes.
  8. """
  9. from __future__ import absolute_import
  10. import os
  11. import time
  12. import socket
  13. import threading
  14. from collections import deque
  15. from contextlib import contextmanager
  16. from copy import copy
  17. from operator import itemgetter
  18. from kombu import Exchange, Queue, Producer
  19. from kombu.mixins import ConsumerMixin
  20. from kombu.utils import cached_property
  21. from celery.app import app_or_default
  22. from celery.utils import uuid
  23. from celery.utils.timeutils import adjust_timestamp, utcoffset
  24. event_exchange = Exchange('celeryev', type='topic')
  25. _TZGETTER = itemgetter('utcoffset', 'timestamp')
  26. def get_exchange(conn):
  27. ex = copy(event_exchange)
  28. if conn.transport.driver_type == 'redis':
  29. # quick hack for Issue #436
  30. ex.type = 'fanout'
  31. return ex
  32. def Event(type, _fields=None, **fields):
  33. """Create an event.
  34. An event is a dictionary, the only required field is ``type``.
  35. A ``timestamp`` field will be set to the current time if not provided.
  36. """
  37. event = dict(_fields or {}, type=type, **fields)
  38. if 'timestamp' not in event:
  39. event['timestamp'] = time.time()
  40. return event
  41. def group_from(type):
  42. """Get the group part of a event type name.
  43. E.g.::
  44. >>> group_from('task-sent')
  45. 'task'
  46. >>> group_from('custom-my-event')
  47. 'custom'
  48. """
  49. return type.split('-', 1)[0]
  50. class EventDispatcher(object):
  51. """Dispatches event messages.
  52. :param connection: Connection to the broker.
  53. :keyword hostname: Hostname to identify ourselves as,
  54. by default uses the hostname returned by :func:`socket.gethostname`.
  55. :keyword groups: List of groups to send events for. :meth:`send` will
  56. ignore send requests to groups not in this list.
  57. If this is :const:`None`, all events will be sent. Example groups
  58. include ``"task"`` and ``"worker"``.
  59. :keyword enabled: Set to :const:`False` to not actually publish any events,
  60. making :meth:`send` a noop operation.
  61. :keyword channel: Can be used instead of `connection` to specify
  62. an exact channel to use when sending events.
  63. :keyword buffer_while_offline: If enabled events will be buffered
  64. while the connection is down. :meth:`flush` must be called
  65. as soon as the connection is re-established.
  66. You need to :meth:`close` this after use.
  67. """
  68. DISABLED_TRANSPORTS = set(['sql'])
  69. # set of callbacks to be called when :meth:`enabled`.
  70. on_enabled = None
  71. # set of callbacks to be called when :meth:`disabled`.
  72. on_disabled = None
  73. def __init__(self, connection=None, hostname=None, enabled=True,
  74. channel=None, buffer_while_offline=True, app=None,
  75. serializer=None, groups=None):
  76. self.app = app_or_default(app or self.app)
  77. self.connection = connection
  78. self.channel = channel
  79. self.hostname = hostname or socket.gethostname()
  80. self.buffer_while_offline = buffer_while_offline
  81. self.mutex = threading.Lock()
  82. self.producer = None
  83. self._outbound_buffer = deque()
  84. self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
  85. self.on_enabled = set()
  86. self.on_disabled = set()
  87. self.groups = set(groups or [])
  88. self.tzoffset = [-time.timezone, -time.altzone]
  89. self.clock = self.app.clock
  90. if not connection and channel:
  91. self.connection = channel.connection.client
  92. self.enabled = enabled
  93. conninfo = self.connection or self.app.connection()
  94. self.exchange = get_exchange(conninfo)
  95. if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
  96. self.enabled = False
  97. if self.enabled:
  98. self.enable()
  99. self.headers = {'hostname': self.hostname}
  100. self.pid = os.getpid()
  101. def __enter__(self):
  102. return self
  103. def __exit__(self, *exc_info):
  104. self.close()
  105. def enable(self):
  106. self.producer = Producer(self.channel or self.connection,
  107. exchange=self.exchange,
  108. serializer=self.serializer)
  109. self.enabled = True
  110. for callback in self.on_enabled:
  111. callback()
  112. def disable(self):
  113. if self.enabled:
  114. self.enabled = False
  115. self.close()
  116. for callback in self.on_disabled:
  117. callback()
  118. def publish(self, type, fields, producer, retry=False,
  119. retry_policy=None, blind=False, utcoffset=utcoffset,
  120. Event=Event):
  121. """Publish event using a custom :class:`~kombu.Producer`
  122. instance.
  123. :param type: Event type name, with group separated by dash (`-`).
  124. :param fields: Dictionary of event fields, must be json serializable.
  125. :param producer: :class:`~kombu.Producer` instance to use,
  126. only the ``publish`` method will be called.
  127. :keyword retry: Retry in the event of connection failure.
  128. :keyword retry_policy: Dict of custom retry policy, see
  129. :meth:`~kombu.Connection.ensure`.
  130. :keyword blind: Don't set logical clock value (also do not forward
  131. the internal logical clock).
  132. :keyword Event: Event type used to create event,
  133. defaults to :func:`Event`.
  134. :keyword utcoffset: Function returning the current utcoffset in hours.
  135. """
  136. with self.mutex:
  137. clock = None if blind else self.clock.forward()
  138. event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
  139. pid=self.pid, clock=clock, **fields)
  140. exchange = self.exchange
  141. producer.publish(
  142. event,
  143. routing_key=type.replace('-', '.'),
  144. exchange=exchange.name,
  145. retry=retry,
  146. retry_policy=retry_policy,
  147. declare=[exchange],
  148. serializer=self.serializer,
  149. headers=self.headers,
  150. )
  151. def send(self, type, blind=False, **fields):
  152. """Send event.
  153. :param type: Event type name, with group separated by dash (`-`).
  154. :keyword retry: Retry in the event of connection failure.
  155. :keyword retry_policy: Dict of custom retry policy, see
  156. :meth:`~kombu.Connection.ensure`.
  157. :keyword blind: Don't set logical clock value (also do not forward
  158. the internal logical clock).
  159. :keyword Event: Event type used to create event,
  160. defaults to :func:`Event`.
  161. :keyword utcoffset: Function returning the current utcoffset in hours.
  162. :keyword \*\*fields: Event fields, must be json serializable.
  163. """
  164. if self.enabled:
  165. groups = self.groups
  166. if groups and group_from(type) not in groups:
  167. return
  168. try:
  169. self.publish(type, fields, self.producer, blind)
  170. except Exception as exc:
  171. if not self.buffer_while_offline:
  172. raise
  173. self._outbound_buffer.append((type, fields, exc))
  174. def flush(self):
  175. """Flushes the outbound buffer."""
  176. while self._outbound_buffer:
  177. try:
  178. type, fields, _ = self._outbound_buffer.popleft()
  179. except IndexError:
  180. return
  181. self.send(type, **fields)
  182. def copy_buffer(self, other):
  183. """Copies the outbound buffer of another instance."""
  184. self._outbound_buffer = other._outbound_buffer
  185. def close(self):
  186. """Close the event dispatcher."""
  187. self.mutex.locked() and self.mutex.release()
  188. self.producer = None
  189. def _get_publisher(self):
  190. return self.producer
  191. def _set_publisher(self, producer):
  192. self.producer = producer
  193. publisher = property(_get_publisher, _set_publisher) # XXX compat
  194. class EventReceiver(ConsumerMixin):
  195. """Capture events.
  196. :param connection: Connection to the broker.
  197. :keyword handlers: Event handlers.
  198. :attr:`handlers` is a dict of event types and their handlers,
  199. the special handler `"*"` captures all events that doesn't have a
  200. handler.
  201. """
  202. def __init__(self, connection, handlers=None, routing_key='#',
  203. node_id=None, app=None, queue_prefix='celeryev'):
  204. self.app = app_or_default(app)
  205. self.connection = connection
  206. self.handlers = {} if handlers is None else handlers
  207. self.routing_key = routing_key
  208. self.node_id = node_id or uuid()
  209. self.queue_prefix = queue_prefix
  210. self.exchange = get_exchange(self.connection or self.app.connection())
  211. self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
  212. exchange=self.exchange,
  213. routing_key=self.routing_key,
  214. auto_delete=True,
  215. durable=False)
  216. self.adjust_clock = self.app.clock.adjust
  217. def process(self, type, event):
  218. """Process the received event by dispatching it to the appropriate
  219. handler."""
  220. handler = self.handlers.get(type) or self.handlers.get('*')
  221. handler and handler(event)
  222. def get_consumers(self, Consumer, channel):
  223. return [Consumer(queues=[self.queue],
  224. callbacks=[self._receive], no_ack=True)]
  225. def on_consume_ready(self, connection, channel, consumers,
  226. wakeup=True, **kwargs):
  227. if wakeup:
  228. self.wakeup_workers(channel=channel)
  229. def itercapture(self, limit=None, timeout=None, wakeup=True):
  230. return self.consume(limit=limit, timeout=timeout, wakeup=wakeup)
  231. def capture(self, limit=None, timeout=None, wakeup=True):
  232. """Open up a consumer capturing events.
  233. This has to run in the main process, and it will never
  234. stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
  235. """
  236. return list(self.consume(limit=limit, timeout=timeout, wakeup=wakeup))
  237. def wakeup_workers(self, channel=None):
  238. self.app.control.broadcast('heartbeat',
  239. connection=self.connection,
  240. channel=channel)
  241. def event_from_message(self, body, localize=True, now=time.time):
  242. type = body.get('type', '').lower()
  243. clock = body.get('clock')
  244. if clock:
  245. self.adjust_clock(clock)
  246. if localize:
  247. try:
  248. offset, timestamp = _TZGETTER(body)
  249. except KeyError:
  250. pass
  251. else:
  252. body['timestamp'] = adjust_timestamp(timestamp, offset)
  253. return type, Event(type, body, local_received=now())
  254. def _receive(self, body, message):
  255. self.process(*self.event_from_message(body))
  256. class Events(object):
  257. def __init__(self, app=None):
  258. self.app = app
  259. @cached_property
  260. def Receiver(self):
  261. return self.app.subclass_with_self(EventReceiver,
  262. reverse='events.Receiver')
  263. @cached_property
  264. def Dispatcher(self):
  265. return self.app.subclass_with_self(EventDispatcher,
  266. reverse='events.Dispatcher')
  267. @cached_property
  268. def State(self):
  269. return self.app.subclass_with_self('celery.events.state:State',
  270. reverse='events.State')
  271. @contextmanager
  272. def default_dispatcher(self, hostname=None, enabled=True,
  273. buffer_while_offline=False):
  274. with self.app.amqp.producer_pool.acquire(block=True) as pub:
  275. with self.Dispatcher(pub.connection, hostname, enabled,
  276. pub.channel, buffer_while_offline) as d:
  277. yield d