|
@@ -17,18 +17,16 @@ from celery.utils import gen_unique_id
|
|
event_exchange = Exchange("celeryev", type="topic")
|
|
event_exchange = Exchange("celeryev", type="topic")
|
|
|
|
|
|
|
|
|
|
-def create_event(type, fields):
|
|
|
|
- return dict(fields, type=type,
|
|
|
|
- timestamp=fields.get("timestamp") or time.time())
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-def Event(type, **fields):
|
|
|
|
|
|
+def Event(type, _fields=None, **fields):
|
|
"""Create an event.
|
|
"""Create an event.
|
|
|
|
|
|
- An event is a dictionary, the only required field is the type.
|
|
|
|
|
|
+ An event is a dictionary, the only required field is ``type``.
|
|
|
|
|
|
"""
|
|
"""
|
|
- return create_event(type, fields)
|
|
|
|
|
|
+ event = dict(_fields or {}, type=type, **fields)
|
|
|
|
+ if "timestamp" not in event:
|
|
|
|
+ event["timestamp"] = time.time()
|
|
|
|
+ return event
|
|
|
|
|
|
|
|
|
|
class EventDispatcher(object):
|
|
class EventDispatcher(object):
|
|
@@ -60,28 +58,30 @@ class EventDispatcher(object):
|
|
self.connection = connection
|
|
self.connection = connection
|
|
self.channel = channel
|
|
self.channel = channel
|
|
self.hostname = hostname or socket.gethostname()
|
|
self.hostname = hostname or socket.gethostname()
|
|
- self.enabled = enabled
|
|
|
|
self.buffer_while_offline = buffer_while_offline
|
|
self.buffer_while_offline = buffer_while_offline
|
|
- self._lock = threading.Lock()
|
|
|
|
|
|
+ self.mutex = threading.Lock()
|
|
self.publisher = None
|
|
self.publisher = None
|
|
self._outbound_buffer = deque()
|
|
self._outbound_buffer = deque()
|
|
self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
|
|
self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
|
|
|
|
|
|
|
|
+ self.enabled = enabled
|
|
if self.enabled:
|
|
if self.enabled:
|
|
self.enable()
|
|
self.enable()
|
|
|
|
|
|
|
|
+
|
|
def enable(self):
|
|
def enable(self):
|
|
- self.enabled = True
|
|
|
|
- channel = self.channel or self.connection.channel()
|
|
|
|
- self.publisher = Producer(channel, exchange=event_exchange,
|
|
|
|
|
|
+ self.publisher = Producer(self.channel or self.connection.channel(),
|
|
|
|
+ exchange=event_exchange,
|
|
serializer=self.serializer)
|
|
serializer=self.serializer)
|
|
|
|
+ self.enabled = True
|
|
|
|
|
|
def disable(self):
|
|
def disable(self):
|
|
- self.enabled = False
|
|
|
|
- if self.publisher is not None:
|
|
|
|
- if not self.channel: # close auto channel.
|
|
|
|
- self.publisher.channel.close()
|
|
|
|
- self.publisher = None
|
|
|
|
|
|
+ if self.enabled:
|
|
|
|
+ self.enabled = False
|
|
|
|
+ if self.publisher is not None:
|
|
|
|
+ if not self.channel: # close auto channel.
|
|
|
|
+ self.publisher.channel.close()
|
|
|
|
+ self.publisher = None
|
|
|
|
|
|
def send(self, type, **fields):
|
|
def send(self, type, **fields):
|
|
"""Send event.
|
|
"""Send event.
|
|
@@ -91,7 +91,7 @@ class EventDispatcher(object):
|
|
|
|
|
|
"""
|
|
"""
|
|
if self.enabled:
|
|
if self.enabled:
|
|
- with self._lock:
|
|
|
|
|
|
+ with self.mutex:
|
|
event = Event(type, hostname=self.hostname,
|
|
event = Event(type, hostname=self.hostname,
|
|
clock=self.app.clock.forward(), **fields)
|
|
clock=self.app.clock.forward(), **fields)
|
|
try:
|
|
try:
|
|
@@ -115,7 +115,7 @@ class EventDispatcher(object):
|
|
|
|
|
|
def close(self):
|
|
def close(self):
|
|
"""Close the event dispatcher."""
|
|
"""Close the event dispatcher."""
|
|
- self._lock.locked() and self._lock.release()
|
|
|
|
|
|
+ self.mutex.locked() and self.mutex.release()
|
|
self.publisher and self.publisher.channel.close()
|
|
self.publisher and self.publisher.channel.close()
|
|
|
|
|
|
|
|
|
|
@@ -209,7 +209,7 @@ class EventReceiver(object):
|
|
clock = body.get("clock")
|
|
clock = body.get("clock")
|
|
if clock:
|
|
if clock:
|
|
self.app.clock.adjust(clock)
|
|
self.app.clock.adjust(clock)
|
|
- self.process(type, create_event(type, body))
|
|
|
|
|
|
+ self.process(type, Event(type, body))
|
|
|
|
|
|
|
|
|
|
class Events(object):
|
|
class Events(object):
|