__init__.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events
  4. ~~~~~~~~~~~~~
  5. Events is a stream of messages sent for certain actions occurring
  6. in the worker (and clients if :setting:`CELERY_SEND_TASK_SENT_EVENT`
  7. is enabled), used for monitoring purposes.
  8. """
  9. from __future__ import absolute_import
  10. import os
  11. import time
  12. import socket
  13. import threading
  14. from collections import deque
  15. from contextlib import contextmanager
  16. from copy import copy
  17. from operator import itemgetter
  18. from kombu import Exchange, Queue, Producer
  19. from kombu.mixins import ConsumerMixin
  20. from kombu.utils import cached_property
  21. from celery.app import app_or_default
  22. from celery.utils import uuid
  23. from celery.utils.functional import dictfilter
  24. from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms
  25. event_exchange = Exchange('celeryev', type='topic')
  26. _TZGETTER = itemgetter('utcoffset', 'timestamp')
  27. def get_exchange(conn):
  28. ex = copy(event_exchange)
  29. if conn.transport.driver_type == 'redis':
  30. # quick hack for Issue #436
  31. ex.type = 'fanout'
  32. return ex
  33. def Event(type, _fields=None, __dict__=dict, __now__=time.time, **fields):
  34. """Create an event.
  35. An event is a dictionary, the only required field is ``type``.
  36. A ``timestamp`` field will be set to the current time if not provided.
  37. """
  38. event = __dict__(_fields, **fields) if _fields else fields
  39. if 'timestamp' not in event:
  40. event.update(timestamp=__now__(), type=type)
  41. else:
  42. event['type'] = type
  43. return event
  44. def group_from(type):
  45. """Get the group part of an event type name.
  46. E.g.::
  47. >>> group_from('task-sent')
  48. 'task'
  49. >>> group_from('custom-my-event')
  50. 'custom'
  51. """
  52. return type.split('-', 1)[0]
  53. class EventDispatcher(object):
  54. """Dispatches event messages.
  55. :param connection: Connection to the broker.
  56. :keyword hostname: Hostname to identify ourselves as,
  57. by default uses the hostname returned by :func:`socket.gethostname`.
  58. :keyword groups: List of groups to send events for. :meth:`send` will
  59. ignore send requests to groups not in this list.
  60. If this is :const:`None`, all events will be sent. Example groups
  61. include ``"task"`` and ``"worker"``.
  62. :keyword enabled: Set to :const:`False` to not actually publish any events,
  63. making :meth:`send` a noop operation.
  64. :keyword channel: Can be used instead of `connection` to specify
  65. an exact channel to use when sending events.
  66. :keyword buffer_while_offline: If enabled events will be buffered
  67. while the connection is down. :meth:`flush` must be called
  68. as soon as the connection is re-established.
  69. You need to :meth:`close` this after use.
  70. """
  71. DISABLED_TRANSPORTS = set(['sql'])
  72. # set of callbacks to be called when :meth:`enabled`.
  73. on_enabled = None
  74. # set of callbacks to be called when :meth:`disabled`.
  75. on_disabled = None
  76. def __init__(self, connection=None, hostname=None, enabled=True,
  77. channel=None, buffer_while_offline=True, app=None,
  78. serializer=None, groups=None):
  79. self.app = app_or_default(app or self.app)
  80. self.connection = connection
  81. self.channel = channel
  82. self.hostname = hostname or socket.gethostname()
  83. self.buffer_while_offline = buffer_while_offline
  84. self.mutex = threading.Lock()
  85. self.producer = None
  86. self._outbound_buffer = deque()
  87. self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
  88. self.on_enabled = set()
  89. self.on_disabled = set()
  90. self.groups = set(groups or [])
  91. self.tzoffset = [-time.timezone, -time.altzone]
  92. self.clock = self.app.clock
  93. if not connection and channel:
  94. self.connection = channel.connection.client
  95. self.enabled = enabled
  96. conninfo = self.connection or self.app.connection()
  97. self.exchange = get_exchange(conninfo)
  98. if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
  99. self.enabled = False
  100. if self.enabled:
  101. self.enable()
  102. self.headers = {'hostname': self.hostname}
  103. self.pid = os.getpid()
  104. def __enter__(self):
  105. return self
  106. def __exit__(self, *exc_info):
  107. self.close()
  108. def enable(self):
  109. self.producer = Producer(self.channel or self.connection,
  110. exchange=self.exchange,
  111. serializer=self.serializer)
  112. self.enabled = True
  113. for callback in self.on_enabled:
  114. callback()
  115. def disable(self):
  116. if self.enabled:
  117. self.enabled = False
  118. self.close()
  119. for callback in self.on_disabled:
  120. callback()
  121. def publish(self, type, fields, producer, retry=False,
  122. retry_policy=None, blind=False, utcoffset=utcoffset,
  123. Event=Event):
  124. """Publish event using a custom :class:`~kombu.Producer`
  125. instance.
  126. :param type: Event type name, with group separated by dash (`-`).
  127. :param fields: Dictionary of event fields, must be json serializable.
  128. :param producer: :class:`~kombu.Producer` instance to use,
  129. only the ``publish`` method will be called.
  130. :keyword retry: Retry in the event of connection failure.
  131. :keyword retry_policy: Dict of custom retry policy, see
  132. :meth:`~kombu.Connection.ensure`.
  133. :keyword blind: Don't set logical clock value (also do not forward
  134. the internal logical clock).
  135. :keyword Event: Event type used to create event,
  136. defaults to :func:`Event`.
  137. :keyword utcoffset: Function returning the current utcoffset in hours.
  138. """
  139. with self.mutex:
  140. clock = None if blind else self.clock.forward()
  141. event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
  142. pid=self.pid, clock=clock, **fields)
  143. exchange = self.exchange
  144. producer.publish(
  145. event,
  146. routing_key=type.replace('-', '.'),
  147. exchange=exchange.name,
  148. retry=retry,
  149. retry_policy=retry_policy,
  150. declare=[exchange],
  151. serializer=self.serializer,
  152. headers=self.headers,
  153. )
  154. def send(self, type, blind=False, **fields):
  155. """Send event.
  156. :param type: Event type name, with group separated by dash (`-`).
  157. :keyword retry: Retry in the event of connection failure.
  158. :keyword retry_policy: Dict of custom retry policy, see
  159. :meth:`~kombu.Connection.ensure`.
  160. :keyword blind: Don't set logical clock value (also do not forward
  161. the internal logical clock).
  162. :keyword Event: Event type used to create event,
  163. defaults to :func:`Event`.
  164. :keyword utcoffset: Function returning the current utcoffset in hours.
  165. :keyword \*\*fields: Event fields, must be json serializable.
  166. """
  167. if self.enabled:
  168. groups = self.groups
  169. if groups and group_from(type) not in groups:
  170. return
  171. try:
  172. self.publish(type, fields, self.producer, blind)
  173. except Exception as exc:
  174. if not self.buffer_while_offline:
  175. raise
  176. self._outbound_buffer.append((type, fields, exc))
  177. def flush(self):
  178. """Flushes the outbound buffer."""
  179. while self._outbound_buffer:
  180. try:
  181. type, fields, _ = self._outbound_buffer.popleft()
  182. except IndexError:
  183. return
  184. self.send(type, **fields)
  185. def extend_buffer(self, other):
  186. """Copies the outbound buffer of another instance."""
  187. self._outbound_buffer.extend(other._outbound_buffer)
  188. def close(self):
  189. """Close the event dispatcher."""
  190. self.mutex.locked() and self.mutex.release()
  191. self.producer = None
  192. def _get_publisher(self):
  193. return self.producer
  194. def _set_publisher(self, producer):
  195. self.producer = producer
  196. publisher = property(_get_publisher, _set_publisher) # XXX compat
  197. class EventReceiver(ConsumerMixin):
  198. """Capture events.
  199. :param connection: Connection to the broker.
  200. :keyword handlers: Event handlers.
  201. :attr:`handlers` is a dict of event types and their handlers,
  202. the special handler `"*"` captures all events that doesn't have a
  203. handler.
  204. """
  205. def __init__(self, connection, handlers=None, routing_key='#',
  206. node_id=None, app=None, queue_prefix='celeryev'):
  207. self.app = app_or_default(app)
  208. self.connection = connection
  209. self.handlers = {} if handlers is None else handlers
  210. self.routing_key = routing_key
  211. self.node_id = node_id or uuid()
  212. self.queue_prefix = queue_prefix
  213. self.exchange = get_exchange(self.connection or self.app.connection())
  214. self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
  215. exchange=self.exchange,
  216. routing_key=self.routing_key,
  217. auto_delete=True,
  218. durable=False,
  219. queue_arguments=self._get_queue_arguments())
  220. self.adjust_clock = self.app.clock.adjust
  221. def _get_queue_arguments(self):
  222. conf = self.app.conf
  223. return dictfilter({
  224. 'x-message-ttl': maybe_s_to_ms(conf.CELERY_EVENT_QUEUE_TTL),
  225. 'x-expires': maybe_s_to_ms(conf.CELERY_EVENT_QUEUE_EXPIRES),
  226. })
  227. def process(self, type, event):
  228. """Process the received event by dispatching it to the appropriate
  229. handler."""
  230. handler = self.handlers.get(type) or self.handlers.get('*')
  231. handler and handler(event)
  232. def get_consumers(self, Consumer, channel):
  233. return [Consumer(queues=[self.queue],
  234. callbacks=[self._receive], no_ack=True,
  235. accept=['application/json'])]
  236. def on_consume_ready(self, connection, channel, consumers,
  237. wakeup=True, **kwargs):
  238. if wakeup:
  239. self.wakeup_workers(channel=channel)
  240. def itercapture(self, limit=None, timeout=None, wakeup=True):
  241. return self.consume(limit=limit, timeout=timeout, wakeup=wakeup)
  242. def capture(self, limit=None, timeout=None, wakeup=True):
  243. """Open up a consumer capturing events.
  244. This has to run in the main process, and it will never
  245. stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
  246. """
  247. return list(self.consume(limit=limit, timeout=timeout, wakeup=wakeup))
  248. def wakeup_workers(self, channel=None):
  249. self.app.control.broadcast('heartbeat',
  250. connection=self.connection,
  251. channel=channel)
  252. def event_from_message(self, body, localize=True,
  253. now=time.time, tzfields=_TZGETTER,
  254. adjust_timestamp=adjust_timestamp):
  255. type = body.get('type', '').lower()
  256. clock = body.get('clock')
  257. if clock:
  258. self.adjust_clock(clock)
  259. if localize:
  260. try:
  261. offset, timestamp = tzfields(body)
  262. except KeyError:
  263. pass
  264. else:
  265. body['timestamp'] = adjust_timestamp(timestamp, offset)
  266. return type, Event(type, body, local_received=now())
  267. def _receive(self, body, message):
  268. self.process(*self.event_from_message(body))
  269. class Events(object):
  270. def __init__(self, app=None):
  271. self.app = app
  272. @cached_property
  273. def Receiver(self):
  274. return self.app.subclass_with_self(EventReceiver,
  275. reverse='events.Receiver')
  276. @cached_property
  277. def Dispatcher(self):
  278. return self.app.subclass_with_self(EventDispatcher,
  279. reverse='events.Dispatcher')
  280. @cached_property
  281. def State(self):
  282. return self.app.subclass_with_self('celery.events.state:State',
  283. reverse='events.State')
  284. @contextmanager
  285. def default_dispatcher(self, hostname=None, enabled=True,
  286. buffer_while_offline=False):
  287. with self.app.amqp.producer_pool.acquire(block=True) as prod:
  288. with self.Dispatcher(prod.connection, hostname, enabled,
  289. prod.channel, buffer_while_offline) as d:
  290. yield d