state.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.state
  4. ~~~~~~~~~~~~~~~~~~~
  5. Internal worker state (global)
  6. This includes the currently active and reserved tasks,
  7. statistics, and revoked tasks.
  8. :copyright: (c) 2009 - 2012 by Ask Solem.
  9. :license: BSD, see LICENSE for more details.
  10. """
  11. from __future__ import absolute_import
  12. import os
  13. import platform
  14. import shelve
  15. from collections import defaultdict
  16. from celery import __version__
  17. from celery.datastructures import LimitedSet
  18. from celery.utils import cached_property
  #: Worker software/platform information.
  SOFTWARE_INFO = dict(
      sw_ident="celeryd",
      sw_ver=__version__,
      sw_sys=platform.system(),
  )

  #: Maximum number of revokes to keep in memory.
  REVOKES_MAX = 10000

  #: Number of seconds a revoke stays active before it may be expired
  #: once the maximum limit has been exceeded.
  REVOKE_EXPIRES = 3600

  #: Set of all reserved :class:`~celery.worker.job.Request`'s.
  reserved_requests = set()

  #: Set of currently active :class:`~celery.worker.job.Request`'s.
  active_requests = set()

  #: Number of tasks accepted by the worker, keyed by task name
  #: (missing names start at zero).
  total_count = defaultdict(int)

  #: The currently revoked tasks.  Persistent if statedb set.
  revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)

  #: Updates global state when a task has been reserved
  #: (bound directly to ``reserved_requests.add``).
  task_reserved = reserved_requests.add

  #: Shutdown flags, initially unset.  Nothing in this module modifies them.
  should_stop = False
  should_terminate = False
  def task_accepted(request):
      """Record an accepted task in the global worker state.

      Adds *request* to ``active_requests`` and increments the
      ``total_count`` entry for ``request.name``.
      """
      active_requests.add(request)
      task_name = request.name
      total_count[task_name] = total_count[task_name] + 1
  def task_ready(request):
      """Remove a finished task from the global worker state.

      *request* is discarded from both ``active_requests`` and
      ``reserved_requests``; a request missing from either set is ignored.
      """
      for tracked in (active_requests, reserved_requests):
          tracked.discard(request)
  if os.environ.get("CELERY_BENCH"):  # pragma: no cover
      # Benchmark instrumentation, enabled by setting the CELERY_BENCH
      # environment variable.  It rebinds the module-level ``task_reserved``
      # and ``task_ready`` to wrappers that time every batch of
      # ``bench_every`` tasks (CELERY_BENCH_EVERY, default 1000), sample the
      # process RSS when psutil is available, and print a summary at exit.
      import atexit
      import sys

      from time import time
      from billiard import current_process
      from celery.utils.compat import format_d

      all_count = 0            # number of tasks that have become ready
      bench_first = None       # time the first task was reserved
      bench_mem_sample = []    # one mem_rss() reading per finished batch
      bench_start = None       # start time of the batch being measured
      bench_last = None        # time the last batch finished
      bench_every = int(os.environ.get("CELERY_BENCH_EVERY", 1000))
      bench_sample = []        # duration in seconds of each finished batch
      __reserved = task_reserved   # the uninstrumented implementations
      __ready = task_ready
      _process = None          # lazily created psutil.Process

      def ps():
          """Return a cached ``psutil.Process`` for this process.

          Returns ``None`` when psutil cannot be imported.
          """
          global _process
          if _process is None:
              try:
                  from psutil import Process
              except ImportError:
                  return None
              _process = Process(os.getpid())
          return _process

      def mem_rss():
          """Return the resident set size as a formatted string.

          Returns ``None`` when psutil is not installed.
          """
          p = ps()
          if p is not None:
              # rss is reported in bytes, so dividing by 1024 gives
              # kilobytes (the label used to read "MB").
              return "%sKB" % (format_d(p.get_memory_info().rss // 1024), )

      def sample(x, n=10, k=0):
          """Yield up to *n* evenly spaced items of *x*, starting at index *k*.

          The step never drops below one and iteration stops at the end of
          *x*, so a sequence shorter than *n* yields each item once instead
          of repeating the first item or raising ``IndexError``.
          """
          size = len(x)
          step = max(size // n, 1)
          for _ in range(n):
              if k >= size:
                  return
              yield x[k]
              k += step

      if current_process()._name == 'MainProcess':
          @atexit.register
          def on_shutdown():
              """Print the benchmark summary when the main process exits."""
              if bench_first is not None and bench_last is not None:
                  print("- Time spent in benchmark: %r" % (
                      bench_last - bench_first))
                  print("- Avg: %s" % (sum(bench_sample) / len(bench_sample)))
                  # any() keeps the "at least one real reading" meaning on
                  # Python 3, where filter() returns an always-truthy
                  # iterator.
                  if any(bench_mem_sample):
                      print("- rss (sample):")
                      for mem in sample(bench_mem_sample):
                          print("- > %s," % mem)
                      # Release the samples before measuring the final rss.
                      bench_mem_sample[:] = []
                      bench_sample[:] = []
                      import gc
                      gc.collect()
                      print("- rss (shutdown): %s." % (mem_rss()))
                  else:
                      print("- rss: (psutil not installed).")

      def task_reserved(request):  # noqa
          """Start the benchmark clock on the first task, then reserve it."""
          global bench_start
          global bench_first
          now = None
          if bench_start is None:
              bench_start = now = time()
          if bench_first is None:
              bench_first = now
          return __reserved(request)

      def task_ready(request):  # noqa
          """Count the finished task and report every ``bench_every`` tasks."""
          global all_count
          global bench_start
          global bench_last
          all_count += 1
          if not all_count % bench_every:
              now = time()
              diff = now - bench_start
              print("- Time spent processing %s tasks (since first "
                    "task received): ~%.4fs\n" % (bench_every, diff))
              sys.stdout.flush()
              bench_start = bench_last = now
              bench_sample.append(diff)
              bench_mem_sample.append(mem_rss())
          return __ready(request)
  class Persistent(object):
      """Stores the global ``revoked`` set in a ``shelve`` database file.

      Creating an instance opens the database and merges any stored
      revokes into the in-memory ``revoked`` set.  :meth:`save` writes the
      in-memory set back and closes the database.
      """

      #: Module used to open the database; must provide
      #: ``open(filename, writeback=True)``.
      storage = shelve

      #: Set to true when the ``db`` property is evaluated and reset by
      #: :meth:`close`.
      _is_open = False

      def __init__(self, filename):
          self.filename = filename
          self._load()

      def _load(self):
          """Merge the stored state into the in-memory state."""
          self.merge(self.db)

      def merge(self, d):
          """Add the revokes stored in mapping *d* to ``revoked``; return *d*."""
          stored = d.get("revoked") or {}
          revoked.update(stored)
          return d

      def sync(self, d):
          """Write the in-memory revokes into mapping *d* and return *d*.

          Entries already stored under ``"revoked"`` are kept and updated
          with the contents of ``revoked.as_dict()``.
          """
          combined = d.get("revoked") or {}
          combined.update(revoked.as_dict())
          d["revoked"] = combined
          return d

      def save(self):
          """Flush the current state to the database, then close it."""
          self.sync(self.db)
          self.db.sync()
          self.close()

      def open(self):
          """Open the database file through ``storage``, with writeback on."""
          return self.storage.open(self.filename, writeback=True)

      def close(self):
          """Close the database if it is marked as open; otherwise do nothing."""
          if not self._is_open:
              return
          self.db.close()
          self._is_open = False

      @cached_property
      def db(self):
          """The opened database (marked open before the file is opened)."""
          self._is_open = True
          return self.open()