# state.py
  1. import shelve
  2. from celery.utils.compat import defaultdict
  3. from celery.datastructures import LimitedSet
  4. # Maximum number of revokes to keep in memory.
  5. REVOKES_MAX = 10000
  6. # How many seconds a revoke will be active before
  7. # being expired when the max limit has been exceeded.
  8. REVOKE_EXPIRES = 3600 # One hour.
  9. """
  10. .. data:: active_requests
  11. Set of currently active :class:`~celery.worker.job.TaskRequest`'s.
  12. .. data:: total_count
  13. Count of tasks executed by the worker, sorted by type.
  14. .. data:: revoked
  15. The list of currently revoked tasks. (PERSISTENT if statedb set).
  16. """
  17. active_requests = set()
  18. total_count = defaultdict(lambda: 0)
  19. revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
  20. def task_accepted(request):
  21. """Updates global state when a task has been accepted."""
  22. active_requests.add(request)
  23. total_count[request.task_name] += 1
  24. def task_ready(request):
  25. """Updates global state when a task is ready."""
  26. try:
  27. active_requests.remove(request)
  28. except KeyError:
  29. pass
  30. class Persistent(object):
  31. _open = None
  32. def __init__(self, filename):
  33. self.filename = filename
  34. self._load()
  35. def _load(self):
  36. self.merge(self.db)
  37. self.close()
  38. def save(self):
  39. self.sync(self.db).sync()
  40. self.close()
  41. def merge(self, d):
  42. revoked.update(d.get("revoked") or {})
  43. return d
  44. def sync(self, d):
  45. prev = d.get("revoked") or {}
  46. prev.update(revoked.as_dict())
  47. d["revoked"] = prev
  48. return d
  49. def open(self):
  50. return shelve.open(self.filename)
  51. def close(self):
  52. if self._open:
  53. self._open.close()
  54. self._open = None
  55. @property
  56. def db(self):
  57. if self._open is None:
  58. self._open = self.open()
  59. return self._open