|
@@ -45,6 +45,7 @@ def Event(type, _fields=None, **fields):
|
|
|
"""Create an event.
|
|
|
|
|
|
An event is a dictionary, the only required field is ``type``.
|
|
|
+ A ``timestamp`` field will be set to the current time if not provided.
|
|
|
|
|
|
"""
|
|
|
event = dict(_fields or {}, type=type, **fields)
|
|
@@ -54,11 +55,22 @@ def Event(type, _fields=None, **fields):
|
|
|
|
|
|
|
|
|
def group_from(type):
|
|
|
+ """Get the group part of a event type name.
|
|
|
+
|
|
|
+ E.g.::
|
|
|
+
|
|
|
+ >>> group_from('task-sent')
|
|
|
+ 'task'
|
|
|
+
|
|
|
+ >>> group_from('custom-my-event')
|
|
|
+ 'custom'
|
|
|
+
|
|
|
+ """
|
|
|
return type.split('-', 1)[0]
|
|
|
|
|
|
|
|
|
class EventDispatcher(object):
|
|
|
- """Send events as messages.
|
|
|
+ """Dispatches event messages.
|
|
|
|
|
|
:param connection: Connection to the broker.
|
|
|
|
|
@@ -85,6 +97,12 @@ class EventDispatcher(object):
|
|
|
"""
|
|
|
DISABLED_TRANSPORTS = set(['sql'])
|
|
|
|
|
|
+ # set of callbacks to be called when :meth:`enabled`.
|
|
|
+ on_enabled = None
|
|
|
+
|
|
|
+ # set of callbacks to be called when :meth:`disabled`.
|
|
|
+ on_disabled = None
|
|
|
+
|
|
|
def __init__(self, connection=None, hostname=None, enabled=True,
|
|
|
channel=None, buffer_while_offline=True, app=None,
|
|
|
serializer=None, groups=None):
|
|
@@ -114,7 +132,6 @@ class EventDispatcher(object):
|
|
|
self.headers = {'hostname': self.hostname}
|
|
|
self.pid = os.getpid()
|
|
|
|
|
|
-
|
|
|
def __enter__(self):
|
|
|
return self
|
|
|
|
|
@@ -139,6 +156,24 @@ class EventDispatcher(object):
|
|
|
def publish(self, type, fields, producer, retry=False,
|
|
|
retry_policy=None, blind=False, utcoffset=utcoffset,
|
|
|
Event=Event):
|
|
|
+ """Publish event using a custom :class:`~kombu.Producer`
|
|
|
+ instance.
|
|
|
+
|
|
|
+ :param type: Event type name, with group separated by dash (`-`).
|
|
|
+ :param fields: Dictionary of event fields, must be json serializable.
|
|
|
+ :param producer: :class:`~kombu.Producer` instance to use,
|
|
|
+ only the ``publish`` method will be called.
|
|
|
+ :keyword retry: Retry in the event of connection failure.
|
|
|
+ :keyword retry_policy: Dict of custom retry policy, see
|
|
|
+ :meth:`~kombu.Connection.ensure`.
|
|
|
+ :keyword blind: Don't set logical clock value (also do not forward
|
|
|
+ the internal logical clock).
|
|
|
+ :keyword Event: Event type used to create event,
|
|
|
+ defaults to :func:`Event`.
|
|
|
+ :keyword utcoffset: Function returning the current utcoffset in hours.
|
|
|
+
|
|
|
+ """
|
|
|
+
|
|
|
with self.mutex:
|
|
|
clock = None if blind else self.clock.forward()
|
|
|
event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
|
|
@@ -158,10 +193,16 @@ class EventDispatcher(object):
|
|
|
def send(self, type, blind=False, **fields):
|
|
|
"""Send event.
|
|
|
|
|
|
- :param type: Kind of event.
|
|
|
+ :param type: Event type name, with group separated by dash (`-`).
|
|
|
+ :keyword retry: Retry in the event of connection failure.
|
|
|
+ :keyword retry_policy: Dict of custom retry policy, see
|
|
|
+ :meth:`~kombu.Connection.ensure`.
|
|
|
+ :keyword blind: Don't set logical clock value (also do not forward
|
|
|
+ the internal logical clock).
|
|
|
+ :keyword Event: Event type used to create event,
|
|
|
+ defaults to :func:`Event`.
|
|
|
:keyword utcoffset: Function returning the current utcoffset in hours.
|
|
|
- :keyword blind: Do not send clock value
|
|
|
- :keyword \*\*fields: Event arguments.
|
|
|
+ :keyword \*\*fields: Event fields, must be json serializable.
|
|
|
|
|
|
"""
|
|
|
if self.enabled:
|
|
@@ -176,6 +217,7 @@ class EventDispatcher(object):
|
|
|
self._outbound_buffer.append((type, fields, exc))
|
|
|
|
|
|
def flush(self):
|
|
|
+ """Flushes the outbound buffer."""
|
|
|
while self._outbound_buffer:
|
|
|
try:
|
|
|
type, fields, _ = self._outbound_buffer.popleft()
|
|
@@ -184,6 +226,7 @@ class EventDispatcher(object):
|
|
|
self.send(type, **fields)
|
|
|
|
|
|
def copy_buffer(self, other):
|
|
|
+ """Copies the outbound buffer of another instance."""
|
|
|
self._outbound_buffer = other._outbound_buffer
|
|
|
|
|
|
def close(self):
|