# state.py — in-memory representation of cluster state, built by
# replaying worker and task events.
  1. import time
  2. import heapq
  3. from threading import Lock
  4. from kombu.utils import partition
  5. from celery import states
  6. from celery.datastructures import AttributeDict, LocalCache
  7. from celery.utils import kwdict
#: Seconds to wait after the latest heartbeat before a worker is
#: considered offline (see :attr:`Worker.alive`).
HEARTBEAT_EXPIRE = 150                  # 2 minutes, 30 seconds
  9. class Element(AttributeDict):
  10. """Base class for types."""
  11. visited = False
  12. def __init__(self, **fields):
  13. dict.__init__(self, fields)
  14. class Worker(Element):
  15. """Worker State."""
  16. heartbeat_max = 4
  17. def __init__(self, **fields):
  18. super(Worker, self).__init__(**fields)
  19. self.heartbeats = []
  20. def on_online(self, timestamp=None, **kwargs):
  21. self._heartpush(timestamp)
  22. def on_offline(self, **kwargs):
  23. self.heartbeats = []
  24. def on_heartbeat(self, timestamp=None, **kwargs):
  25. self._heartpush(timestamp)
  26. def _heartpush(self, timestamp):
  27. if timestamp:
  28. heapq.heappush(self.heartbeats, timestamp)
  29. if len(self.heartbeats) > self.heartbeat_max:
  30. self.heartbeats = self.heartbeats[:self.heartbeat_max]
  31. def __repr__(self):
  32. return "<Worker: %s (%s)" % (self.hostname,
  33. self.alive and "ONLINE" or "OFFLINE")
  34. @property
  35. def alive(self):
  36. return (self.heartbeats and
  37. time.time() < self.heartbeats[-1] + HEARTBEAT_EXPIRE)
  38. class Task(Element):
  39. """Task State."""
  40. _info_fields = ("args", "kwargs", "retries",
  41. "result", "eta", "runtime", "expires",
  42. "exception")
  43. merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
  44. "retries", "eta", "expires")}
  45. _defaults = dict(uuid=None,
  46. name=None,
  47. state=states.PENDING,
  48. received=False,
  49. sent=False,
  50. started=False,
  51. succeeded=False,
  52. failed=False,
  53. retried=False,
  54. revoked=False,
  55. args=None,
  56. kwargs=None,
  57. eta=None,
  58. expires=None,
  59. retries=None,
  60. worker=None,
  61. result=None,
  62. exception=None,
  63. timestamp=None,
  64. runtime=None,
  65. traceback=None)
  66. def __init__(self, **fields):
  67. super(Task, self).__init__(**dict(self._defaults, **fields))
  68. def info(self, fields=None, extra=[]):
  69. if fields is None:
  70. fields = self._info_fields
  71. fields = list(fields) + list(extra)
  72. return dict((key, getattr(self, key, None))
  73. for key in fields
  74. if getattr(self, key, None) is not None)
  75. def update(self, state, timestamp, fields):
  76. if self.worker:
  77. self.worker.on_heartbeat(timestamp=timestamp)
  78. if state != states.RETRY and self.state != states.RETRY and \
  79. states.state(state) < states.state(self.state):
  80. self.merge(state, timestamp, fields)
  81. else:
  82. self.state = state
  83. self.timestamp = timestamp
  84. super(Task, self).update(fields)
  85. def merge(self, state, timestamp, fields):
  86. keep = self.merge_rules.get(state)
  87. if keep is not None:
  88. fields = dict((key, fields.get(key)) for key in keep)
  89. super(Task, self).update(fields)
  90. def on_sent(self, timestamp=None, **fields):
  91. self.sent = timestamp
  92. self.update(states.PENDING, timestamp, fields)
  93. def on_received(self, timestamp=None, **fields):
  94. self.received = timestamp
  95. self.update(states.RECEIVED, timestamp, fields)
  96. def on_started(self, timestamp=None, **fields):
  97. self.started = timestamp
  98. self.update(states.STARTED, timestamp, fields)
  99. def on_failed(self, timestamp=None, **fields):
  100. self.failed = timestamp
  101. self.update(states.FAILURE, timestamp, fields)
  102. def on_retried(self, timestamp=None, **fields):
  103. self.retried = timestamp
  104. self.update(states.RETRY, timestamp, fields)
  105. def on_succeeded(self, timestamp=None, **fields):
  106. self.succeeded = timestamp
  107. self.update(states.SUCCESS, timestamp, fields)
  108. def on_revoked(self, timestamp=None, **fields):
  109. self.revoked = timestamp
  110. self.update(states.REVOKED, timestamp, fields)
  111. def __repr__(self):
  112. return "<Task: %s(%s) %s>" % (self.name, self.uuid, self.state)
  113. @property
  114. def ready(self):
  115. return self.state in states.READY_STATES
  116. class State(object):
  117. """Records clusters state."""
  118. event_count = 0
  119. task_count = 0
  120. def __init__(self, callback=None,
  121. max_workers_in_memory=5000, max_tasks_in_memory=10000):
  122. self.workers = LocalCache(max_workers_in_memory)
  123. self.tasks = LocalCache(max_tasks_in_memory)
  124. self.event_callback = callback
  125. self.group_handlers = {"worker": self.worker_event,
  126. "task": self.task_event}
  127. self._mutex = Lock()
  128. def freeze_while(self, fun, *args, **kwargs):
  129. clear_after = kwargs.pop("clear_after", False)
  130. self._mutex.acquire()
  131. try:
  132. return fun(*args, **kwargs)
  133. finally:
  134. if clear_after:
  135. self._clear()
  136. self._mutex.release()
  137. def clear_tasks(self, ready=True):
  138. self._mutex.acquire()
  139. try:
  140. return self._clear_tasks(ready)
  141. finally:
  142. self._mutex.release()
  143. def _clear_tasks(self, ready=True):
  144. if ready:
  145. self.tasks = dict((uuid, task)
  146. for uuid, task in self.tasks.items()
  147. if task.state not in states.READY_STATES)
  148. else:
  149. self.tasks.clear()
  150. def _clear(self, ready=True):
  151. self.workers.clear()
  152. self._clear_tasks(ready)
  153. self.event_count = 0
  154. self.task_count = 0
  155. def clear(self, ready=True):
  156. self._mutex.acquire()
  157. try:
  158. return self._clear(ready)
  159. finally:
  160. self._mutex.release()
  161. def get_or_create_worker(self, hostname, **kwargs):
  162. """Get or create worker by hostname."""
  163. try:
  164. worker = self.workers[hostname]
  165. worker.update(kwargs)
  166. except KeyError:
  167. worker = self.workers[hostname] = Worker(
  168. hostname=hostname, **kwargs)
  169. return worker
  170. def get_or_create_task(self, uuid):
  171. """Get or create task by uuid."""
  172. try:
  173. return self.tasks[uuid]
  174. except KeyError:
  175. task = self.tasks[uuid] = Task(uuid=uuid)
  176. return task
  177. def worker_event(self, type, fields):
  178. """Process worker event."""
  179. hostname = fields.pop("hostname", None)
  180. if hostname:
  181. worker = self.get_or_create_worker(hostname)
  182. handler = getattr(worker, "on_%s" % type, None)
  183. if handler:
  184. handler(**fields)
  185. def task_event(self, type, fields):
  186. """Process task event."""
  187. uuid = fields.pop("uuid")
  188. hostname = fields.pop("hostname")
  189. worker = self.get_or_create_worker(hostname)
  190. task = self.get_or_create_task(uuid)
  191. handler = getattr(task, "on_%s" % type, None)
  192. if type == "received":
  193. self.task_count += 1
  194. if handler:
  195. handler(**fields)
  196. task.worker = worker
  197. def event(self, event):
  198. self._mutex.acquire()
  199. try:
  200. return self._dispatch_event(event)
  201. finally:
  202. self._mutex.release()
  203. def _dispatch_event(self, event):
  204. self.event_count += 1
  205. event = kwdict(event)
  206. group, _, type = partition(event.pop("type"), "-")
  207. self.group_handlers[group](type, event)
  208. if self.event_callback:
  209. self.event_callback(self, event)
  210. def tasks_by_timestamp(self, limit=None):
  211. """Get tasks by timestamp.
  212. Returns a list of `(uuid, task)` tuples.
  213. """
  214. return self._sort_tasks_by_time(self.tasks.items()[:limit])
  215. def _sort_tasks_by_time(self, tasks):
  216. """Sort task items by time."""
  217. return sorted(tasks, key=lambda t: t[1].timestamp,
  218. reverse=True)
  219. def tasks_by_type(self, name, limit=None):
  220. """Get all tasks by type.
  221. Returns a list of `(uuid, task)` tuples.
  222. """
  223. return self._sort_tasks_by_time([(uuid, task)
  224. for uuid, task in self.tasks.items()[:limit]
  225. if task.name == name])
  226. def tasks_by_worker(self, hostname, limit=None):
  227. """Get all tasks by worker.
  228. Returns a list of `(uuid, task)` tuples.
  229. """
  230. return self._sort_tasks_by_time([(uuid, task)
  231. for uuid, task in self.tasks.items()[:limit]
  232. if task.worker.hostname == hostname])
  233. def task_types(self):
  234. """Returns a list of all seen task types."""
  235. return list(sorted(set(task.name for task in self.tasks.values())))
  236. def alive_workers(self):
  237. """Returns a list of (seemingly) alive workers."""
  238. return [w for w in self.workers.values() if w.alive]
  239. def __repr__(self):
  240. return "<ClusterState: events=%s tasks=%s>" % (self.event_count,
  241. self.task_count)
#: Global state instance, shared default for consumers of this module.
state = State()