Browse Source

Do not take clock from task-sent events, also do not copy event dict in Receiver.event_from_message

Ask Solem 11 years ago
parent
commit
33081cf625
2 changed files with 18 additions and 8 deletions
  1. 14 5
      celery/events/__init__.py
  2. 4 3
      celery/events/state.py

+ 14 - 5
celery/events/__init__.py

@@ -294,6 +294,7 @@ class EventReceiver(ConsumerMixin):
                            durable=False,
                            queue_arguments=self._get_queue_arguments())
         self.adjust_clock = self.app.clock.adjust
+        self.forward_clock = self.app.clock.forward
 
     def _get_queue_arguments(self):
         conf = self.app.conf
@@ -338,10 +339,17 @@ class EventReceiver(ConsumerMixin):
     def event_from_message(self, body, localize=True,
                            now=time.time, tzfields=_TZGETTER,
                            adjust_timestamp=adjust_timestamp):
-        type = body.get('type', '').lower()
-        clock = body.get('clock')
-        if clock:
-            self.adjust_clock(clock)
+        type = body['type']
+        if type == 'task-sent':
+            # clients never sync so cannot use their clock value
+            body['clock'] = self.forward_clock()
+        else:
+            try:
+                clock = body['clock']
+            except KeyError:
+                body['clock'] = self.forward_clock()
+            else:
+                self.adjust_clock(clock)
 
         if localize:
             try:
@@ -350,7 +358,8 @@ class EventReceiver(ConsumerMixin):
                 pass
             else:
                 body['timestamp'] = adjust_timestamp(timestamp, offset)
-        return type, Event(type, body, local_received=now())
+        body['local_received'] = now()
+        return type, body
 
     def _receive(self, body, message):
         self.process(*self.event_from_message(body))

+ 4 - 3
celery/events/state.py

@@ -462,7 +462,8 @@ class State(object):
         get_handler = self.handlers.__getitem__
         event_callback = self.event_callback
         wfields = itemgetter('hostname', 'timestamp', 'local_received')
-        tfields = itemgetter('uuid', 'hostname', 'timestamp', 'local_received')
+        tfields = itemgetter('uuid', 'hostname', 'timestamp',
+                             'local_received', 'clock')
         taskheap = self._taskheap
         maxtasks = self.max_tasks_in_memory * 2
         add_type = self._seen_types.add
@@ -497,7 +498,8 @@ class State(object):
                     worker.event(subject, timestamp, local_received, event)
                     return created
             elif group == 'task':
-                uuid, hostname, timestamp, local_received = tfields(event)
+                (uuid, hostname, timestamp,
+                 local_received, clock) = tfields(event)
                 # task-sent event is sent by client, not worker
                 is_client_event = subject == 'sent'
                 try:
@@ -514,7 +516,6 @@ class State(object):
                     task.worker = worker
                     if worker is not None and local_received:
                         worker.event(None, local_received, timestamp)
-                clock = 0 if is_client_event else event.get('clock')
                 origin = hostname if is_client_event else worker.id
                 heappush(taskheap,
                          timetuple(clock, timestamp, origin, ref(task)))