state.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.state
  4. ~~~~~~~~~~~~~~~~~~~
  5. Internal worker state (global)
  6. This includes the currently active and reserved tasks,
  7. statistics, and revoked tasks.
  8. :copyright: (c) 2009 - 2012 by Ask Solem.
  9. :license: BSD, see LICENSE for more details.
  10. """
  11. from __future__ import absolute_import
  12. import os
  13. import platform
  14. import shelve
  15. from collections import defaultdict
  16. from celery import __version__
  17. from celery.datastructures import LimitedSet
  18. from celery.utils import cached_property
  19. #: Worker software/platform information.
  20. SOFTWARE_INFO = {"sw_ident": "celeryd",
  21. "sw_ver": __version__,
  22. "sw_sys": platform.system()}
  23. #: maximum number of revokes to keep in memory.
  24. REVOKES_MAX = 10000
  25. #: how many seconds a revoke will be active before
  26. #: being expired when the max limit has been exceeded.
  27. REVOKE_EXPIRES = 3600
  28. #: set of all reserved :class:`~celery.worker.job.Request`'s.
  29. reserved_requests = set()
  30. #: set of currently active :class:`~celery.worker.job.Request`'s.
  31. active_requests = set()
  32. #: count of tasks executed by the worker, sorted by type.
  33. total_count = defaultdict(lambda: 0)
  34. #: the list of currently revoked tasks. Persistent if statedb set.
  35. revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
  36. #: Updates global state when a task has been reserved.
  37. task_reserved = reserved_requests.add
  38. should_stop = False
  39. should_terminate = False
  40. def task_accepted(request):
  41. """Updates global state when a task has been accepted."""
  42. active_requests.add(request)
  43. total_count[request.name] += 1
  44. def task_ready(request):
  45. """Updates global state when a task is ready."""
  46. active_requests.discard(request)
  47. reserved_requests.discard(request)
  48. if os.environ.get("CELERY_BENCH"): # pragma: no cover
  49. import atexit
  50. from time import time
  51. from billiard import current_process
  52. from celery.utils.compat import format_d
  53. from celery.utils.debug import memdump, sample_mem
  54. all_count = 0
  55. bench_first = None
  56. bench_start = None
  57. bench_last = None
  58. bench_every = int(os.environ.get("CELERY_BENCH_EVERY", 1000))
  59. bench_sample = []
  60. __reserved = task_reserved
  61. __ready = task_ready
  62. if current_process()._name == 'MainProcess':
  63. @atexit.register
  64. def on_shutdown():
  65. if bench_first is not None and bench_last is not None:
  66. print("- Time spent in benchmark: %r" % (
  67. bench_last - bench_first))
  68. print("- Avg: %s" % (sum(bench_sample) / len(bench_sample)))
  69. memdump()
  70. def task_reserved(request): # noqa
  71. global bench_start
  72. global bench_first
  73. now = None
  74. if bench_start is None:
  75. bench_start = now = time()
  76. if bench_first is None:
  77. bench_first = now
  78. return __reserved(request)
  79. import sys
  80. def task_ready(request): # noqa
  81. global all_count
  82. global bench_start
  83. global bench_last
  84. all_count += 1
  85. if not all_count % bench_every:
  86. now = time()
  87. diff = now - bench_start
  88. print("- Time spent processing %s tasks (since first "
  89. "task received): ~%.4fs\n" % (bench_every, diff))
  90. sys.stdout.flush()
  91. bench_start = bench_last = now
  92. bench_sample.append(diff)
  93. sample_mem()
  94. return __ready(request)
  95. class Persistent(object):
  96. storage = shelve
  97. _is_open = False
  98. def __init__(self, filename):
  99. self.filename = filename
  100. self._load()
  101. def save(self):
  102. self.sync(self.db)
  103. self.db.sync()
  104. self.close()
  105. def merge(self, d):
  106. revoked.update(d.get("revoked") or {})
  107. return d
  108. def sync(self, d):
  109. prev = d.get("revoked") or {}
  110. prev.update(revoked.as_dict())
  111. d["revoked"] = prev
  112. return d
  113. def open(self):
  114. return self.storage.open(self.filename, writeback=True)
  115. def close(self):
  116. if self._is_open:
  117. self.db.close()
  118. self._is_open = False
  119. def _load(self):
  120. self.merge(self.db)
  121. @cached_property
  122. def db(self):
  123. self._is_open = True
  124. return self.open()