state.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. # -*- coding: utf-8 -*-
  2. """Internal worker state (global)
  3. This includes the currently active and reserved tasks,
  4. statistics, and revoked tasks.
  5. """
  6. import os
  7. import sys
  8. import platform
  9. import shelve
  10. import weakref
  11. import zlib
  12. from collections import Counter
  13. from kombu.serialization import pickle, pickle_protocol
  14. from kombu.utils.objects import cached_property
  15. from celery import __version__
  16. from celery.exceptions import WorkerShutdown, WorkerTerminate
  17. from celery.utils.collections import LimitedSet
  18. __all__ = [
  19. 'SOFTWARE_INFO', 'reserved_requests', 'active_requests',
  20. 'total_count', 'revoked', 'task_reserved', 'maybe_shutdown',
  21. 'task_accepted', 'task_reserved', 'task_ready', 'Persistent',
  22. ]
  23. #: Worker software/platform information.
  24. SOFTWARE_INFO = {
  25. 'sw_ident': 'py-celery',
  26. 'sw_ver': __version__,
  27. 'sw_sys': platform.system(),
  28. }
  29. #: maximum number of revokes to keep in memory.
  30. REVOKES_MAX = 50000
  31. #: how many seconds a revoke will be active before
  32. #: being expired when the max limit has been exceeded.
  33. REVOKE_EXPIRES = 10800
  34. #: Mapping of reserved task_id->Request.
  35. requests = {}
  36. #: set of all reserved :class:`~celery.worker.request.Request`'s.
  37. reserved_requests = weakref.WeakSet()
  38. #: set of currently active :class:`~celery.worker.request.Request`'s.
  39. active_requests = weakref.WeakSet()
  40. #: count of tasks accepted by the worker, sorted by type.
  41. total_count = Counter()
  42. #: count of all tasks accepted by the worker
  43. all_total_count = [0]
  44. #: the list of currently revoked tasks. Persistent if ``statedb`` set.
  45. revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
  46. should_stop = None
  47. should_terminate = None
  48. def reset_state():
  49. requests.clear()
  50. reserved_requests.clear()
  51. active_requests.clear()
  52. total_count.clear()
  53. all_total_count[:] = [0]
  54. revoked.clear()
  55. def maybe_shutdown():
  56. if should_stop is not None and should_stop is not False:
  57. raise WorkerShutdown(should_stop)
  58. elif should_terminate is not None and should_terminate is not False:
  59. raise WorkerTerminate(should_terminate)
  60. def task_reserved(request,
  61. add_request=requests.__setitem__,
  62. add_reserved_request=reserved_requests.add):
  63. """Update global state when a task has been reserved."""
  64. add_request(request.id, request)
  65. add_reserved_request(request)
  66. def task_accepted(request,
  67. _all_total_count=all_total_count,
  68. add_active_request=active_requests.add,
  69. add_to_total_count=total_count.update):
  70. """Updates global state when a task has been accepted."""
  71. add_active_request(request)
  72. add_to_total_count({request.name: 1})
  73. all_total_count[0] += 1
  74. def task_ready(request,
  75. remove_request=requests.pop,
  76. discard_active_request=active_requests.discard,
  77. discard_reserved_request=reserved_requests.discard):
  78. """Updates global state when a task is ready."""
  79. remove_request(request.id, None)
  80. discard_active_request(request)
  81. discard_reserved_request(request)
  82. C_BENCH = os.environ.get('C_BENCH') or os.environ.get('CELERY_BENCH')
  83. C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or
  84. os.environ.get('CELERY_BENCH_EVERY') or 1000)
  85. if C_BENCH: # pragma: no cover
  86. import atexit
  87. from time import monotonic
  88. from billiard.process import current_process
  89. from celery.utils.debug import memdump, sample_mem
  90. all_count = 0
  91. bench_first = None
  92. bench_start = None
  93. bench_last = None
  94. bench_every = C_BENCH_EVERY
  95. bench_sample = []
  96. __reserved = task_reserved
  97. __ready = task_ready
  98. if current_process()._name == 'MainProcess':
  99. @atexit.register
  100. def on_shutdown():
  101. if bench_first is not None and bench_last is not None:
  102. print('- Time spent in benchmark: {0!r}'.format(
  103. bench_last - bench_first))
  104. print('- Avg: {0}'.format(
  105. sum(bench_sample) / len(bench_sample)))
  106. memdump()
  107. def task_reserved(request): # noqa
  108. global bench_start
  109. global bench_first
  110. now = None
  111. if bench_start is None:
  112. bench_start = now = monotonic()
  113. if bench_first is None:
  114. bench_first = now
  115. return __reserved(request)
  116. def task_ready(request): # noqa
  117. global all_count
  118. global bench_start
  119. global bench_last
  120. all_count += 1
  121. if not all_count % bench_every:
  122. now = monotonic()
  123. diff = now - bench_start
  124. print('- Time spent processing {0} tasks (since first '
  125. 'task received): ~{1:.4f}s\n'.format(bench_every, diff))
  126. sys.stdout.flush()
  127. bench_start = bench_last = now
  128. bench_sample.append(diff)
  129. sample_mem()
  130. return __ready(request)
  131. class Persistent:
  132. """This is the persistent data stored by the worker when
  133. :option:`celery worker --statedb` is enabled.
  134. Currently only stores revoked task id's.
  135. """
  136. storage = shelve
  137. protocol = pickle_protocol
  138. compress = zlib.compress
  139. decompress = zlib.decompress
  140. _is_open = False
  141. def __init__(self, state, filename, clock=None):
  142. self.state = state
  143. self.filename = filename
  144. self.clock = clock
  145. self.merge()
  146. def open(self):
  147. return self.storage.open(
  148. self.filename, protocol=self.protocol, writeback=True,
  149. )
  150. def merge(self):
  151. self._merge_with(self.db)
  152. def sync(self):
  153. self._sync_with(self.db)
  154. self.db.sync()
  155. def close(self):
  156. if self._is_open:
  157. self.db.close()
  158. self._is_open = False
  159. def save(self):
  160. self.sync()
  161. self.close()
  162. def _merge_with(self, d):
  163. self._merge_revoked(d)
  164. self._merge_clock(d)
  165. return d
  166. def _sync_with(self, d):
  167. self._revoked_tasks.purge()
  168. d.update(
  169. __proto__=3,
  170. zrevoked=self.compress(self._dumps(self._revoked_tasks)),
  171. clock=self.clock.forward() if self.clock else 0,
  172. )
  173. return d
  174. def _merge_clock(self, d):
  175. if self.clock:
  176. d['clock'] = self.clock.adjust(d.get('clock') or 0)
  177. def _merge_revoked(self, d):
  178. try:
  179. self._merge_revoked_v3(d['zrevoked'])
  180. except KeyError:
  181. try:
  182. self._merge_revoked_v2(d.pop('revoked'))
  183. except KeyError:
  184. pass
  185. # purge expired items at boot
  186. self._revoked_tasks.purge()
  187. def _merge_revoked_v3(self, zrevoked):
  188. if zrevoked:
  189. self._revoked_tasks.update(pickle.loads(self.decompress(zrevoked)))
  190. def _merge_revoked_v2(self, saved):
  191. if not isinstance(saved, LimitedSet):
  192. # (pre 3.0.18) used to be stored as a dict
  193. return self._merge_revoked_v1(saved)
  194. self._revoked_tasks.update(saved)
  195. def _merge_revoked_v1(self, saved):
  196. add = self._revoked_tasks.add
  197. for item in saved:
  198. add(item)
  199. def _dumps(self, obj):
  200. return pickle.dumps(obj, protocol=self.protocol)
  201. @property
  202. def _revoked_tasks(self):
  203. return self.state.revoked
  204. @cached_property
  205. def db(self):
  206. self._is_open = True
  207. return self.open()