|
@@ -22,6 +22,7 @@ import sys
|
|
|
import threading
|
|
|
|
|
|
from datetime import datetime
|
|
|
+from decimal import Decimal
|
|
|
from heapq import heapify, heappush, heappop
|
|
|
from itertools import islice
|
|
|
from operator import itemgetter
|
|
@@ -66,8 +67,14 @@ __all__ = ['Worker', 'Task', 'State', 'heartbeat_expires']
|
|
|
|
|
|
|
|
|
def heartbeat_expires(timestamp, freq=60,
|
|
|
- expire_window=HEARTBEAT_EXPIRE_WINDOW):
|
|
|
- return timestamp + freq * (expire_window / 1e2)
|
|
|
+ expire_window=HEARTBEAT_EXPIRE_WINDOW,
|
|
|
+ Decimal=Decimal, float=float, isinstance=isinstance):
|
|
|
+ # some json implementations returns decimal.Decimal objects,
|
|
|
+ # which are not compatible with float.
|
|
|
+ freq = float(freq) if isinstance(freq, Decimal) else freq
|
|
|
+ if isinstance(timestamp, Decimal):
|
|
|
+ timestamp = float(timestamp)
|
|
|
+ return timestamp + (freq * (expire_window / 1e2))
|
|
|
|
|
|
|
|
|
def _depickle_task(cls, fields):
|