|
@@ -215,6 +215,9 @@ class State(object):
|
|
|
self.tasks = LRUCache(limit=max_tasks_in_memory)
|
|
|
self.event_callback = callback
|
|
|
self._mutex = threading.Lock()
|
|
|
+ self.handlers = {'task': self.task_event,
|
|
|
+ 'worker': self.worker_event}
|
|
|
+ self._get_handler = self.handlers.__getitem__
|
|
|
|
|
|
def freeze_while(self, fun, *args, **kwargs):
|
|
|
clear_after = kwargs.pop('clear_after', False)
|
|
@@ -295,11 +298,14 @@ class State(object):
|
|
|
with self._mutex:
|
|
|
return self._dispatch_event(event)
|
|
|
|
|
|
- def _dispatch_event(self, event):
|
|
|
+ def _dispatch_event(self, event, kwdict=kwdict):
|
|
|
self.event_count += 1
|
|
|
event = kwdict(event)
|
|
|
group, _, subject = event['type'].partition('-')
|
|
|
- getattr(self, group + '_event')(subject, event)
|
|
|
+ try:
|
|
|
+ self._get_handler(group)(subject, event)
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
if self.event_callback:
|
|
|
self.event_callback(self, event)
|
|
|
|