Parcourir la source

Last commit broke events.State pickleability

Ask Solem il y a 11 ans
Parent
commit
e827e73f93
1 fichiers modifiés avec 21 ajouts et 16 suppressions
  1. 21 16
      celery/events/state.py

+ 21 - 16
celery/events/state.py

@@ -50,6 +50,11 @@ Substantial drift from %s may mean clocks are out of sync.  Current drift is
 logger = get_logger(__name__)
 warn = logger.warning
 
+R_STATE = '<State: events={0.event_count} tasks={0.task_count}>'
+R_CLOCK = '_lamport(clock={0}, timestamp={1}, id={2} {3!r})'
+R_WORKER = '<Worker: {0.hostname} ({0.status_string})'
+R_TASK = '<Task: {0.name}({0.uuid}) {0.state}>'
+
 
 def heartbeat_expires(timestamp, freq=60,
                       expire_window=HEARTBEAT_EXPIRE_WINDOW):
@@ -63,7 +68,7 @@ class _lamportinfo(tuple):
         return tuple.__new__(cls, (clock, timestamp, id, obj))
 
     def __repr__(self):
-        return '_lamport(clock={0}, timestamp={1}, id={2} {3!r})'.format(*self)
+        return R_CLOCK.format(*self)
 
     def __getnewargs__(self):
         return tuple(self)
@@ -133,7 +138,7 @@ class Worker(Element):
                 heartbeats[:] = heartbeats[hbmax:]
 
     def __repr__(self):
-        return '<Worker: {0.hostname} ({0.status_string})'.format(self)
+        return R_WORKER.format(self)
 
     @property
     def status_string(self):
@@ -263,7 +268,7 @@ class Task(Element):
         return dict(_keys())
 
     def __repr__(self):
-        return '<Task: {0.name}({0.uuid}) {0.state}>'.format(self)
+        return R_TASK.format(self)
 
     @property
     def ready(self):
@@ -276,13 +281,16 @@ class State(object):
     task_count = 0
 
     def __init__(self, callback=None,
+                 workers=None, tasks=None, taskheap=None,
                  max_workers_in_memory=5000, max_tasks_in_memory=10000):
+        self.event_callback = callback
+        self.workers = (LRUCache(max_workers_in_memory)
+                        if workers is None else workers)
+        self.tasks = (LRUCache(max_tasks_in_memory)
+                      if tasks is None else tasks)
+        self._taskheap = [] if taskheap is None else taskheap
         self.max_workers_in_memory = max_workers_in_memory
         self.max_tasks_in_memory = max_tasks_in_memory
-        self.workers = LRUCache(limit=self.max_workers_in_memory)
-        self.tasks = LRUCache(limit=self.max_tasks_in_memory)
-        self._taskheap = []
-        self.event_callback = callback
         self._mutex = threading.Lock()
         self.handlers = {'task': self.task_event,
                          'worker': self.worker_event}
@@ -445,15 +453,12 @@ class State(object):
         return [w for w in values(self.workers) if w.alive]
 
     def __repr__(self):
-        return '<State: events={0.event_count} tasks={0.task_count}>' \
-            .format(self)
+        return R_STATE.format(self)
 
-    def __getstate__(self):
-        d = dict(vars(self))
-        d.pop('_mutex')
-        return d
+    def __reduce__(self):
+        return self.__class__, (
+            self.event_callback, self.workers, self.tasks, self._taskheap,
+            self.max_workers_in_memory, self.max_tasks_in_memory,
+        )
 
-    def __setstate__(self, state):
-        self.__dict__ = state
-        self._mutex = threading.Lock()
 state = State()