Browse Source

Merge branch '3.0' of github.com:celery/celery into 3.0

Ask Solem 12 years ago
parent
commit
329b86079b
3 changed files with 22 additions and 6 deletions
  1. 2 2
      celery/events/dumper.py
  2. 11 4
      celery/events/state.py
  3. 9 0
      celery/utils/functional.py

+ 2 - 2
celery/events/dumper.py

@@ -3,8 +3,8 @@
     celery.events.dumper
     ~~~~~~~~~~~~~~~~~~~~
 
-    THis is a simple program that dumps events to the console
-    as they happen.  Think of it like a `tcpdump` for Celery events.
+    This is a simple program that dumps events to the console
+    as they happen. Think of it like a `tcpdump` for Celery events.
 
 """
 from __future__ import absolute_import

+ 11 - 4
celery/events/state.py

@@ -214,8 +214,6 @@ class State(object):
         self.workers = LRUCache(limit=max_workers_in_memory)
         self.tasks = LRUCache(limit=max_tasks_in_memory)
         self.event_callback = callback
-        self.group_handlers = {'worker': self.worker_event,
-                               'task': self.task_event}
         self._mutex = threading.Lock()
 
     def freeze_while(self, fun, *args, **kwargs):
@@ -300,8 +298,8 @@ class State(object):
     def _dispatch_event(self, event):
         self.event_count += 1
         event = kwdict(event)
-        group, _, type = event.pop('type').partition('-')
-        self.group_handlers[group](type, event)
+        group, _, subject = event.pop('type').partition('-')
+        getattr(self, group + '_event')(subject, event)
         if self.event_callback:
             self.event_callback(self, event)
 
@@ -358,5 +356,14 @@ class State(object):
         return '<ClusterState: events=%s tasks=%s>' % (self.event_count,
                                                        self.task_count)
 
+    def __getstate__(self):
+        d = dict(vars(self))
+        d.pop('_mutex')
+        return d
+
+    def __setstate__(self, state):
+        self.__dict__ = state
+        self._mutex = threading.Lock()
+
 
 state = State()

+ 9 - 0
celery/utils/functional.py

@@ -89,6 +89,15 @@ class LRUCache(UserDict):
             self[key] = str(newval)
         return newval
 
+    def __getstate__(self):
+        d = dict(vars(self))
+        d.pop('mutex')
+        return d
+
+    def __setstate__(self, state):
+        self.__dict__ = state
+        self.mutex = threading.RLock()
+
 
 def is_list(l):
     """Returns true if object is list-like, but not a dict or string."""