|
@@ -9,14 +9,14 @@ from celery import states
|
|
|
from celery.datastructures import AttributeDict, LocalCache
|
|
|
from celery.utils import kwdict
|
|
|
|
|
|
-HEARTBEAT_EXPIRE = 150 # 2 minutes, 30 seconds
|
|
|
+#: Hartbeat expiry time in seconds. The worker will be considered offline
|
|
|
+#: if no heartbeat is received within this time.
|
|
|
+#: Default is 2:30 minutes.
|
|
|
+HEARTBEAT_EXPIRE = 150
|
|
|
|
|
|
|
|
|
class Element(AttributeDict):
|
|
|
- """Base class for types."""
|
|
|
-
|
|
|
- def __init__(self, **fields):
|
|
|
- dict.__init__(self, fields)
|
|
|
+ """Base class for worker state elements."""
|
|
|
|
|
|
|
|
|
class Worker(Element):
|
|
@@ -28,12 +28,15 @@ class Worker(Element):
|
|
|
self.heartbeats = []
|
|
|
|
|
|
def on_online(self, timestamp=None, **kwargs):
|
|
|
+ """Callback for the `worker-online` event."""
|
|
|
self._heartpush(timestamp)
|
|
|
|
|
|
def on_offline(self, **kwargs):
|
|
|
+ """Callback for the `worker-offline` event."""
|
|
|
self.heartbeats = []
|
|
|
|
|
|
def on_heartbeat(self, timestamp=None, **kwargs):
|
|
|
+ """Callback for the `worker-heartbeat` event."""
|
|
|
self._heartpush(timestamp)
|
|
|
|
|
|
def _heartpush(self, timestamp):
|
|
@@ -54,51 +57,48 @@ class Worker(Element):
|
|
|
|
|
|
class Task(Element):
|
|
|
"""Task State."""
|
|
|
- _info_fields = ("args", "kwargs", "retries",
|
|
|
- "result", "eta", "runtime", "expires",
|
|
|
- "exception")
|
|
|
|
|
|
+ #: How to merge out of order events.
|
|
|
+ #: Disorder is detected by logical ordering (e.g. task-received must have
|
|
|
+ #: happened before a task-failed event).
|
|
|
+ #:
|
|
|
+ #: A merge rule consists of a state and a list of fields to keep from
|
|
|
+ #: that state. ``(RECEIVED, ("name", "args")``, means the name and args
|
|
|
+ #: fields are always taken from the RECEIVED state, and any values for
|
|
|
+ #: these fields received before or after is simply ignored.
|
|
|
merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
|
|
|
"retries", "eta", "expires")}
|
|
|
|
|
|
- _defaults = dict(uuid=None,
|
|
|
- name=None,
|
|
|
- state=states.PENDING,
|
|
|
- received=False,
|
|
|
- sent=False,
|
|
|
- started=False,
|
|
|
- succeeded=False,
|
|
|
- failed=False,
|
|
|
- retried=False,
|
|
|
- revoked=False,
|
|
|
- args=None,
|
|
|
- kwargs=None,
|
|
|
- eta=None,
|
|
|
- expires=None,
|
|
|
- retries=None,
|
|
|
- worker=None,
|
|
|
- result=None,
|
|
|
- exception=None,
|
|
|
- timestamp=None,
|
|
|
- runtime=None,
|
|
|
+
|
|
|
+ #: meth:`info` displays these fields by default.
|
|
|
+ _info_fields = ("args", "kwargs", "retries", "result",
|
|
|
+ "eta", "runtime", "expires", "exception")
|
|
|
+
|
|
|
+ #: Default values.
|
|
|
+ _defaults = dict(uuid=None, name=None, state=states.PENDING,
|
|
|
+ received=False, sent=False, started=False,
|
|
|
+ succeeded=False, failed=False, retried=False,
|
|
|
+ revoked=False, args=None, kwargs=None, eta=None,
|
|
|
+ expires=None, retries=None, worker=None, result=None,
|
|
|
+ exception=None, timestamp=None, runtime=None,
|
|
|
traceback=None)
|
|
|
|
|
|
def __init__(self, **fields):
|
|
|
super(Task, self).__init__(**dict(self._defaults, **fields))
|
|
|
|
|
|
- def info(self, fields=None, extra=[]):
|
|
|
- if fields is None:
|
|
|
- fields = self._info_fields
|
|
|
- fields = list(fields) + list(extra)
|
|
|
- return dict((key, getattr(self, key, None))
|
|
|
- for key in fields
|
|
|
- if getattr(self, key, None) is not None)
|
|
|
-
|
|
|
def update(self, state, timestamp, fields):
|
|
|
+ """Update state from new event.
|
|
|
+
|
|
|
+ :param state: State from event.
|
|
|
+ :param timestamp: Timestamp from event.
|
|
|
+ :param fields: Event data.
|
|
|
+
|
|
|
+ """
|
|
|
if self.worker:
|
|
|
self.worker.on_heartbeat(timestamp=timestamp)
|
|
|
if state != states.RETRY and self.state != states.RETRY and \
|
|
|
states.state(state) < states.state(self.state):
|
|
|
+ # this state logically happens-before the current state, so merge.
|
|
|
self.merge(state, timestamp, fields)
|
|
|
else:
|
|
|
self.state = state
|
|
@@ -106,39 +106,55 @@ class Task(Element):
|
|
|
super(Task, self).update(fields)
|
|
|
|
|
|
def merge(self, state, timestamp, fields):
|
|
|
+ """Merge with out of order event."""
|
|
|
keep = self.merge_rules.get(state)
|
|
|
if keep is not None:
|
|
|
fields = dict((key, fields.get(key)) for key in keep)
|
|
|
super(Task, self).update(fields)
|
|
|
|
|
|
def on_sent(self, timestamp=None, **fields):
|
|
|
+ """Callback for the ``task-sent`` event."""
|
|
|
self.sent = timestamp
|
|
|
self.update(states.PENDING, timestamp, fields)
|
|
|
|
|
|
def on_received(self, timestamp=None, **fields):
|
|
|
+ """Callback for the ``task-received`` event."""
|
|
|
self.received = timestamp
|
|
|
self.update(states.RECEIVED, timestamp, fields)
|
|
|
|
|
|
def on_started(self, timestamp=None, **fields):
|
|
|
+ """Callback for the ``task-started`` event."""
|
|
|
self.started = timestamp
|
|
|
self.update(states.STARTED, timestamp, fields)
|
|
|
|
|
|
def on_failed(self, timestamp=None, **fields):
|
|
|
+ """Callback for the ``task-failed`` event."""
|
|
|
self.failed = timestamp
|
|
|
self.update(states.FAILURE, timestamp, fields)
|
|
|
|
|
|
def on_retried(self, timestamp=None, **fields):
|
|
|
+ """Callback for the ``task-retried`` event."""
|
|
|
self.retried = timestamp
|
|
|
self.update(states.RETRY, timestamp, fields)
|
|
|
|
|
|
def on_succeeded(self, timestamp=None, **fields):
|
|
|
+ """Callback for the ``task-succeeded`` event."""
|
|
|
self.succeeded = timestamp
|
|
|
self.update(states.SUCCESS, timestamp, fields)
|
|
|
|
|
|
def on_revoked(self, timestamp=None, **fields):
|
|
|
+ """Callback for the ``task-revoked`` event."""
|
|
|
self.revoked = timestamp
|
|
|
self.update(states.REVOKED, timestamp, fields)
|
|
|
|
|
|
+ def info(self, fields=None, extra=[]):
|
|
|
+ """Information about this task suitable for on-screen display."""
|
|
|
+ if fields is None:
|
|
|
+ fields = self._info_fields
|
|
|
+ return dict((key, getattr(self, key, None))
|
|
|
+ for key in list(fields) + list(extra)
|
|
|
+ if getattr(self, key, None) is not None)
|
|
|
+
|
|
|
def __repr__(self):
|
|
|
return "<Task: %s(%s) %s>" % (self.name, self.uuid, self.state)
|
|
|
|