Parcourir la source

Last commit broke events.State pickleability

Ask Solem il y a 11 ans
Parent
commit
00b8211bc0
1 fichiers modifiés avec 13 ajouts et 11 suppressions
  1. 13 11
      celery/events/state.py

+ 13 - 11
celery/events/state.py

@@ -210,10 +210,16 @@ class State(object):
     task_count = 0
 
     def __init__(self, callback=None,
+                 workers=None, tasks=None, taskheap=None,
                  max_workers_in_memory=5000, max_tasks_in_memory=10000):
-        self.workers = LRUCache(limit=max_workers_in_memory)
-        self.tasks = LRUCache(limit=max_tasks_in_memory)
         self.event_callback = callback
+        self.workers = (LRUCache(max_workers_in_memory)
+                        if workers is None else workers)
+        self.tasks = (LRUCache(max_tasks_in_memory)
+                      if tasks is None else tasks)
+        self._taskheap = None   # reserved for __reduce__ in 3.1
+        self.max_workers_in_memory = max_workers_in_memory
+        self.max_tasks_in_memory = max_tasks_in_memory
         self._mutex = threading.Lock()
         self.handlers = {'task': self.task_event,
                          'worker': self.worker_event}
@@ -362,14 +368,10 @@ class State(object):
         return '<ClusterState: events=%s tasks=%s>' % (self.event_count,
                                                        self.task_count)
 
-    def __getstate__(self):
-        d = dict(vars(self))
-        d.pop('_mutex')
-        return d
-
-    def __setstate__(self, state):
-        self.__dict__ = state
-        self._mutex = threading.Lock()
-
+    def __reduce__(self):
+        return self.__class__, (
+            self.event_callback, self.workers, self.tasks, None,
+            self.max_workers_in_memory, self.max_tasks_in_memory,
+        )
 
 state = State()