Browse Source

events.state: State.freeze(buffer=True) + State.thaw(replay=True) + State.freeze_while(fun)

* State.freeze(buffer=True)

Pauses recording of the stream. If buffer is True, then events received
while being frozen will be kept, so it can be replayed later.

* State.thaw(replay=True)

Resumes recording of the stream. If replay is true, then the buffer
will be applied.

* State.freeze_while(fun)

Apply function. Freezes the stream before the function,
 and replays the buffer when the function returns.

This is used for e.g. storing snapshots of the current state.
Ask Solem 14 years ago
parent
commit
f7182a5f9a
1 changed files with 85 additions and 5 deletions
  1. 85 5
      celery/events/state.py

+ 85 - 5
celery/events/state.py

@@ -1,6 +1,9 @@
 import time
 import heapq
 
+from collections import deque
+from threading import RLock
+
 from carrot.utils import partition
 
 from celery import states
@@ -50,7 +53,7 @@ class Worker(Element):
 class Task(Element):
     """Task State."""
     _info_fields = ("args", "kwargs", "retries",
-                    "result", "eta", "runtime",
+                    "result", "eta", "runtime", "expires",
                     "exception")
 
     _defaults = dict(uuid=None,
@@ -65,9 +68,14 @@ class Task(Element):
                      args=None,
                      kwargs=None,
                      eta=None,
+                     expires=None,
                      retries=None,
                      worker=None,
-                     timestamp=None)
+                     result=None,
+                     exception=None,
+                     timestamp=None,
+                     runtime=None,
+                     traceback=None)
 
     def __init__(self, **fields):
         super(Task, self).__init__(**dict(self._defaults, **fields))
@@ -121,9 +129,58 @@ class Task(Element):
 
 
 class State(object):
-    """Represents a snapshot of a clusters state."""
+    """Records clusters state."""
     event_count = 0
     task_count = 0
+    _buffering = False
+    buffer = deque()
+    frozen = False
+
+    def freeze(self, buffer=True):
+        """Stop recording the event stream.
+
+        :keyword buffer: If true, any events received while frozen
+           will be buffered, you can use ``thaw(replay=True)`` to apply
+           this buffer. :meth:`thaw` will clear the buffer and resume
+           recording the stream.
+
+        """
+        self._buffering = buffer
+        self.frozen = True
+
+    def _replay(self):
+        while self.buffer:
+            try:
+                event = self.buffer.popleft()
+            except IndexError:
+                pass
+            self._dispatch_event(event)
+
+    def thaw(self, replay=True):
+        """Resume recording of the event stream.
+
+        :keyword replay: Will replay buffered events received while
+          the stream was frozen.
+
+        This will always clear the buffer, deleting any events collected
+        while the stream was frozen.
+
+        """
+        self._buffering = False
+        try:
+            if replay:
+                self._replay()
+            else:
+                self.buffer.clear()
+        finally:
+            self.frozen = False
+
+    def freeze_while(self, fun, *args, **kwargs):
+        self.freeze()
+        try:
+            return fun(*args, **kwargs)
+        finally:
+            self.thaw(replay=True)
 
     def __init__(self, callback=None,
             max_workers_in_memory=5000, max_tasks_in_memory=10000):
@@ -132,6 +189,16 @@ class State(object):
         self.event_callback = callback
         self.group_handlers = {"worker": self.worker_event,
                                "task": self.task_event}
+        self._resource = RLock()
+
+    def clear(self):
+        try:
+            self.workers.clear()
+            self.tasks.clear()
+            self.event_count = 0
+            self.task_count = 0
+        finally:
+            pass
 
     def get_or_create_worker(self, hostname, **kwargs):
         """Get or create worker by hostname."""
@@ -173,8 +240,7 @@ class State(object):
             handler(**fields)
         task.worker = worker
 
-    def event(self, event):
-        """Process event."""
+    def _dispatch_event(self, event):
         self.event_count += 1
         event = kwdict(event)
         group, _, type = partition(event.pop("type"), "-")
@@ -182,6 +248,16 @@ class State(object):
         if self.event_callback:
             self.event_callback(self, event)
 
+    def event(self, event):
+        """Process event."""
+        try:
+            if not self.frozen:
+                self._dispatch_event(event)
+            elif self._buffering:
+                self.buffer.append(event)
+        finally:
+            pass
+
     def tasks_by_timestamp(self, limit=None):
         """Get tasks by timestamp.
 
@@ -223,5 +299,9 @@ class State(object):
         """Returns a list of (seemingly) alive workers."""
         return [w for w in self.workers.values() if w.alive]
 
+    def __repr__(self):
+        return "<ClusterState: events=%s tasks=%s>" % (self.event_count,
+                                                       self.task_count)
+
 
 state = State()