|
@@ -22,9 +22,9 @@ import threading
|
|
|
|
|
|
from heapq import heappush, heappop
|
|
from heapq import heappush, heappop
|
|
from itertools import islice
|
|
from itertools import islice
|
|
-from operator import itemgetter
|
|
|
|
from time import time
|
|
from time import time
|
|
|
|
|
|
|
|
+from kombu.clocks import timetuple
|
|
from kombu.utils import kwdict
|
|
from kombu.utils import kwdict
|
|
|
|
|
|
from celery import states
|
|
from celery import states
|
|
@@ -51,50 +51,17 @@ logger = get_logger(__name__)
|
|
warn = logger.warning
|
|
warn = logger.warning
|
|
|
|
|
|
R_STATE = '<State: events={0.event_count} tasks={0.task_count}>'
|
|
R_STATE = '<State: events={0.event_count} tasks={0.task_count}>'
|
|
-R_CLOCK = '_lamport(clock={0}, timestamp={1}, id={2} {3!r})'
|
|
|
|
R_WORKER = '<Worker: {0.hostname} ({0.status_string})'
|
|
R_WORKER = '<Worker: {0.hostname} ({0.status_string})'
|
|
R_TASK = '<Task: {0.name}({0.uuid}) {0.state}>'
|
|
R_TASK = '<Task: {0.name}({0.uuid}) {0.state}>'
|
|
|
|
|
|
|
|
+__all__ = ['Worker', 'Task', 'State', 'heartbeat_expires']
|
|
|
|
+
|
|
|
|
|
|
def heartbeat_expires(timestamp, freq=60,
|
|
def heartbeat_expires(timestamp, freq=60,
|
|
expire_window=HEARTBEAT_EXPIRE_WINDOW):
|
|
expire_window=HEARTBEAT_EXPIRE_WINDOW):
|
|
return timestamp + freq * (expire_window / 1e2)
|
|
return timestamp + freq * (expire_window / 1e2)
|
|
|
|
|
|
|
|
|
|
-class _lamportinfo(tuple):
|
|
|
|
- __slots__ = ()
|
|
|
|
-
|
|
|
|
- def __new__(cls, clock, timestamp, id, obj):
|
|
|
|
- return tuple.__new__(cls, (clock, timestamp, id, obj))
|
|
|
|
-
|
|
|
|
- def __repr__(self):
|
|
|
|
- return R_CLOCK.format(*self)
|
|
|
|
-
|
|
|
|
- def __getnewargs__(self):
|
|
|
|
- return tuple(self)
|
|
|
|
-
|
|
|
|
- def __lt__(self, other):
|
|
|
|
- # 0: clock 1: timestamp 3: process id
|
|
|
|
- try:
|
|
|
|
- A, B = self[0], other[0]
|
|
|
|
- # uses logical clock value first
|
|
|
|
- if A and B: # use logical clock if available
|
|
|
|
- if A == B: # equal clocks use lower process id
|
|
|
|
- return self[2] < other[2]
|
|
|
|
- return A < B
|
|
|
|
- return self[1] < other[1] # ... or use timestamp
|
|
|
|
- except IndexError:
|
|
|
|
- return NotImplemented
|
|
|
|
- __gt__ = lambda self, other: other < self
|
|
|
|
- __le__ = lambda self, other: not other < self
|
|
|
|
- __ge__ = lambda self, other: not self < other
|
|
|
|
-
|
|
|
|
- clock = property(itemgetter(0))
|
|
|
|
- timestamp = property(itemgetter(1))
|
|
|
|
- id = property(itemgetter(2))
|
|
|
|
- obj = property(itemgetter(3))
|
|
|
|
-
|
|
|
|
-
|
|
|
|
def with_unique_field(attr):
|
|
def with_unique_field(attr):
|
|
|
|
|
|
def _decorate_cls(cls):
|
|
def _decorate_cls(cls):
|
|
@@ -385,7 +352,7 @@ class State(object):
|
|
handler(**fields)
|
|
handler(**fields)
|
|
return worker, created
|
|
return worker, created
|
|
|
|
|
|
- def task_event(self, type, fields):
|
|
|
|
|
|
+ def task_event(self, type, fields, timetuple=timetuple):
|
|
"""Process task event."""
|
|
"""Process task event."""
|
|
uuid = fields['uuid']
|
|
uuid = fields['uuid']
|
|
hostname = fields['hostname']
|
|
hostname = fields['hostname']
|
|
@@ -397,7 +364,7 @@ class State(object):
|
|
taskheap = self._taskheap
|
|
taskheap = self._taskheap
|
|
timestamp = fields.get('timestamp') or 0
|
|
timestamp = fields.get('timestamp') or 0
|
|
clock = 0 if type == 'sent' else fields.get('clock')
|
|
clock = 0 if type == 'sent' else fields.get('clock')
|
|
- heappush(taskheap, _lamportinfo(clock, timestamp, worker.id, task))
|
|
|
|
|
|
+ heappush(taskheap, timetuple(clock, timestamp, worker.id, task))
|
|
if len(taskheap) > maxtasks:
|
|
if len(taskheap) > maxtasks:
|
|
heappop(taskheap)
|
|
heappop(taskheap)
|
|
|
|
|