state.py

import time
import heapq

from threading import Lock

from carrot.utils import partition

from celery import states
from celery.datastructures import AttributeDict, LocalCache
from celery.utils import kwdict

HEARTBEAT_EXPIRE = 150                  # 2 minutes, 30 seconds


class Element(AttributeDict):
    """Base class for types."""
    visited = False

    def __init__(self, **fields):
        dict.__init__(self, fields)


class Worker(Element):
    """Worker State."""
    heartbeat_max = 4

    def __init__(self, **fields):
        super(Worker, self).__init__(**fields)
        self.heartbeats = []

    def on_online(self, timestamp=None, **kwargs):
        """Callback for the ``worker-online`` event."""
        self._heartpush(timestamp)

    def on_offline(self, **kwargs):
        """Callback for the ``worker-offline`` event."""
        self.heartbeats = []

    def on_heartbeat(self, timestamp=None, **kwargs):
        """Callback for the ``worker-heartbeat`` event."""
        self._heartpush(timestamp)

    def _heartpush(self, timestamp):
        if timestamp:
            heapq.heappush(self.heartbeats, timestamp)
            if len(self.heartbeats) > self.heartbeat_max:
                # Keep only the newest ``heartbeat_max`` timestamps;
                # discarding the newest ones instead would make ``alive``
                # report false negatives for active workers.
                self.heartbeats = sorted(self.heartbeats)[-self.heartbeat_max:]

    def __repr__(self):
        return "<Worker: %s (%s)>" % (self.hostname,
                                      self.alive and "ONLINE" or "OFFLINE")

    @property
    def alive(self):
        return (self.heartbeats and
                time.time() < self.heartbeats[-1] + HEARTBEAT_EXPIRE)
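

# Usage sketch (illustrative only, not part of the original module):
# a worker is considered alive while its most recent heartbeat is
# younger than HEARTBEAT_EXPIRE seconds.  The hostname is an example
# value.
#
#     >>> worker = Worker(hostname="worker1.example.com")
#     >>> worker.on_online(timestamp=time.time())
#     >>> worker.alive
#     True
#     >>> worker.on_offline()
#     >>> bool(worker.alive)
#     False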


class Task(Element):
    """Task State."""
    _info_fields = ("args", "kwargs", "retries",
                    "result", "eta", "runtime", "expires",
                    "exception")

    merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
                                     "retries", "eta", "expires")}

    _defaults = dict(uuid=None,
                     name=None,
                     state=states.PENDING,
                     received=False,
                     started=False,
                     succeeded=False,
                     failed=False,
                     retried=False,
                     revoked=False,
                     args=None,
                     kwargs=None,
                     eta=None,
                     expires=None,
                     retries=None,
                     worker=None,
                     result=None,
                     exception=None,
                     timestamp=None,
                     runtime=None,
                     traceback=None)

    def __init__(self, **fields):
        super(Task, self).__init__(**dict(self._defaults, **fields))

    def info(self, fields=None, extra=[]):
        """Information about this task suitable for on-screen display."""
        if fields is None:
            fields = self._info_fields
        fields = list(fields) + list(extra)
        return dict((key, getattr(self, key, None))
                        for key in fields
                        if getattr(self, key, None) is not None)

    def update(self, state, timestamp, fields):
        """Update state from an event."""
        if self.worker:
            self.worker.on_heartbeat(timestamp=timestamp)
        if state != states.RETRY and self.state != states.RETRY and \
                states.state(state) < states.state(self.state):
            # Late (out of order) event: only merge selected fields
            # instead of regressing the state.
            self.merge(state, timestamp, fields)
        else:
            self.state = state
            self.timestamp = timestamp
            super(Task, self).update(fields)

    def merge(self, state, timestamp, fields):
        """Merge with an out of order event."""
        keep = self.merge_rules.get(state)
        if keep is not None:
            fields = dict((key, fields[key]) for key in keep)
            super(Task, self).update(fields)

    def on_received(self, timestamp=None, **fields):
        """Callback for the ``task-received`` event."""
        self.received = timestamp
        self.update(states.RECEIVED, timestamp, fields)

    def on_started(self, timestamp=None, **fields):
        """Callback for the ``task-started`` event."""
        self.started = timestamp
        self.update(states.STARTED, timestamp, fields)

    def on_failed(self, timestamp=None, **fields):
        """Callback for the ``task-failed`` event."""
        self.failed = timestamp
        self.update(states.FAILURE, timestamp, fields)

    def on_retried(self, timestamp=None, **fields):
        """Callback for the ``task-retried`` event."""
        self.retried = timestamp
        self.update(states.RETRY, timestamp, fields)

    def on_succeeded(self, timestamp=None, **fields):
        """Callback for the ``task-succeeded`` event."""
        self.succeeded = timestamp
        self.update(states.SUCCESS, timestamp, fields)

    def on_revoked(self, timestamp=None, **fields):
        """Callback for the ``task-revoked`` event."""
        self.revoked = timestamp
        self.update(states.REVOKED, timestamp, fields)

    def __repr__(self):
        return "<Task: %s(%s) %s>" % (self.name, self.uuid, self.state)

    @property
    def ready(self):
        return self.state in states.READY_STATES
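

# Usage sketch (illustrative only, not part of the original module):
# task state advances through the ``on_*`` event callbacks, and
# ``info()`` collects the fields that have been set.  The uuid, task
# name and argument values are example data.
#
#     >>> task = Task(uuid="some-uuid", name="tasks.add")
#     >>> task.on_received(timestamp=time.time(), args="(2, 2)", kwargs="{}")
#     >>> task.state
#     'RECEIVED'
#     >>> task.on_succeeded(timestamp=time.time(), result="4", runtime=0.5)
#     >>> task.ready
#     True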


class State(object):
    """Records clusters state."""
    event_count = 0
    task_count = 0

    def __init__(self, callback=None,
            max_workers_in_memory=5000, max_tasks_in_memory=10000):
        self.workers = LocalCache(max_workers_in_memory)
        self.tasks = LocalCache(max_tasks_in_memory)
        self.event_callback = callback
        self.group_handlers = {"worker": self.worker_event,
                               "task": self.task_event}
        self._mutex = Lock()

    def freeze_while(self, fun, *args, **kwargs):
        """Call ``fun`` while holding the mutex, optionally clearing
        the state afterwards (``clear_after=True``)."""
        clear_after = kwargs.pop("clear_after", False)
        self._mutex.acquire()
        try:
            return fun(*args, **kwargs)
        finally:
            if clear_after:
                self._clear()
            self._mutex.release()

    def clear_tasks(self, ready=True):
        self._mutex.acquire()
        try:
            return self._clear_tasks(ready)
        finally:
            self._mutex.release()

    def _clear_tasks(self, ready=True):
        if ready:
            # Only remove tasks that have reached a ready (final) state.
            self.tasks = dict((uuid, task)
                                for uuid, task in self.tasks.items()
                                if task.state not in states.READY_STATES)
        else:
            self.tasks.clear()

    def _clear(self, ready=True):
        self.workers.clear()
        self._clear_tasks(ready)
        self.event_count = 0
        self.task_count = 0

    def clear(self, ready=True):
        self._mutex.acquire()
        try:
            return self._clear(ready)
        finally:
            self._mutex.release()

    def get_or_create_worker(self, hostname, **kwargs):
        """Get or create worker by hostname."""
        try:
            worker = self.workers[hostname]
            worker.update(kwargs)
        except KeyError:
            worker = self.workers[hostname] = Worker(
                        hostname=hostname, **kwargs)
        return worker

    def get_or_create_task(self, uuid):
        """Get or create task by uuid."""
        try:
            return self.tasks[uuid]
        except KeyError:
            task = self.tasks[uuid] = Task(uuid=uuid)
            return task

    def worker_event(self, type, fields):
        """Process worker event."""
        hostname = fields.pop("hostname", None)
        if hostname:
            worker = self.get_or_create_worker(hostname)
            handler = getattr(worker, "on_%s" % type, None)
            if handler:
                handler(**fields)

    def task_event(self, type, fields):
        """Process task event."""
        uuid = fields.pop("uuid")
        hostname = fields.pop("hostname")
        worker = self.get_or_create_worker(hostname)
        task = self.get_or_create_task(uuid)
        handler = getattr(task, "on_%s" % type, None)
        if type == "received":
            self.task_count += 1
        if handler:
            handler(**fields)
        task.worker = worker

    def event(self, event):
        """Dispatch a single event while holding the mutex."""
        self._mutex.acquire()
        try:
            return self._dispatch_event(event)
        finally:
            self._mutex.release()

    def _dispatch_event(self, event):
        self.event_count += 1
        event = kwdict(event)
        group, _, type = partition(event.pop("type"), "-")
        self.group_handlers[group](type, event)
        if self.event_callback:
            self.event_callback(self, event)

    def tasks_by_timestamp(self, limit=None):
        """Get tasks by timestamp.

        Returns a list of ``(uuid, task)`` tuples.

        """
        return self._sort_tasks_by_time(self.tasks.items()[:limit])

    def _sort_tasks_by_time(self, tasks):
        """Sort task items by time."""
        return sorted(tasks, key=lambda t: t[1].timestamp,
                      reverse=True)

    def tasks_by_type(self, name, limit=None):
        """Get all tasks by type.

        Returns a list of ``(uuid, task)`` tuples.

        """
        return self._sort_tasks_by_time([(uuid, task)
                    for uuid, task in self.tasks.items()[:limit]
                    if task.name == name])

    def tasks_by_worker(self, hostname, limit=None):
        """Get all tasks by worker.

        Returns a list of ``(uuid, task)`` tuples.

        """
        return self._sort_tasks_by_time([(uuid, task)
                    for uuid, task in self.tasks.items()[:limit]
                    if task.worker.hostname == hostname])

    def task_types(self):
        """Returns a list of all seen task types."""
        return list(sorted(set(task.name for task in self.tasks.values())))

    def alive_workers(self):
        """Returns a list of (seemingly) alive workers."""
        return [w for w in self.workers.values() if w.alive]

    def __repr__(self):
        return "<ClusterState: events=%s tasks=%s>" % (self.event_count,
                                                       self.task_count)


state = State()
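

# Usage sketch (illustrative only, not part of the original module):
# event dicts as sent by celery workers (see ``celery.events``) can be
# replayed into a ``State`` instance; the hostname, uuid and task name
# below are example data.
#
#     >>> s = State()
#     >>> s.event({"type": "worker-online",
#     ...          "hostname": "worker1.example.com",
#     ...          "timestamp": time.time()})
#     >>> s.event({"type": "task-received",
#     ...          "uuid": "some-uuid", "name": "tasks.add",
#     ...          "hostname": "worker1.example.com",
#     ...          "timestamp": time.time()})
#     >>> s.task_count
#     1
#     >>> s.alive_workers()
#     [<Worker: worker1.example.com (ONLINE)>]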