event.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. """Creating events, and event exchange definition."""
  2. from __future__ import absolute_import, unicode_literals
  3. import time
  4. from copy import copy
  5. from kombu import Exchange
#: Public names of this module (what ``import *`` exposes).
__all__ = (
    'Event', 'event_exchange', 'get_exchange', 'group_from',
)

#: Exchange used to send events on: a ``topic`` exchange named ``celeryev``.
#: Note: Use :func:`get_exchange` instead, as the type of
#: exchange will vary depending on the broker connection.
event_exchange = Exchange('celeryev', type='topic')
  13. def Event(type, _fields=None, __dict__=dict, __now__=time.time, **fields):
  14. """Create an event.
  15. Notes:
  16. An event is simply a dictionary: the only required field is ``type``.
  17. A ``timestamp`` field will be set to the current time if not provided.
  18. """
  19. event = __dict__(_fields, **fields) if _fields else fields
  20. if 'timestamp' not in event:
  21. event.update(timestamp=__now__(), type=type)
  22. else:
  23. event['type'] = type
  24. return event
  25. def group_from(type):
  26. """Get the group part of an event type name.
  27. Example:
  28. >>> group_from('task-sent')
  29. 'task'
  30. >>> group_from('custom-my-event')
  31. 'custom'
  32. """
  33. return type.split('-', 1)[0]
  34. def get_exchange(conn):
  35. """Get exchange used for sending events.
  36. Arguments:
  37. conn (kombu.Connection): Connection used for sending/receving events.
  38. Note:
  39. The event type changes if Redis is used as the transport
  40. (from topic -> fanout).
  41. """
  42. ex = copy(event_exchange)
  43. if conn.transport.driver_type == 'redis':
  44. # quick hack for Issue #436
  45. ex.type = 'fanout'
  46. return ex