|
@@ -1,7 +1,12 @@
|
|
|
+import time
|
|
|
+import heapq
|
|
|
+
|
|
|
from carrot.utils import partition
|
|
|
|
|
|
from celery import states
|
|
|
|
|
|
+HEARTBEAT_EXPIRE = 150 # 2 minutes, 30 seconds
|
|
|
+
|
|
|
|
|
|
class Thing(object):
|
|
|
visited = False
|
|
@@ -16,21 +21,27 @@ class Thing(object):
|
|
|
|
|
|
|
|
|
class Worker(Thing):
|
|
|
- alive = False
|
|
|
|
|
|
def __init__(self, **fields):
|
|
|
super(Worker, self).__init__(**fields)
|
|
|
self.heartbeats = []
|
|
|
|
|
|
- def online(self, **kwargs):
|
|
|
- self.alive = True
|
|
|
+ def online(self, timestamp=None, **kwargs):
|
|
|
+ self._heartpush(timestamp)
|
|
|
|
|
|
def offline(self, **kwargs):
|
|
|
- self.alive = False
|
|
|
+ self.heartbeats = []
|
|
|
|
|
|
def heartbeat(self, timestamp=None, **kwargs):
|
|
|
- self.heartbeats.append(timestamp)
|
|
|
- self.alive = True
|
|
|
+ self._heartpush(timestamp)
|
|
|
+
|
|
|
+ def _heartpush(self, timestamp):
|
|
|
+ heapq.heappush(self.heartbeats, timestamp)
|
|
|
+
|
|
|
+ @property
|
|
|
+ def alive(self):
|
|
|
+ return (self.heartbeats and
|
|
|
+ time.time() < self.heartbeats[0] + HEARTBEAT_EXPIRE)
|
|
|
|
|
|
|
|
|
class Task(Thing):
|