Browse Source

Skew client clocks by 1

Ask Solem 11 years ago
parent
commit
356baf2f80
2 changed files with 12 additions and 4 deletions
  1. 2 0
      celery/bin/base.py
  2. 10 4
      celery/events/__init__.py

+ 2 - 0
celery/bin/base.py

@@ -66,6 +66,7 @@ in any command that also has a `--detach` option.
 from __future__ import absolute_import, print_function, unicode_literals
 
 import os
+import random
 import re
 import socket
 import sys
@@ -259,6 +260,7 @@ class Command(object):
         pass
 
     def __call__(self, *args, **kwargs):
+        random.seed()  # maybe we were forked.
         self.verify_args(args)
         try:
             ret = self.run(*args, **kwargs)

+ 10 - 4
celery/events/__init__.py

@@ -44,6 +44,8 @@ so timestamps will not work.
 Please uninstall yajl or force anyjson to use a different library.
 """
 
+CLIENT_CLOCK_SKEW = -1
+
 
 def get_exchange(conn):
     ex = copy(event_exchange)
@@ -294,8 +296,9 @@ class EventReceiver(ConsumerMixin):
                            auto_delete=True,
                            durable=False,
                            queue_arguments=self._get_queue_arguments())
-        self.adjust_clock = self.app.clock.adjust
-        self.forward_clock = self.app.clock.forward
+        self.clock = self.app.clock
+        self.adjust_clock = self.clock.adjust
+        self.forward_clock = self.clock.forward
         if accept is None:
             accept = set([self.app.conf.CELERY_EVENT_SERIALIZER, 'json'])
         self.accept = accept
@@ -342,11 +345,14 @@ class EventReceiver(ConsumerMixin):
 
     def event_from_message(self, body, localize=True,
                            now=time.time, tzfields=_TZGETTER,
-                           adjust_timestamp=adjust_timestamp):
+                           adjust_timestamp=adjust_timestamp,
+                           CLIENT_CLOCK_SKEW=CLIENT_CLOCK_SKEW):
         type = body['type']
+        is_client_event = type == 'task-sent'
         if type == 'task-sent':
             # clients never sync so cannot use their clock value
-            body['clock'] = self.forward_clock()
+            _c = body['clock'] = (self.clock.value or 1) + CLIENT_CLOCK_SKEW
+            self.adjust_clock(_c)
         else:
             try:
                 clock = body['clock']