state.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import shelve
  2. from celery.utils import cached_property
  3. from celery.utils.compat import defaultdict
  4. from celery.datastructures import LimitedSet
  5. #: maximum number of revokes to keep in memory.
  6. REVOKES_MAX = 10000
  7. #: how many seconds a revoke will be active before
  8. #: being expired when the max limit has been exceeded.
  9. REVOKE_EXPIRES = 3600
  10. #: set of all reserved :class:`~celery.worker.job.TaskRequest`'s.
  11. reserved_requests = set()
  12. #: set of currently active :class:`~celery.worker.job.TaskRequest`'s.
  13. active_requests = set()
  14. #: count of tasks executed by the worker, sorted by type.
  15. total_count = defaultdict(lambda: 0)
  16. #: the list of currently revoked tasks. Persistent if statedb set.
  17. revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
  18. def task_reserved(request):
  19. """Updates global state when a task has been reserved."""
  20. reserved_requests.add(request)
  21. def task_accepted(request):
  22. """Updates global state when a task has been accepted."""
  23. active_requests.add(request)
  24. total_count[request.task_name] += 1
  25. def task_ready(request):
  26. """Updates global state when a task is ready."""
  27. active_requests.discard(request)
  28. reserved_requests.discard(request)
  29. class Persistent(object):
  30. storage = shelve
  31. _is_open = False
  32. def __init__(self, filename):
  33. self.filename = filename
  34. self._load()
  35. def save(self):
  36. self.sync(self.db).sync()
  37. self.close()
  38. def merge(self, d):
  39. revoked.update(d.get("revoked") or {})
  40. return d
  41. def sync(self, d):
  42. prev = d.get("revoked") or {}
  43. prev.update(revoked.as_dict())
  44. d["revoked"] = prev
  45. return d
  46. def open(self):
  47. return self.storage.open(self.filename)
  48. def close(self):
  49. if self._is_open:
  50. self.db.close()
  51. self._is_open = False
  52. def _load(self):
  53. self.merge(self.db)
  54. self.close()
  55. @cached_property
  56. def db(self):
  57. self._is_open = True
  58. return self.open()