Pārlūkot izejas kodu

Pidbox now uses the app's logical clock, and added a 'inspect clock' command

Ask Solem 12 gadi atpakaļ
vecāks
revīzija
9790e3607e

+ 5 - 1
celery/app/control.py

@@ -55,6 +55,9 @@ class Inspect(object):
     def report(self):
         return self._request('report')
 
+    def clock(self):
+        return self._request('clock')
+
     def active(self, safe=False):
         return self._request('dump_active', safe=safe)
 
@@ -89,7 +92,8 @@ class Control(object):
 
     def __init__(self, app=None):
         self.app = app_or_default(app)
-        self.mailbox = self.Mailbox('celery', type='fanout')
+        self.mailbox = self.Mailbox('celery',
+                type='fanout', clock=self.app.clock)
 
     @cached_property
     def inspect(self):

+ 1 - 0
celery/bin/celery.py

@@ -603,6 +603,7 @@ class inspect(_RemoteControl):
         'revoked': (1.0, 'dump of revoked task ids'),
         'registered': (1.0, 'dump of registered tasks'),
         'ping': (0.2, 'ping worker(s)'),
+        'clock': (1.0, 'get value of logical clock'),
         'report': (1.0, 'get bugreport info')
     }
 

+ 3 - 2
celery/events/__init__.py

@@ -85,6 +85,7 @@ class EventDispatcher(object):
         self.on_enabled = set()
         self.on_disabled = set()
         self.tzoffset = [-time.timezone, -time.altzone]
+        self.clock = self.app.clock
 
         self.enabled = enabled
         if not connection and channel:
@@ -129,7 +130,7 @@ class EventDispatcher(object):
         if self.enabled:
             with self.mutex:
                 event = Event(type, hostname=self.hostname,
-                                    clock=self.app.clock.forward(),
+                                    clock=self.clock.forward(),
                                     tzoffset=self.tzoffset, **fields)
                 try:
                     self.publisher.publish(event,
@@ -221,7 +222,7 @@ class EventReceiver(ConsumerMixin):
         type = body.pop('type').lower()
         clock = body.get('clock')
         if clock:
-            self.app.clock.adjust(clock)
+            self.clock.adjust(clock)
         self.process(type, Event(type, body))
 
 

+ 7 - 1
celery/worker/control.py

@@ -187,7 +187,13 @@ def stats(panel, **kwargs):
             'consumer': panel.consumer.info,
             'pool': panel.consumer.pool.info,
             'autoscaler': asinfo,
-            'pid': os.getpid()}
+            'pid': os.getpid(),
+            'clock': str(panel.app.clock)}
+
+
+@Panel.register
+def clock(panel, **kwargs):
+    return {'clock': panel.app.clock.value}
 
 
 @Panel.register