state.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.state
  4. ~~~~~~~~~~~~~~~~~~~
  5. Internal worker state (global)
  6. This includes the currently active and reserved tasks,
  7. statistics, and revoked tasks.
  8. """
  9. from __future__ import absolute_import
  10. import os
  11. import platform
  12. import shelve
  13. from collections import defaultdict
  14. from celery import __version__
  15. from celery.datastructures import LimitedSet
  16. from celery.utils import cached_property
  17. #: Worker software/platform information.
  18. SOFTWARE_INFO = {'sw_ident': 'py-celery',
  19. 'sw_ver': __version__,
  20. 'sw_sys': platform.system()}
  21. #: maximum number of revokes to keep in memory.
  22. REVOKES_MAX = 10000
  23. #: how many seconds a revoke will be active before
  24. #: being expired when the max limit has been exceeded.
  25. REVOKE_EXPIRES = 3600
  26. #: set of all reserved :class:`~celery.worker.job.Request`'s.
  27. reserved_requests = set()
  28. #: set of currently active :class:`~celery.worker.job.Request`'s.
  29. active_requests = set()
  30. #: count of tasks executed by the worker, sorted by type.
  31. total_count = defaultdict(lambda: 0)
  32. #: the list of currently revoked tasks. Persistent if statedb set.
  33. revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
  34. #: Updates global state when a task has been reserved.
  35. task_reserved = reserved_requests.add
  36. should_stop = False
  37. should_terminate = False
  38. def task_accepted(request):
  39. """Updates global state when a task has been accepted."""
  40. active_requests.add(request)
  41. total_count[request.name] += 1
  42. def task_ready(request):
  43. """Updates global state when a task is ready."""
  44. active_requests.discard(request)
  45. reserved_requests.discard(request)
  46. C_BENCH = os.environ.get('C_BENCH') or os.environ.get('CELERY_BENCH')
  47. C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or
  48. os.environ.get('CELERY_BENCH_EVERY') or 1000)
  49. if C_BENCH: # pragma: no cover
  50. import atexit
  51. from time import time
  52. from billiard import current_process
  53. from celery.utils.debug import memdump, sample_mem
  54. all_count = 0
  55. bench_first = None
  56. bench_start = None
  57. bench_last = None
  58. bench_every = C_BENCH_EVERY
  59. bench_sample = []
  60. __reserved = task_reserved
  61. __ready = task_ready
  62. if current_process()._name == 'MainProcess':
  63. @atexit.register
  64. def on_shutdown():
  65. if bench_first is not None and bench_last is not None:
  66. print('- Time spent in benchmark: %r' % (
  67. bench_last - bench_first))
  68. print('- Avg: %s' % (sum(bench_sample) / len(bench_sample)))
  69. memdump()
  70. def task_reserved(request): # noqa
  71. global bench_start
  72. global bench_first
  73. now = None
  74. if bench_start is None:
  75. bench_start = now = time()
  76. if bench_first is None:
  77. bench_first = now
  78. return __reserved(request)
  79. import sys
  80. def task_ready(request): # noqa
  81. global all_count
  82. global bench_start
  83. global bench_last
  84. all_count += 1
  85. if not all_count % bench_every:
  86. now = time()
  87. diff = now - bench_start
  88. print('- Time spent processing %s tasks (since first '
  89. 'task received): ~%.4fs\n' % (bench_every, diff))
  90. sys.stdout.flush()
  91. bench_start = bench_last = now
  92. bench_sample.append(diff)
  93. sample_mem()
  94. return __ready(request)
  95. class Persistent(object):
  96. """This is the persistent data stored by the worker when
  97. :option:`--statedb` is enabled.
  98. It currently only stores revoked task id's.
  99. """
  100. storage = shelve
  101. _is_open = False
  102. def __init__(self, filename):
  103. self.filename = filename
  104. self._load()
  105. def save(self):
  106. self.sync(self.db)
  107. self.db.sync()
  108. self.close()
  109. def merge(self, d):
  110. revoked.update(d.get('revoked') or {})
  111. return d
  112. def sync(self, d):
  113. prev = d.get('revoked') or {}
  114. prev.update(revoked.as_dict())
  115. d['revoked'] = prev
  116. return d
  117. def open(self):
  118. return self.storage.open(self.filename, writeback=True)
  119. def close(self):
  120. if self._is_open:
  121. self.db.close()
  122. self._is_open = False
  123. def _load(self):
  124. self.merge(self.db)
  125. @cached_property
  126. def db(self):
  127. self._is_open = True
  128. return self.open()