- """Event dispatcher sends events."""
- from __future__ import absolute_import, unicode_literals
- import os
- import threading
- import time
- from collections import defaultdict, deque
- from kombu import Producer
- from celery.app import app_or_default
- from celery.five import items
- from celery.utils.nodenames import anon_nodename
- from celery.utils.time import utcoffset
- from .event import Event, get_exchange, group_from
- __all__ = ['EventDispatcher']


class EventDispatcher(object):
    """Dispatches event messages.

    Arguments:
        connection (kombu.Connection): Connection to the broker.

        hostname (str): Hostname to identify ourselves as,
            by default uses the hostname returned by
            :func:`~celery.utils.anon_nodename`.

        groups (Sequence[str]): List of groups to send events for.
            :meth:`send` will ignore send requests to groups not in this
            list.  If this is :const:`None`, all events will be sent.
            Example groups include ``"task"`` and ``"worker"``.

        enabled (bool): Set to :const:`False` to not actually publish any
            events, making :meth:`send` a no-op.

        channel (kombu.Channel): Can be used instead of `connection` to
            specify an exact channel to use when sending events.

        buffer_while_offline (bool): If enabled, events will be buffered
            while the connection is down.  :meth:`flush` must be called
            as soon as the connection is re-established (see the example
            below).

    Note:
        You need to :meth:`close` this after use.
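
    Example:
        Minimal usage sketch (illustrative only, not from the original
        docstring; assumes ``app`` is a configured :class:`~celery.Celery`
        instance and the broker is reachable)::

            with app.connection_for_write() as connection:
                dispatcher = EventDispatcher(connection, app=app,
                                             groups={'task'})
                try:
                    dispatcher.send('task-custom-state', info='testing')
                finally:
                    # publish anything buffered while the connection
                    # was down, then release the producer.
                    dispatcher.flush()
                    dispatcher.close()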
- """
- DISABLED_TRANSPORTS = {'sql'}
- app = None
- # set of callbacks to be called when :meth:`enabled`.
- on_enabled = None
- # set of callbacks to be called when :meth:`disabled`.
- on_disabled = None

    def __init__(self, connection=None, hostname=None, enabled=True,
                 channel=None, buffer_while_offline=True, app=None,
                 serializer=None, groups=None, delivery_mode=1,
                 buffer_group=None, buffer_limit=24, on_send_buffered=None):
        self.app = app_or_default(app or self.app)
        self.connection = connection
        self.channel = channel
        self.hostname = hostname or anon_nodename()
        self.buffer_while_offline = buffer_while_offline
        self.buffer_group = buffer_group or frozenset()
        self.buffer_limit = buffer_limit
        self.on_send_buffered = on_send_buffered
        self._group_buffer = defaultdict(list)
        self.mutex = threading.Lock()
        self.producer = None
        self._outbound_buffer = deque()
        self.serializer = serializer or self.app.conf.event_serializer
        self.on_enabled = set()
        self.on_disabled = set()
        self.groups = set(groups or [])
        self.tzoffset = [-time.timezone, -time.altzone]
        self.clock = self.app.clock
        self.delivery_mode = delivery_mode
        if not connection and channel:
            self.connection = channel.connection.client
        self.enabled = enabled
        conninfo = self.connection or self.app.connection_for_write()
        self.exchange = get_exchange(conninfo)
        if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
            self.enabled = False
        if self.enabled:
            self.enable()
        self.headers = {'hostname': self.hostname}
        self.pid = os.getpid()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def enable(self):
        self.producer = Producer(self.channel or self.connection,
                                 exchange=self.exchange,
                                 serializer=self.serializer,
                                 auto_declare=False)
        self.enabled = True
        for callback in self.on_enabled:
            callback()

    def disable(self):
        if self.enabled:
            self.enabled = False
            self.close()
            for callback in self.on_disabled:
                callback()

    def publish(self, type, fields, producer,
                blind=False, Event=Event, **kwargs):
        """Publish event using custom :class:`~kombu.Producer`.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            fields: Dictionary of event fields, must be json serializable.
            producer (kombu.Producer): Producer instance to use:
                only the ``publish`` method will be called.
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event.
                Defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current
                utc offset in hours.
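
        Example:
            Sketch of publishing through an explicitly created producer.
            Illustrative only, not from the original docstring:
            ``dispatcher`` is assumed to be an enabled
            :class:`EventDispatcher`, ``connection`` an open
            :class:`kombu.Connection`, and the event type and fields
            are made up::

                producer = Producer(connection,
                                    exchange=dispatcher.exchange,
                                    auto_declare=False)
                dispatcher.publish('worker-custom-event', {'info': 'value'},
                                   producer, retry=True)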
- """
- clock = None if blind else self.clock.forward()
- event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
- pid=self.pid, clock=clock, **fields)
- with self.mutex:
- return self._publish(event, producer,
- routing_key=type.replace('-', '.'), **kwargs)
- def _publish(self, event, producer, routing_key, retry=False,
- retry_policy=None, utcoffset=utcoffset):
- exchange = self.exchange
- try:
- producer.publish(
- event,
- routing_key=routing_key,
- exchange=exchange.name,
- retry=retry,
- retry_policy=retry_policy,
- declare=[exchange],
- serializer=self.serializer,
- headers=self.headers,
- delivery_mode=self.delivery_mode,
- )
- except Exception as exc: # pylint: disable=broad-except
- if not self.buffer_while_offline:
- raise
- self._outbound_buffer.append((event, routing_key, exc))

    def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
             retry_policy=None, Event=Event, **fields):
        """Send event.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event,
                defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current utc offset
                in hours.
            **fields (Any): Event fields -- must be json serializable.
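
        Example:
            Illustrative sketch only, not from the original docstring:
            assumes ``dispatcher`` is an enabled :class:`EventDispatcher`
            whose ``groups`` allow the ``task`` group; the event type and
            field values are made up::

                dispatcher.send('task-custom-progress',
                                uuid='hypothetical-task-id',
                                progress=0.5)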
- """
- if self.enabled:
- groups, group = self.groups, group_from(type)
- if groups and group not in groups:
- return
- if group in self.buffer_group:
- clock = self.clock.forward()
- event = Event(type, hostname=self.hostname,
- utcoffset=utcoffset(),
- pid=self.pid, clock=clock, **fields)
- buf = self._group_buffer[group]
- buf.append(event)
- if len(buf) >= self.buffer_limit:
- self.flush()
- elif self.on_send_buffered:
- self.on_send_buffered()
- else:
- return self.publish(type, fields, self.producer, blind=blind,
- Event=Event, retry=retry,
- retry_policy=retry_policy)

    def flush(self, errors=True, groups=True):
        """Flush the outbound buffer."""
        if errors:
            buf = list(self._outbound_buffer)
            try:
                with self.mutex:
                    for event, routing_key, _ in buf:
                        self._publish(event, self.producer, routing_key)
            finally:
                self._outbound_buffer.clear()
        if groups:
            with self.mutex:
                for group, events in items(self._group_buffer):
                    self._publish(events, self.producer, '%s.multi' % group)
                    events[:] = []  # list.clear

    def extend_buffer(self, other):
        """Copy the outbound buffer of another instance."""
        self._outbound_buffer.extend(other._outbound_buffer)

    def close(self):
        """Close the event dispatcher."""
        self.mutex.locked() and self.mutex.release()
        self.producer = None

    def _get_publisher(self):
        return self.producer

    def _set_publisher(self, producer):
        self.producer = producer
    publisher = property(_get_publisher, _set_publisher)  # XXX compat