import time
import heapq

from threading import Lock

from carrot.utils import partition

from celery import states
from celery.datastructures import AttributeDict, LocalCache
from celery.utils import kwdict

# Seconds after the last recorded heartbeat before a worker is considered offline.
HEARTBEAT_EXPIRE = 150


class Element(AttributeDict):
    """Base class for worker and task state elements."""
    visited = False

    def __init__(self, **fields):
        dict.__init__(self, fields)


class Worker(Element):
    """Worker State."""
    heartbeat_max = 4

    def __init__(self, **fields):
        super(Worker, self).__init__(**fields)
        self.heartbeats = []

    def on_online(self, timestamp=None, **kwargs):
        self._heartpush(timestamp)

    def on_offline(self, **kwargs):
        self.heartbeats = []

    def on_heartbeat(self, timestamp=None, **kwargs):
        self._heartpush(timestamp)

    def _heartpush(self, timestamp):
        if timestamp:
            heapq.heappush(self.heartbeats, timestamp)
            if len(self.heartbeats) > self.heartbeat_max:
                self.heartbeats = self.heartbeats[:self.heartbeat_max]

    def __repr__(self):
        return "<Worker: %s (%s)>" % (self.hostname,
                                      self.alive and "ONLINE" or "OFFLINE")

    @property
    def alive(self):
        return (self.heartbeats and
                time.time() < self.heartbeats[-1] + HEARTBEAT_EXPIRE)
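
# Rough usage sketch (not part of the original module); the hostname below
# is hypothetical.  A Worker counts as alive while its last recorded
# heartbeat is younger than HEARTBEAT_EXPIRE seconds:
#
#     worker = Worker(hostname="worker1.example.com")
#     worker.on_heartbeat(timestamp=time.time())
#     assert worker.alive
#
#     worker.on_offline()
#     assert not worker.alive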


class Task(Element):
    """Task State."""

    # Fields returned by info() by default.
    _info_fields = ("args", "kwargs", "retries",
                    "result", "eta", "runtime", "expires",
                    "exception")

    # Fields kept when an out of order event is merged in.
    merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
                                     "retries", "eta", "expires")}

    _defaults = dict(uuid=None,
                     name=None,
                     state=states.PENDING,
                     received=False,
                     started=False,
                     succeeded=False,
                     failed=False,
                     retried=False,
                     revoked=False,
                     args=None,
                     kwargs=None,
                     eta=None,
                     expires=None,
                     retries=None,
                     worker=None,
                     result=None,
                     exception=None,
                     timestamp=None,
                     runtime=None,
                     traceback=None)

    def __init__(self, **fields):
        super(Task, self).__init__(**dict(self._defaults, **fields))

    def info(self, fields=None, extra=[]):
        """Return a dict of the known, non-empty fields of this task."""
        if fields is None:
            fields = self._info_fields
        fields = list(fields) + list(extra)
        return dict((key, getattr(self, key, None))
                    for key in fields
                    if getattr(self, key, None) is not None)

    def update(self, state, timestamp, fields):
        if self.worker:
            self.worker.on_heartbeat(timestamp=timestamp)
        if state != states.RETRY and self.state != states.RETRY and \
                states.state(state) < states.state(self.state):
            # The event is for a state that precedes the one already
            # recorded (delivered out of order), so only merge selected
            # fields instead of overwriting the newer state.
            self.merge(state, timestamp, fields)
        else:
            self.state = state
            self.timestamp = timestamp
            super(Task, self).update(fields)

    def merge(self, state, timestamp, fields):
        keep = self.merge_rules.get(state)
        if keep is not None:
            fields = dict((key, fields.get(key)) for key in keep)
            super(Task, self).update(fields)

    def on_received(self, timestamp=None, **fields):
        self.received = timestamp
        self.update(states.RECEIVED, timestamp, fields)

    def on_started(self, timestamp=None, **fields):
        self.started = timestamp
        self.update(states.STARTED, timestamp, fields)

    def on_failed(self, timestamp=None, **fields):
        self.failed = timestamp
        self.update(states.FAILURE, timestamp, fields)

    def on_retried(self, timestamp=None, **fields):
        self.retried = timestamp
        self.update(states.RETRY, timestamp, fields)

    def on_succeeded(self, timestamp=None, **fields):
        self.succeeded = timestamp
        self.update(states.SUCCESS, timestamp, fields)

    def on_revoked(self, timestamp=None, **fields):
        self.revoked = timestamp
        self.update(states.REVOKED, timestamp, fields)

    def __repr__(self):
        return "<Task: %s(%s) %s>" % (self.name, self.uuid, self.state)

    @property
    def ready(self):
        return self.state in states.READY_STATES
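
# Rough usage sketch (not part of the original module); the uuid and task
# name are hypothetical.  Events arriving in order advance the state, while
# an event for a state that precedes the current one is merged via
# merge_rules instead of overwriting newer information:
#
#     task = Task(uuid="d9aa0191-hypothetical")
#     task.on_received(timestamp=time.time(), name="tasks.add")
#     task.on_started(timestamp=time.time())
#     assert task.state == states.STARTED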


class State(object):
    """Records cluster state."""
    event_count = 0
    task_count = 0

    def __init__(self, callback=None,
                 max_workers_in_memory=5000, max_tasks_in_memory=10000):
        self.workers = LocalCache(max_workers_in_memory)
        self.tasks = LocalCache(max_tasks_in_memory)
        self.event_callback = callback
        self.group_handlers = {"worker": self.worker_event,
                               "task": self.task_event}
        self._mutex = Lock()

    def freeze_while(self, fun, *args, **kwargs):
        """Call ``fun`` while event processing is blocked by the mutex,
        optionally clearing state afterwards (``clear_after=True``)."""
        clear_after = kwargs.pop("clear_after", False)
        self._mutex.acquire()
        try:
            return fun(*args, **kwargs)
        finally:
            if clear_after:
                self._clear()
            self._mutex.release()

    def clear_tasks(self, ready=True):
        self._mutex.acquire()
        try:
            return self._clear_tasks(ready)
        finally:
            self._mutex.release()

    def _clear_tasks(self, ready=True):
        if ready:
            # Keep only tasks that are still in progress.
            self.tasks = dict((uuid, task)
                              for uuid, task in self.tasks.items()
                              if task.state not in states.READY_STATES)
        else:
            self.tasks.clear()

    def _clear(self, ready=True):
        self.workers.clear()
        self._clear_tasks(ready)
        self.event_count = 0
        self.task_count = 0

    def clear(self, ready=True):
        self._mutex.acquire()
        try:
            return self._clear(ready)
        finally:
            self._mutex.release()

    def get_or_create_worker(self, hostname, **kwargs):
        """Get or create worker by hostname."""
        try:
            worker = self.workers[hostname]
            worker.update(kwargs)
        except KeyError:
            worker = self.workers[hostname] = Worker(hostname=hostname,
                                                     **kwargs)
        return worker

    def get_or_create_task(self, uuid):
        """Get or create task by uuid."""
        try:
            return self.tasks[uuid]
        except KeyError:
            task = self.tasks[uuid] = Task(uuid=uuid)
            return task

    def worker_event(self, type, fields):
        """Process worker event."""
        hostname = fields.pop("hostname", None)
        if hostname:
            worker = self.get_or_create_worker(hostname)
            handler = getattr(worker, "on_%s" % type, None)
            if handler:
                handler(**fields)

    def task_event(self, type, fields):
        """Process task event."""
        uuid = fields.pop("uuid")
        hostname = fields.pop("hostname")
        worker = self.get_or_create_worker(hostname)
        task = self.get_or_create_task(uuid)
        handler = getattr(task, "on_%s" % type, None)
        if type == "received":
            self.task_count += 1
        if handler:
            handler(**fields)
        task.worker = worker

    def event(self, event):
        self._mutex.acquire()
        try:
            return self._dispatch_event(event)
        finally:
            self._mutex.release()

    def _dispatch_event(self, event):
        self.event_count += 1
        event = kwdict(event)
        # Event types have the form "<group>-<kind>", e.g. "task-received"
        # is dispatched to task_event("received", fields).
        group, _, type = partition(event.pop("type"), "-")
        self.group_handlers[group](type, event)
        if self.event_callback:
            self.event_callback(self, event)

    def tasks_by_timestamp(self, limit=None):
        """Get tasks by timestamp.

        Returns a list of ``(uuid, task)`` tuples.

        """
        return self._sort_tasks_by_time(self.tasks.items()[:limit])

    def _sort_tasks_by_time(self, tasks):
        """Sort task items by time."""
        return sorted(tasks, key=lambda t: t[1].timestamp,
                      reverse=True)

    def tasks_by_type(self, name, limit=None):
        """Get all tasks by type.

        Returns a list of ``(uuid, task)`` tuples.

        """
        return self._sort_tasks_by_time(
            [(uuid, task) for uuid, task in self.tasks.items()[:limit]
             if task.name == name])

    def tasks_by_worker(self, hostname, limit=None):
        """Get all tasks by worker.

        Returns a list of ``(uuid, task)`` tuples.

        """
        return self._sort_tasks_by_time(
            [(uuid, task) for uuid, task in self.tasks.items()[:limit]
             if task.worker.hostname == hostname])

    def task_types(self):
        """Returns a list of all seen task types."""
        return list(sorted(set(task.name for task in self.tasks.values())))

    def alive_workers(self):
        """Returns a list of (seemingly) alive workers."""
        return [w for w in self.workers.values() if w.alive]

    def __repr__(self):
        return "<ClusterState: events=%s tasks=%s>" % (self.event_count,
                                                       self.task_count)


state = State()
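
# Rough usage sketch (not part of the original module); the field values
# below are hypothetical, but the event shapes match what the handlers
# above expect: a "type" of the form "<group>-<kind>", a hostname, and for
# task events a uuid.  State.event() routes each raw event dict to the
# matching Worker/Task handler:
#
#     s = State()
#     s.event({"type": "worker-online",
#              "hostname": "worker1.example.com",
#              "timestamp": time.time()})
#     s.event({"type": "task-received",
#              "uuid": "d9aa0191-hypothetical",
#              "hostname": "worker1.example.com",
#              "name": "tasks.add",
#              "timestamp": time.time()})
#     assert s.task_count == 1 and s.alive_workers()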
|