|
@@ -37,6 +37,8 @@ from celery.five import items, python_2_unicode_compatible, values
|
|
|
from celery.utils.functional import LRUCache, memoize
|
|
|
from celery.utils.log import get_logger
|
|
|
|
|
|
+__all__ = ['Worker', 'Task', 'State', 'heartbeat_expires']
|
|
|
+
|
|
|
PYPY = hasattr(sys, 'pypy_version_info')
|
|
|
|
|
|
# The window (in percentage) is added to the workers heartbeat
|
|
@@ -60,7 +62,17 @@ R_STATE = '<State: events={0.event_count} tasks={0.task_count}>'
|
|
|
R_WORKER = '<Worker: {0.hostname} ({0.status_string} clock:{0.clock})'
|
|
|
R_TASK = '<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>'
|
|
|
|
|
|
-__all__ = ['Worker', 'Task', 'State', 'heartbeat_expires']
|
|
|
+#: Mapping of task event names to task state.
|
|
|
+TASK_EVENT_TO_STATE = {
|
|
|
+ 'sent': states.PENDING,
|
|
|
+ 'received': states.RECEIVED,
|
|
|
+ 'started': states.STARTED,
|
|
|
+ 'failed': states.FAILURE,
|
|
|
+ 'retried': states.RETRY,
|
|
|
+ 'succeeded': states.SUCCESS,
|
|
|
+ 'revoked': states.REVOKED,
|
|
|
+ 'rejected': states.REJECTED,
|
|
|
+}
|
|
|
|
|
|
|
|
|
@memoize(maxsize=1000, keyfun=lambda a, _: a[0])
|
|
@@ -254,30 +266,18 @@ class Task(object):
|
|
|
self.__dict__.update(kwargs)
|
|
|
|
|
|
def event(self, type_, timestamp=None, local_received=None, fields=None,
|
|
|
- precedence=states.precedence, items=items, dict=dict,
|
|
|
- PENDING=states.PENDING, RECEIVED=states.RECEIVED,
|
|
|
- STARTED=states.STARTED, FAILURE=states.FAILURE,
|
|
|
- RETRY=states.RETRY, SUCCESS=states.SUCCESS,
|
|
|
- REVOKED=states.REVOKED, REJECTED=states.REJECTED):
|
|
|
+ precedence=states.precedence, items=items,
|
|
|
+ setattr=setattr, task_event_to_state=TASK_EVENT_TO_STATE.get,
|
|
|
+ RETRY=states.RETRY):
|
|
|
fields = fields or {}
|
|
|
- if type_ == 'sent':
|
|
|
- state, self.sent = PENDING, timestamp
|
|
|
- elif type_ == 'received':
|
|
|
- state, self.received = RECEIVED, timestamp
|
|
|
- elif type_ == 'started':
|
|
|
- state, self.started = STARTED, timestamp
|
|
|
- elif type_ == 'failed':
|
|
|
- state, self.failed = FAILURE, timestamp
|
|
|
- elif type_ == 'retried':
|
|
|
- state, self.retried = RETRY, timestamp
|
|
|
- elif type_ == 'succeeded':
|
|
|
- state, self.succeeded = SUCCESS, timestamp
|
|
|
- elif type_ == 'revoked':
|
|
|
- state, self.revoked = REVOKED, timestamp
|
|
|
- elif type_ == 'rejected':
|
|
|
- state, self.rejected = REJECTED, timestamp
|
|
|
+
|
|
|
+ # using .get is faster than catching KeyError in this case.
|
|
|
+ state = task_event_to_state(type_)
|
|
|
+ if state is not None:
|
|
|
+ # sets e.g. self.succeeded to the timestamp.
|
|
|
+ setattr(self, type_, timestamp)
|
|
|
else:
|
|
|
- state = type_.upper()
|
|
|
+ state = type_.upper() # custom state
|
|
|
|
|
|
# note that precedence here is reversed
|
|
|
# see implementation in celery.states.state.__lt__
|
|
@@ -289,11 +289,11 @@ class Task(object):
|
|
|
fields = {
|
|
|
k: v for k, v in items(fields) if k in keep
|
|
|
}
|
|
|
- self.__dict__.update(fields)
|
|
|
else:
|
|
|
- self.state = state
|
|
|
- self.timestamp = timestamp
|
|
|
- self.__dict__.update(fields)
|
|
|
+ fields.update(state=state, timestamp=timestamp)
|
|
|
+
|
|
|
+ # update current state with info from this event.
|
|
|
+ self.__dict__.update(fields)
|
|
|
|
|
|
def info(self, fields=None, extra=[]):
|
|
|
"""Information about this task suitable for on-screen display."""
|