|
@@ -59,8 +59,8 @@ class Task(Element):
|
|
|
"result", "eta", "runtime", "expires",
|
|
|
"exception")
|
|
|
|
|
|
- _merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
|
|
|
- "retries", "eta", "expires")}
|
|
|
+ merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
|
|
|
+ "retries", "eta", "expires")}
|
|
|
|
|
|
_defaults = dict(uuid=None,
|
|
|
name=None,
|
|
@@ -97,7 +97,8 @@ class Task(Element):
|
|
|
def update(self, state, timestamp, fields):
|
|
|
if self.worker:
|
|
|
self.worker.on_heartbeat(timestamp=timestamp)
|
|
|
- if states.state(state) < states.state(self.state):
|
|
|
+ if state != states.RETRY and self.state != states.RETRY and \
|
|
|
+ states.state(state) < states.state(self.state):
|
|
|
self.merge(state, timestamp, fields)
|
|
|
else:
|
|
|
self.state = state
|
|
@@ -105,7 +106,7 @@ class Task(Element):
|
|
|
super(Task, self).update(fields)
|
|
|
|
|
|
def merge(self, state, timestamp, fields):
|
|
|
- keep = self._merge_rules.get(state)
|
|
|
+ keep = self.merge_rules.get(state)
|
|
|
if keep is not None:
|
|
|
fields = dict((key, fields[key]) for key in keep)
|
|
|
super(Task, self).update(fields)
|