|
@@ -24,17 +24,18 @@ import threading
|
|
|
from datetime import datetime
|
|
|
from heapq import heappush, heappop
|
|
|
from itertools import islice
|
|
|
+from operator import itemgetter
|
|
|
from time import time
|
|
|
|
|
|
from kombu.clocks import timetuple
|
|
|
-from kombu.utils import kwdict
|
|
|
|
|
|
from celery import states
|
|
|
-from celery.datastructures import AttributeDict
|
|
|
from celery.five import items, values
|
|
|
from celery.utils.functional import LRUCache
|
|
|
from celery.utils.log import get_logger
|
|
|
|
|
|
+PYPY = hasattr(sys, 'pypy_version_info')
|
|
|
+
|
|
|
# The window (in percentage) is added to the workers heartbeat
|
|
|
# frequency. If the time between updates exceeds this window,
|
|
|
# then the worker is considered to be offline.
|
|
@@ -55,8 +56,8 @@ logger = get_logger(__name__)
|
|
|
warn = logger.warning
|
|
|
|
|
|
R_STATE = '<State: events={0.event_count} tasks={0.task_count}>'
|
|
|
-R_WORKER = '<Worker: {0.hostname} ({0.status_string})'
|
|
|
-R_TASK = '<Task: {0.name}({0.uuid}) {0.state}>'
|
|
|
+R_WORKER = '<Worker: {0.hostname} ({0.status_string} clock:{0.clock})'
|
|
|
+R_TASK = '<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>'
|
|
|
|
|
|
__all__ = ['Worker', 'Task', 'State', 'heartbeat_expires']
|
|
|
|
|
@@ -89,45 +90,75 @@ def with_unique_field(attr):
|
|
|
|
|
|
|
|
|
@with_unique_field('hostname')
|
|
|
-class Worker(AttributeDict):
|
|
|
+class Worker(object):
|
|
|
"""Worker State."""
|
|
|
heartbeat_max = 4
|
|
|
expire_window = HEARTBEAT_EXPIRE_WINDOW
|
|
|
pid = None
|
|
|
_defaults = {'hostname': None, 'pid': None, 'freq': 60}
|
|
|
|
|
|
- def __init__(self, **fields):
|
|
|
- dict.__init__(self, self._defaults, **fields)
|
|
|
+ if not PYPY:
|
|
|
+ __slots__ = ('hostname', 'pid', 'freq',
|
|
|
+ 'heartbeats', 'clock', '__dict__')
|
|
|
+
|
|
|
+ def __init__(self, hostname=None, pid=None, freq=60):
|
|
|
+ self.hostname = hostname
|
|
|
+ self.pid = pid
|
|
|
+ self.freq = freq
|
|
|
self.heartbeats = []
|
|
|
+ self.clock = 0
|
|
|
+ self.event = self._create_event_handler()
|
|
|
+
|
|
|
+ def _create_event_handler(self):
|
|
|
+ _set = object.__setattr__
|
|
|
+ heartbeats = self.heartbeats
|
|
|
+ hbmax = self.heartbeat_max
|
|
|
+
|
|
|
+ def event(type_, timestamp=None,
|
|
|
+ local_received=None, fields=None,
|
|
|
+ max_drift=HEARTBEAT_DRIFT_MAX, items=items, abs=abs,
|
|
|
+ heappush=heappush, heappop=heappop, int=int, len=len):
|
|
|
+ fields = fields or {}
|
|
|
+ for k, v in items(fields):
|
|
|
+ _set(self, k, v)
|
|
|
+ if type_ == 'offline':
|
|
|
+ heartbeats[:] = []
|
|
|
+ else:
|
|
|
+ if not local_received or not timestamp:
|
|
|
+ return
|
|
|
+ drift = abs(int(local_received) - int(timestamp))
|
|
|
+ if drift > HEARTBEAT_DRIFT_MAX:
|
|
|
+ warn(DRIFT_WARNING, self.hostname, drift,
|
|
|
+ datetime.fromtimestamp(local_received),
|
|
|
+ datetime.fromtimestamp(timestamp))
|
|
|
+ if not heartbeats or (
|
|
|
+ local_received and local_received > heartbeats[-1]):
|
|
|
+ heappush(heartbeats, local_received)
|
|
|
+ if len(heartbeats) > hbmax:
|
|
|
+ heappop(heartbeats)
|
|
|
+ return event
|
|
|
+
|
|
|
+ def update(self, f, **kw):
|
|
|
+ for k, v in items(dict(f, **kw) if kw else f):
|
|
|
+ setattr(self, k, v)
|
|
|
|
|
|
- def on_online(self, timestamp=None, local_received=None, **kwargs):
|
|
|
- """Callback for the :event:`worker-online` event."""
|
|
|
- self.update(**kwargs)
|
|
|
- self.update_heartbeat(local_received, timestamp)
|
|
|
+ def update_heartbeat(self, received, timestamp):
|
|
|
+ self.event(None, timestamp, received)
|
|
|
|
|
|
- def on_offline(self, **kwargs):
|
|
|
- """Callback for the :event:`worker-offline` event."""
|
|
|
- self.update(**kwargs)
|
|
|
- self.heartbeats = []
|
|
|
+ def on_online(self, timestamp=None, local_received=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.1, use:
|
|
|
+ ``.event('online', timestamp, local_received, fields)``."""
|
|
|
+ self.event('online', timestamp, local_received, fields)
|
|
|
|
|
|
- def on_heartbeat(self, timestamp=None, local_received=None, **kwargs):
|
|
|
- """Callback for the :event:`worker-heartbeat` event."""
|
|
|
- self.update(**kwargs)
|
|
|
- self.update_heartbeat(local_received, timestamp)
|
|
|
+ def on_offline(self, timestamp=None, local_received=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.1, use:
|
|
|
+ ``.event('offline', timestamp, local_received, fields)``."""
|
|
|
+ self.event('offline', timestamp, local_received, fields)
|
|
|
|
|
|
- def update_heartbeat(self, received, timestamp):
|
|
|
- if not received or not timestamp:
|
|
|
- return
|
|
|
- drift = abs(int(received) - int(timestamp))
|
|
|
- if drift > HEARTBEAT_DRIFT_MAX:
|
|
|
- warn(DRIFT_WARNING, self.hostname, drift,
|
|
|
- datetime.fromtimestamp(received),
|
|
|
- datetime.fromtimestamp(timestamp))
|
|
|
- heartbeats, hbmax = self.heartbeats, self.heartbeat_max
|
|
|
- if not heartbeats or (received and received > heartbeats[-1]):
|
|
|
- heappush(heartbeats, received)
|
|
|
- if len(heartbeats) > hbmax:
|
|
|
- heartbeats[:] = heartbeats[hbmax:]
|
|
|
+ def on_heartbeat(self, timestamp=None, local_received=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.1, use:
|
|
|
+ ``.event('heartbeat', timestamp, local_received, fields)``."""
|
|
|
+ self.event('heartbeat', timestamp, local_received, fields)
|
|
|
|
|
|
def __repr__(self):
|
|
|
return R_WORKER.format(self)
|
|
@@ -142,8 +173,8 @@ class Worker(AttributeDict):
|
|
|
self.freq, self.expire_window)
|
|
|
|
|
|
@property
|
|
|
- def alive(self):
|
|
|
- return bool(self.heartbeats and time() < self.heartbeat_expires)
|
|
|
+ def alive(self, nowfun=time):
|
|
|
+ return bool(self.heartbeats and nowfun() < self.heartbeat_expires)
|
|
|
|
|
|
@property
|
|
|
def id(self):
|
|
@@ -151,8 +182,17 @@ class Worker(AttributeDict):
|
|
|
|
|
|
|
|
|
@with_unique_field('uuid')
|
|
|
-class Task(AttributeDict):
|
|
|
+class Task(object):
|
|
|
"""Task State."""
|
|
|
+ if not PYPY:
|
|
|
+ __slots__ = (
|
|
|
+ 'uuid', 'name', 'state', 'received', 'sent', 'started',
|
|
|
+ 'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs',
|
|
|
+ 'eta', 'expires', 'retries', 'worker', 'result',
|
|
|
+ 'exception', 'timestamp', 'runtime',
|
|
|
+ 'traceback', 'exchange', 'routing_key', 'clock',
|
|
|
+ '__dict__',
|
|
|
+ )
|
|
|
|
|
|
#: How to merge out of order events.
|
|
|
#: Disorder is detected by logical ordering (e.g. :event:`task-received`
|
|
@@ -162,28 +202,50 @@ class Task(AttributeDict):
|
|
|
#: that state. ``(RECEIVED, ('name', 'args')``, means the name and args
|
|
|
#: fields are always taken from the RECEIVED state, and any values for
|
|
|
#: these fields received before or after is simply ignored.
|
|
|
- merge_rules = {states.RECEIVED: ('name', 'args', 'kwargs',
|
|
|
- 'retries', 'eta', 'expires')}
|
|
|
+ merge_rules = {
|
|
|
+ states.RECEIVED: ('name', 'args', 'kwargs',
|
|
|
+ 'retries', 'eta', 'expires'),
|
|
|
+ }
|
|
|
|
|
|
#: meth:`info` displays these fields by default.
|
|
|
_info_fields = ('args', 'kwargs', 'retries', 'result',
|
|
|
'eta', 'runtime', 'expires', 'exception',
|
|
|
'exchange', 'routing_key')
|
|
|
|
|
|
- #: Default values.
|
|
|
- _defaults = dict(uuid=None, name=None, state=states.PENDING,
|
|
|
- received=False, sent=False, started=False,
|
|
|
- succeeded=False, failed=False, retried=False,
|
|
|
- revoked=False, args=None, kwargs=None, eta=None,
|
|
|
- expires=None, retries=None, worker=None, result=None,
|
|
|
- exception=None, timestamp=None, runtime=None,
|
|
|
- traceback=None, exchange=None, routing_key=None,
|
|
|
- clock=0)
|
|
|
-
|
|
|
- def __init__(self, **fields):
|
|
|
- dict.__init__(self, self._defaults, **fields)
|
|
|
-
|
|
|
- def update(self, state, timestamp, fields, _state=states.state):
|
|
|
+ def __init__(self, uuid=None, name=None, state=states.PENDING,
|
|
|
+ received=None, sent=None, started=None, succeeded=None,
|
|
|
+ failed=None, retried=None, revoked=None, args=None,
|
|
|
+ kwargs=None, eta=None, expires=None, retries=None,
|
|
|
+ worker=None, result=None, exception=None, timestamp=None,
|
|
|
+ runtime=None, traceback=None, exchange=None,
|
|
|
+ routing_key=None, clock=0, **fields):
|
|
|
+ self.uuid = uuid
|
|
|
+ self.name = name
|
|
|
+ self.state = state
|
|
|
+ self.received = received
|
|
|
+ self.sent = sent
|
|
|
+ self.started = started
|
|
|
+ self.succeeded = succeeded
|
|
|
+ self.failed = failed
|
|
|
+ self.retried = retried
|
|
|
+ self.revoked = revoked
|
|
|
+ self.args = args
|
|
|
+ self.kwargs = kwargs
|
|
|
+ self.eta = eta
|
|
|
+ self.expires = expires
|
|
|
+ self.retries = retries
|
|
|
+ self.worker = worker
|
|
|
+ self.result = result
|
|
|
+ self.exception = exception
|
|
|
+ self.timestamp = timestamp
|
|
|
+ self.runtime = runtime
|
|
|
+ self.traceback = traceback
|
|
|
+ self.exchange = exchange
|
|
|
+ self.routing_key = routing_key
|
|
|
+ self.clock = clock
|
|
|
+
|
|
|
+ def update(self, state, timestamp, fields,
|
|
|
+ _state=states.state, RETRY=states.RETRY):
|
|
|
"""Update state from new event.
|
|
|
|
|
|
:param state: State from event.
|
|
@@ -194,59 +256,65 @@ class Task(AttributeDict):
|
|
|
time_received = fields.get('local_received') or 0
|
|
|
if self.worker and time_received:
|
|
|
self.worker.update_heartbeat(time_received, timestamp)
|
|
|
- if state != states.RETRY and self.state != states.RETRY and \
|
|
|
+ if state != RETRY and self.state != RETRY and \
|
|
|
_state(state) < _state(self.state):
|
|
|
# this state logically happens-before the current state, so merge.
|
|
|
self.merge(state, timestamp, fields)
|
|
|
else:
|
|
|
self.state = state
|
|
|
self.timestamp = timestamp
|
|
|
- super(Task, self).update(fields)
|
|
|
+ for key, value in items(self.fields):
|
|
|
+ setattr(self, key, value)
|
|
|
|
|
|
def merge(self, state, timestamp, fields):
|
|
|
"""Merge with out of order event."""
|
|
|
keep = self.merge_rules.get(state)
|
|
|
if keep is not None:
|
|
|
- fields = dict((key, fields.get(key)) for key in keep)
|
|
|
- super(Task, self).update(fields)
|
|
|
-
|
|
|
- def on_sent(self, timestamp=None, **fields):
|
|
|
- """Callback for the :event:`task-sent` event."""
|
|
|
- self.sent = timestamp
|
|
|
- self.update(states.PENDING, timestamp, fields)
|
|
|
-
|
|
|
- def on_received(self, timestamp=None, **fields):
|
|
|
- """Callback for the :event:`task-received` event."""
|
|
|
- self.received = timestamp
|
|
|
- self.update(states.RECEIVED, timestamp, fields)
|
|
|
-
|
|
|
- def on_started(self, timestamp=None, **fields):
|
|
|
- """Callback for the :event:`task-started` event."""
|
|
|
- self.started = timestamp
|
|
|
- self.update(states.STARTED, timestamp, fields)
|
|
|
-
|
|
|
- def on_failed(self, timestamp=None, **fields):
|
|
|
- """Callback for the :event:`task-failed` event."""
|
|
|
- self.failed = timestamp
|
|
|
- self.update(states.FAILURE, timestamp, fields)
|
|
|
-
|
|
|
- def on_retried(self, timestamp=None, **fields):
|
|
|
- """Callback for the :event:`task-retried` event."""
|
|
|
- self.retried = timestamp
|
|
|
- self.update(states.RETRY, timestamp, fields)
|
|
|
-
|
|
|
- def on_succeeded(self, timestamp=None, **fields):
|
|
|
- """Callback for the :event:`task-succeeded` event."""
|
|
|
- self.succeeded = timestamp
|
|
|
- self.update(states.SUCCESS, timestamp, fields)
|
|
|
-
|
|
|
- def on_revoked(self, timestamp=None, **fields):
|
|
|
- """Callback for the :event:`task-revoked` event."""
|
|
|
- self.revoked = timestamp
|
|
|
- self.update(states.REVOKED, timestamp, fields)
|
|
|
+ fields = dict((k, v) for k, v in items(fields) if k in keep)
|
|
|
+ for key, value in items(fields):
|
|
|
+ setattr(self, key, value)
|
|
|
+
|
|
|
+ def event(self, type_, timestamp=None, local_received=None, fields=None,
|
|
|
+ precedence=states.precedence, items=items, dict=dict,
|
|
|
+ PENDING=states.PENDING, RECEIVED=states.RECEIVED,
|
|
|
+ STARTED=states.STARTED, FAILURE=states.FAILURE,
|
|
|
+ RETRY=states.RETRY, SUCCESS=states.SUCCESS,
|
|
|
+ REVOKED=states.REVOKED):
|
|
|
+ fields = fields or {}
|
|
|
+ if type_ == 'sent':
|
|
|
+ state, self.sent = PENDING, timestamp
|
|
|
+ elif type_ == 'received':
|
|
|
+ state, self.received = RECEIVED, timestamp
|
|
|
+ elif type_ == 'started':
|
|
|
+ state, self.started = STARTED, timestamp
|
|
|
+ elif type_ == 'failed':
|
|
|
+ state, self.failed = FAILURE, timestamp
|
|
|
+ elif type_ == 'retried':
|
|
|
+ state, self.retried = RETRY, timestamp
|
|
|
+ elif type_ == 'succeeded':
|
|
|
+ state, self.succeeded = SUCCESS, timestamp
|
|
|
+ elif type_ == 'revoked':
|
|
|
+ state, self.revoked = REVOKED, timestamp
|
|
|
+ else:
|
|
|
+ state = type_.upper()
|
|
|
|
|
|
- def on_unknown_event(self, shortype, timestamp=None, **fields):
|
|
|
- self.update(shortype.upper(), timestamp, fields)
|
|
|
+ # note that precedence here is reversed
|
|
|
+ # see implementation in celery.states.state.__lt__
|
|
|
+ if state != RETRY and self.state != RETRY and \
|
|
|
+ precedence(state) > precedence(self.state):
|
|
|
+ # this state logically happens-before the current state, so merge.
|
|
|
+ keep = self.merge_rules.get(state)
|
|
|
+ if keep is not None:
|
|
|
+ fields = dict(
|
|
|
+ (k, v) for k, v in items(fields) if k in keep
|
|
|
+ )
|
|
|
+ for key, value in items(fields):
|
|
|
+ setattr(self, key, value)
|
|
|
+ else:
|
|
|
+ self.state = state
|
|
|
+ self.timestamp = timestamp
|
|
|
+ for key, value in items(fields):
|
|
|
+ setattr(self, key, value)
|
|
|
|
|
|
def info(self, fields=None, extra=[]):
|
|
|
"""Information about this task suitable for on-screen display."""
|
|
@@ -267,6 +335,46 @@ class Task(AttributeDict):
|
|
|
def ready(self):
|
|
|
return self.state in states.READY_STATES
|
|
|
|
|
|
+ def on_sent(self, timestamp=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.2, use:
|
|
|
+ ``.event('sent', timestamp, fields)``."""
|
|
|
+ self.event('sent', timestamp, fields)
|
|
|
+
|
|
|
+ def on_received(self, timestamp=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.2, use:
|
|
|
+ ``.event('received', timestamp, fields)``."""
|
|
|
+ self.event('received', timestamp, fields)
|
|
|
+
|
|
|
+ def on_started(self, timestamp=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.2, use:
|
|
|
+ ``.event('started', timestamp, fields)``."""
|
|
|
+ self.event('started', timestamp, fields)
|
|
|
+
|
|
|
+ def on_failed(self, timestamp=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.2, use:
|
|
|
+ ``.event('failed', timestamp, fields)``."""
|
|
|
+ self.event('failed', timestamp, fields)
|
|
|
+
|
|
|
+ def on_retried(self, timestamp=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.2, use:
|
|
|
+ ``.event('retried', timestamp, fields)``."""
|
|
|
+ self.event('retried', timestamp, fields)
|
|
|
+
|
|
|
+ def on_succeeded(self, timestamp=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.2, use:
|
|
|
+ ``.event('succeeded', timestamp, fields)``."""
|
|
|
+ self.event('succeeded', timestamp, fields)
|
|
|
+
|
|
|
+ def on_revoked(self, timestamp=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.2, use:
|
|
|
+ ``.event('revoked', timestamp, fields)``."""
|
|
|
+ self.event('revoked', timestamp, fields)
|
|
|
+
|
|
|
+ def on_unknown_event(self, shortype, timestamp=None, **fields):
|
|
|
+ """Deprecated, to be removed in 3.2, use:
|
|
|
+ ``.event(type, timestamp, fields)``."""
|
|
|
+ self.event(shortype, timestamp, fields)
|
|
|
+
|
|
|
|
|
|
class State(object):
|
|
|
"""Records clusters state."""
|
|
@@ -285,9 +393,8 @@ class State(object):
|
|
|
self.max_workers_in_memory = max_workers_in_memory
|
|
|
self.max_tasks_in_memory = max_tasks_in_memory
|
|
|
self._mutex = threading.Lock()
|
|
|
- self.handlers = {'task': self.task_event,
|
|
|
- 'worker': self.worker_event}
|
|
|
- self._get_handler = self.handlers.__getitem__
|
|
|
+ self.handlers = {}
|
|
|
+ self._event = self._create_dispatcher()
|
|
|
|
|
|
def freeze_while(self, fun, *args, **kwargs):
|
|
|
clear_after = kwargs.pop('clear_after', False)
|
|
@@ -330,7 +437,8 @@ class State(object):
|
|
|
"""
|
|
|
try:
|
|
|
worker = self.workers[hostname]
|
|
|
- worker.update(kwargs)
|
|
|
+ if kwargs:
|
|
|
+ worker.update(kwargs)
|
|
|
return worker, False
|
|
|
except KeyError:
|
|
|
worker = self.workers[hostname] = Worker(
|
|
@@ -345,61 +453,72 @@ class State(object):
|
|
|
task = self.tasks[uuid] = Task(uuid=uuid)
|
|
|
return task, True
|
|
|
|
|
|
- def worker_event(self, type, fields):
|
|
|
- """Process worker event."""
|
|
|
- try:
|
|
|
- hostname = fields['hostname']
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
- else:
|
|
|
- worker, created = self.get_or_create_worker(hostname)
|
|
|
- handler = getattr(worker, 'on_' + type, None)
|
|
|
- if handler:
|
|
|
- handler(**(fields if CAN_KWDICT else kwdict(fields)))
|
|
|
- return worker, created
|
|
|
-
|
|
|
- def task_event(self, type, fields, timetuple=timetuple):
|
|
|
- """Process task event."""
|
|
|
- uuid = fields['uuid']
|
|
|
- hostname = fields['hostname']
|
|
|
- # task-sent event is sent by client, not worker
|
|
|
- is_client_event = type == 'sent'
|
|
|
- task, created = self.get_or_create_task(uuid)
|
|
|
- if not is_client_event:
|
|
|
- worker, _ = self.get_or_create_worker(hostname)
|
|
|
- task.worker = worker
|
|
|
- maxtasks = self.max_tasks_in_memory * 2
|
|
|
-
|
|
|
- taskheap = self._taskheap
|
|
|
- timestamp = fields.get('timestamp') or 0
|
|
|
- clock = 0 if is_client_event else fields.get('clock')
|
|
|
- heappush(taskheap, timetuple(clock, timestamp, worker.id, task))
|
|
|
- if len(taskheap) > maxtasks:
|
|
|
- heappop(taskheap)
|
|
|
-
|
|
|
- handler = getattr(task, 'on_' + type, None)
|
|
|
- if type == 'received':
|
|
|
- self.task_count += 1
|
|
|
- if handler:
|
|
|
- handler(**fields)
|
|
|
- else:
|
|
|
- task.on_unknown_event(type, **fields)
|
|
|
- return created
|
|
|
-
|
|
|
def event(self, event):
|
|
|
with self._mutex:
|
|
|
- return self._dispatch_event(event)
|
|
|
+ return self._event(event)
|
|
|
+
|
|
|
+ def task_event(self, type_, fields):
|
|
|
+ """Deprecated, use :meth:`event`."""
|
|
|
+ return self._event(dict(fields, type='-'.join(['task', type_])))
|
|
|
+
|
|
|
+ def worker_event(self, type_, fields):
|
|
|
+ """Deprecated, use :meth:`event`."""
|
|
|
+ return self._event(dict(fields, type='-'.join(['worker', type_])))
|
|
|
+
|
|
|
+ def _create_dispatcher(self):
|
|
|
+ get_handler = self.handlers.__getitem__
|
|
|
+ event_callback = self.event_callback
|
|
|
+ get_or_create_worker = self.get_or_create_worker
|
|
|
+ get_or_create_task = self.get_or_create_task
|
|
|
+ wfields = itemgetter('hostname', 'timestamp', 'local_received')
|
|
|
+ tfields = itemgetter('uuid', 'hostname', 'timestamp', 'local_received')
|
|
|
+ taskheap = self._taskheap
|
|
|
+ maxtasks = self.max_tasks_in_memory * 2
|
|
|
|
|
|
- def _dispatch_event(self, event, kwdict=kwdict):
|
|
|
- self.event_count += 1
|
|
|
- event = kwdict(event)
|
|
|
- group, _, subject = event['type'].partition('-')
|
|
|
- try:
|
|
|
- self._get_handler(group)(subject, event)
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
- if self.event_callback:
|
|
|
- self.event_callback(self, event)
|
|
|
+ def _event(event, timetuple=timetuple):
|
|
|
+ self.event_count += 1
|
|
|
+ if event_callback:
|
|
|
+ event_callback(self, event)
|
|
|
+ group, _, subject = event['type'].partition('-')
|
|
|
+ try:
|
|
|
+ handler = get_handler(group)
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ return handler(subject, event)
|
|
|
+
|
|
|
+ if group == 'worker':
|
|
|
+ try:
|
|
|
+ hostname, timestamp, local_received = wfields(event)
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ worker, created = get_or_create_worker(hostname)
|
|
|
+ worker.event(subject, timestamp, local_received, event)
|
|
|
+ return created
|
|
|
+ elif group == 'task':
|
|
|
+ uuid, hostname, timestamp, local_received = tfields(event)
|
|
|
+ # task-sent event is sent by client, not worker
|
|
|
+ is_client_event = subject == 'sent'
|
|
|
+ task, created = get_or_create_task(uuid)
|
|
|
+ if not is_client_event:
|
|
|
+ worker, _ = get_or_create_worker(hostname)
|
|
|
+ task.worker = worker
|
|
|
+ if worker is not None and local_received:
|
|
|
+ worker.event(None, local_received, timestamp)
|
|
|
+ clock = 0 if is_client_event else event.get('clock')
|
|
|
+ heappush(
|
|
|
+ taskheap, timetuple(clock, timestamp, worker.id, task),
|
|
|
+ )
|
|
|
+ while len(taskheap) > maxtasks:
|
|
|
+ heappop(taskheap)
|
|
|
+            # NOTE: 'while' (not 'if') so the heap is trimmed back even
|
|
|
+            # when maxtasks was exceeded by more than one entry.
|
|
|
+ if subject == 'received':
|
|
|
+ self.task_count += 1
|
|
|
+ task.event(subject, timestamp, local_received, event)
|
|
|
+ return created
|
|
|
+ return _event
|
|
|
|
|
|
def itertasks(self, limit=None):
|
|
|
for index, row in enumerate(items(self.tasks)):
|