|
@@ -210,7 +210,7 @@ class Task(AttributeDict):
|
|
|
def __init__(self, **fields):
|
|
|
dict.__init__(self, self._defaults, **fields)
|
|
|
|
|
|
- def update(self, state, timestamp, fields):
|
|
|
+ def update(self, state, timestamp, fields, _state=states.state):
|
|
|
"""Update state from new event.
|
|
|
|
|
|
:param state: State from event.
|
|
@@ -222,7 +222,7 @@ class Task(AttributeDict):
|
|
|
if self.worker and time_received:
|
|
|
self.worker.update_heartbeat(time_received, timestamp)
|
|
|
if state != states.RETRY and self.state != states.RETRY and \
|
|
|
- states.state(state) < states.state(self.state):
|
|
|
+ _state(state) < _state(self.state):
|
|
|
# this state logically happens-before the current state, so merge.
|
|
|
self.merge(state, timestamp, fields)
|
|
|
else:
|
|
@@ -367,10 +367,10 @@ class State(object):
|
|
|
def get_or_create_task(self, uuid):
|
|
|
"""Get or create task by uuid."""
|
|
|
try:
|
|
|
- return self.tasks[uuid], True
|
|
|
+ return self.tasks[uuid], False
|
|
|
except KeyError:
|
|
|
task = self.tasks[uuid] = Task(uuid=uuid)
|
|
|
- return task, False
|
|
|
+ return task, True
|
|
|
|
|
|
def worker_event(self, type, fields):
|
|
|
"""Process worker event."""
|
|
@@ -480,5 +480,4 @@ class State(object):
|
|
|
self.event_callback, self.workers, self.tasks, self._taskheap,
|
|
|
self.max_workers_in_memory, self.max_tasks_in_memory,
|
|
|
)
|
|
|
-
|
|
|
state = State()
|