Browse Source

Persist clock value and synchronize revoked tasks with other workers. Closes #818

Ask Solem 12 years ago
parent
commit
600830314e

+ 0 - 1
Changelog

@@ -55,7 +55,6 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 - An optimization was too eager and caused some logging messages to never emit.
 
 - :mod:`celery.contrib.batches` now works again.
->>>>>>> 3.0
 
 .. _version-3.0.11:
 

+ 1 - 2
celery/app/amqp.py

@@ -215,8 +215,7 @@ class TaskProducer(Producer):
                 'callbacks': callbacks,
                 'errbacks': errbacks,
                 'reply_to': reply_to,
-                'timeouts': timeouts,
-                'clock': self.app.clock.forward()}
+                'timeouts': timeouts}
         group_id = group_id or taskset_id
         if group_id:
             body['taskset'] = group_id

+ 4 - 1
celery/app/control.py

@@ -86,6 +86,9 @@ class Inspect(object):
     def conf(self):
         return self._request('dump_conf')
 
+    def hello(self):
+        return self._request('hello')
+
 
 class Control(object):
     Mailbox = Mailbox
@@ -93,7 +96,7 @@ class Control(object):
     def __init__(self, app=None):
         self.app = app_or_default(app)
         self.mailbox = self.Mailbox('celery',
-                type='fanout', )#clock=self.app.clock)
+                type='fanout')#, clock=self.app.clock)
 
     @cached_property
     def inspect(self):

+ 5 - 3
celery/datastructures.py

@@ -517,10 +517,10 @@ class LimitedSet(object):
     """
     __slots__ = ('maxlen', 'expires', '_data', '__len__')
 
-    def __init__(self, maxlen=None, expires=None):
+    def __init__(self, maxlen=None, expires=None, data=None):
         self.maxlen = maxlen
         self.expires = expires
-        self._data = {}
+        self._data = {} if data is None else data
         self.__len__ = self._data.__len__
 
     def add(self, value):
@@ -552,7 +552,9 @@ class LimitedSet(object):
         return value in self._data
 
     def update(self, other):
-        if isinstance(other, self.__class__):
+        if isinstance(other, dict):
+            self._data.update(other)
+        elif isinstance(other, self.__class__):
             self._data.update(other._data)
         else:
             for obj in other:

+ 8 - 4
celery/events/__init__.py

@@ -139,10 +139,13 @@ class EventDispatcher(object):
             for callback in self.on_disabled:
                 callback()
 
-    def send(self, type, utcoffset=utcoffset, Event=Event, **fields):
+    def send(self, type, utcoffset=utcoffset, blind=False,
+            Event=Event, **fields):
         """Send event.
 
         :param type: Kind of event.
+        :keyword utcoffset: Function returning the current utcoffset in hours.
+        :keyword blind: If true, do not advance or send the clock value.
         :keyword \*\*fields: Event arguments.
 
         """
@@ -151,8 +154,7 @@ class EventDispatcher(object):
             if groups and group_from(type) not in groups:
                 return
 
-            clock = self.clock.forward()
-
+            clock = None if blind else self.clock.forward()
 
             with self.mutex:
                 event = Event(type, hostname=self.hostname,
@@ -249,7 +251,9 @@ class EventReceiver(ConsumerMixin):
 
     def event_from_message(self, body, localize=True, now=time.time):
         type = body.get('type', '').lower()
-        self.adjust_clock(body.get('clock') or 0)
+        clock = body.get('clock')
+        if clock:
+            self.adjust_clock(clock)
 
         if localize:
             try:

+ 5 - 1
celery/events/state.py

@@ -364,7 +364,7 @@ class State(object):
 
         taskheap = self._taskheap
         timestamp = fields['timestamp']
-        clock = fields.get('clock')
+        clock = 0 if type == 'sent' else fields.get('clock')
         heappush(taskheap, _lamportinfo(clock, timestamp, worker.id, task))
         curcount = len(self.tasks)
         if len(taskheap) > self.max_tasks_in_memory * 2:
@@ -400,6 +400,10 @@ class State(object):
     def tasks_by_time(self, limit=None):
         """Generator giving tasks ordered by time,
         in ``(uuid, Task)`` tuples."""
+
+        print('TASKHEAP')
+        from pprint import pprint
+        pprint(self._taskheap)
         seen = set()
         for evtup in islice(reversed(self._taskheap), 0, limit):
             uuid = evtup[3].uuid

+ 1 - 1
celery/worker/components.py

@@ -233,7 +233,7 @@ class StateDB(bootsteps.Step):
         w._persistence = None
 
     def create(self, w):
-        w._persistence = w.state.Persistent(w.state_db)
+        w._persistence = w.state.Persistent(w.state_db, w.app.clock)
         atexit.register(w._persistence.save)
 
 

+ 28 - 9
celery/worker/consumer.py

@@ -34,7 +34,7 @@ from celery.utils.text import truncate
 from celery.utils.timeutils import humanize_seconds, timezone
 
 from . import heartbeat, loops, pidbox
-from .state import task_reserved, maybe_shutdown
+from .state import task_reserved, maybe_shutdown, revoked
 
 CLOSE = bootsteps.CLOSE
 logger = get_logger(__name__)
@@ -120,13 +120,14 @@ class Consumer(object):
         name = 'Consumer'
         default_steps = [
             'celery.worker.consumer:Connection',
+            'celery.worker.consumer:Mingle',
             'celery.worker.consumer:Events',
+            'celery.worker.consumer:Gossip',
             'celery.worker.consumer:Heart',
             'celery.worker.consumer:Control',
             'celery.worker.consumer:Tasks',
             'celery.worker.consumer:Evloop',
             'celery.worker.consumer:Agent',
-            'celery.worker.consumer:Gossip',
         ]
 
         def shutdown(self, parent):
@@ -478,8 +479,26 @@ class Agent(bootsteps.StartStopStep):
         return agent
 
 
+class Mingle(bootsteps.StartStopStep):
+    label = 'Mingle'
+    requires = (Connection, )
+
+    def start(self, c):
+        info('mingle: searching for neighbors')
+        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
+        replies = I.hello()
+        if replies:
+            for reply in replies.itervalues():
+                c.app.clock.adjust(reply['clock'])
+                revoked.update(reply['revoked'])
+            info('mingle: synced with %s', ', '.join(replies))
+        else:
+            info('mingle: no one here')
+
+
+
 class Gossip(bootsteps.ConsumerStep):
-    label = 'gossip'
+    label = 'Gossip'
     requires = (Events, )
     _cons_stamp_fields = itemgetter(
         'clock', 'hostname', 'pid', 'topic', 'action',
@@ -545,26 +564,26 @@ class Gossip(bootsteps.ConsumerStep):
                 self.consensus_requests[id],
             )
             if leader == self.hostname:
-                print('I won the election %r' % (id, ))
+                info('I won the election %r', id)
                 try:
                     handler = self.election_handlers[topic]
                 except KeyError:
-                    error('Unknown election topic %r' % (topic, ), exc_info=1)
+                    error('Unknown election topic %r', topic, exc_info=1)
                 else:
                     handler(action)
             else:
-                print('Node %s elected for %r' % (leader, id))
+                info('node %s elected for %r', leader, id)
             self.consensus_requests.pop(id, None)
             self.consensus_replies.pop(id, None)
 
     def on_node_join(self, worker):
-        info('{0.hostname} joined the party'.format(worker))
+        info('%s joined the party', worker.hostname)
 
     def on_node_leave(self, worker):
-        info('{0.hostname} left'.format(worker))
+        info('%s left', worker.hostname)
 
     def on_node_lost(self, worker):
-        warn('{0.hostname} went missing!')
+        warn('%s went missing!', worker.hostname)
 
     def register_timer(self):
         if self._tref is not None:

+ 5 - 0
celery/worker/control.py

@@ -202,6 +202,11 @@ def dump_revoked(panel, **kwargs):
     return list(state.revoked)
 
 
+@Panel.register
+def hello(panel, **kwargs):
+    return {'revoked': state.revoked._data, 'clock': panel.app.clock.forward()}
+
+
 @Panel.register
 def dump_tasks(panel, taskinfoitems=None, **kwargs):
     tasks = panel.app.tasks

+ 1 - 1
celery/worker/heartbeat.py

@@ -27,7 +27,7 @@ class Heart(object):
     def __init__(self, timer, eventer, interval=None):
         self.timer = timer
         self.eventer = eventer
-        self.interval = float(interval or 5.0)
+        self.interval = float(interval or 2.0)
         self.tref = None
 
         # Make event dispatcher start/stop us when enabled/disabled.

+ 0 - 6
celery/worker/loops.py

@@ -43,7 +43,6 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
         drain_nowait = connection.drain_nowait
         on_task_callbacks = hub.on_task
         keep_draining = connection.transport.nb_keep_draining
-        adjust_clock = clock.adjust
 
         if heartbeat and connection.supports_heartbeats:
             hub.timer.apply_interval(
@@ -57,8 +56,6 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
             except (KeyError, TypeError):
                 return handle_unknown_message(body, message)
 
-            adjust_clock(body.get('clock') or 0)
-
             try:
                 strategies[name](message, body, message.ack_log_error)
             except KeyError as exc:
@@ -128,7 +125,6 @@ def synloop(obj, connection, consumer, strategies, ns, hub, qos,
         heartbeat, handle_unknown_message, handle_unknown_task,
         handle_invalid_task, clock, **kwargs):
     """Fallback blocking eventloop for transports that doesn't support AIO."""
-    adjust_clock = clock.adjust
 
     def on_task_received(body, message):
         try:
@@ -136,8 +132,6 @@ def synloop(obj, connection, consumer, strategies, ns, hub, qos,
         except (KeyError, TypeError):
             return handle_unknown_message(body, message)
 
-        adjust_clock(body.get('clock') or 0)
-
         try:
             strategies[name](message, body, message.ack_log_error)
         except KeyError as exc:

+ 6 - 1
celery/worker/state.py

@@ -141,8 +141,9 @@ class Persistent(object):
     storage = shelve
     _is_open = False
 
-    def __init__(self, filename):
+    def __init__(self, filename, clock=None):
         self.filename = filename
+        self.clock = clock
         self._load()
 
     def save(self):
@@ -152,12 +153,16 @@ class Persistent(object):
 
     def merge(self, d):
         revoked.update(d.get('revoked') or {})
+        if self.clock:
+            d['clock'] = self.clock.adjust(d.get('clock') or 0)
         return d
 
     def sync(self, d):
         prev = d.get('revoked') or {}
         prev.update(revoked.as_dict())
         d['revoked'] = prev
+        if self.clock:
+            d['clock'] = self.clock.forward()
         return d
 
     def open(self):