Browse Source

[events] Dispatcher adds process pid to all events, and Receiver sets Event['local_received'] to time of event received

Ask Solem 12 years ago
parent
commit
d577aa8ec4
1 changed files with 7 additions and 6 deletions
  1. 7 6
      celery/events/__init__.py

+ 7 - 6
celery/events/__init__.py

@@ -10,6 +10,7 @@
 """
 from __future__ import absolute_import
 
+import os
 import time
 import socket
 import threading
@@ -109,6 +110,7 @@ class EventDispatcher(object):
         if self.enabled:
             self.enable()
         self.headers = {'hostname': self.hostname}
+        self.pid = os.getpid()
 
     def __enter__(self):
         return self
@@ -137,7 +139,7 @@ class EventDispatcher(object):
             for callback in self.on_disabled:
                 callback()
 
-    def send(self, type, **fields):
+    def send(self, type, utcoffset=utcoffset, Event=Event, **fields):
         """Send event.
 
         :param type: Kind of event.
@@ -152,7 +154,8 @@ class EventDispatcher(object):
             with self.mutex:
                 event = Event(type, hostname=self.hostname,
                                     clock=self.clock.forward(),
-                                    utcoffset=utcoffset(), **fields)
+                                    utcoffset=utcoffset(),
+                                    pid=self.pid, **fields)
                 try:
                     self.publisher.publish(event,
                                            routing_key=type.replace('-', '.'),
@@ -241,7 +244,7 @@ class EventReceiver(ConsumerMixin):
                                    connection=self.connection,
                                    channel=channel)
 
-    def event_from_message(self, body, localize=True):
+    def event_from_message(self, body, localize=True, now=time.time):
         type = body.get('type', '').lower()
         self.adjust_clock(body.get('clock') or 0)
         if localize:
@@ -251,9 +254,7 @@ class EventReceiver(ConsumerMixin):
                 pass
             else:
                 body['timestamp'] = adjust_timestamp(timestamp, offset)
-        from datetime import datetime
-        print('TS: %s' % (datetime.fromtimestamp(body['timestamp']), ))
-        return type, Event(type, body)
+        return type, Event(type, body, local_received=now())
 
     def _receive(self, body, message):
         self.process(*self.event_from_message(body))