state.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import os
  2. import platform
  3. import shelve
  4. from collections import defaultdict
  5. from kombu.utils import cached_property
  6. from celery import __version__
  7. from celery.datastructures import LimitedSet
  8. #: Worker software/platform information.
  9. SOFTWARE_INFO = {"sw_ident": "celeryd",
  10. "sw_ver": __version__,
  11. "sw_sys": platform.system()}
  12. #: maximum number of revokes to keep in memory.
  13. REVOKES_MAX = 10000
  14. #: how many seconds a revoke will be active before
  15. #: being expired when the max limit has been exceeded.
  16. REVOKE_EXPIRES = 3600
  17. #: set of all reserved :class:`~celery.worker.job.TaskRequest`'s.
  18. reserved_requests = set()
  19. #: set of currently active :class:`~celery.worker.job.TaskRequest`'s.
  20. active_requests = set()
  21. #: count of tasks executed by the worker, sorted by type.
  22. total_count = defaultdict(lambda: 0)
  23. #: the list of currently revoked tasks. Persistent if statedb set.
  24. revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
  25. def task_reserved(request):
  26. """Updates global state when a task has been reserved."""
  27. reserved_requests.add(request)
  28. def task_accepted(request):
  29. """Updates global state when a task has been accepted."""
  30. active_requests.add(request)
  31. total_count[request.task_name] += 1
  32. def task_ready(request):
  33. """Updates global state when a task is ready."""
  34. active_requests.discard(request)
  35. reserved_requests.discard(request)
  36. if os.environ.get("CELERY_BENCH"): # pragma: no cover
  37. from time import time
  38. all_count = 0
  39. bench_start = None
  40. bench_every = int(os.environ.get("CELERY_BENCH_EVERY", 1000))
  41. __reserved = task_reserved
  42. __ready = task_ready
  43. def task_reserved(request): # noqa
  44. global bench_start
  45. if bench_start is None:
  46. bench_start = time()
  47. return __reserved(request)
  48. def task_ready(request): # noqa
  49. global all_count, bench_start
  50. all_count += 1
  51. if not all_count % bench_every:
  52. print("* Time spent processing %s tasks (since first "
  53. "task received): ~%.4fs\n" % (
  54. bench_every, time() - bench_start))
  55. bench_start = None
  56. return __ready(request)
  57. class Persistent(object):
  58. storage = shelve
  59. _is_open = False
  60. def __init__(self, filename):
  61. self.filename = filename
  62. self._load()
  63. def save(self):
  64. self.sync(self.db).sync()
  65. self.close()
  66. def merge(self, d):
  67. revoked.update(d.get("revoked") or {})
  68. return d
  69. def sync(self, d):
  70. prev = d.get("revoked") or {}
  71. prev.update(revoked.as_dict())
  72. d["revoked"] = prev
  73. return d
  74. def open(self):
  75. return self.storage.open(self.filename)
  76. def close(self):
  77. if self._is_open:
  78. self.db.close()
  79. self._is_open = False
  80. def _load(self):
  81. self.merge(self.db)
  82. self.close()
  83. @cached_property
  84. def db(self):
  85. self._is_open = True
  86. return self.open()