# test_state.py (3.0 KB)

  1. from __future__ import absolute_import
  2. from celery.datastructures import LimitedSet
  3. from celery.worker import state
  4. from celery.tests.utils import Case
  5. class StateResetCase(Case):
  6. def setUp(self):
  7. self.reset_state()
  8. self.on_setup()
  9. def tearDown(self):
  10. self.reset_state()
  11. self.on_teardown()
  12. def reset_state(self):
  13. state.active_requests.clear()
  14. state.revoked.clear()
  15. state.total_count.clear()
  16. def on_setup(self):
  17. pass
  18. def on_teardown(self):
  19. pass
  20. class MockShelve(dict):
  21. filename = None
  22. in_sync = False
  23. closed = False
  24. def open(self, filename, **kwargs):
  25. self.filename = filename
  26. return self
  27. def sync(self):
  28. self.in_sync = True
  29. def close(self):
  30. self.closed = True
  31. class MyPersistent(state.Persistent):
  32. storage = MockShelve()
  33. class test_Persistent(StateResetCase):
  34. def on_setup(self):
  35. self.p = MyPersistent(filename="celery-state")
  36. def test_close_twice(self):
  37. self.p._is_open = False
  38. self.p.close()
  39. def test_constructor(self):
  40. self.assertDictEqual(self.p.db, {})
  41. self.assertEqual(self.p.db.filename, self.p.filename)
  42. def test_save(self):
  43. self.p.db["foo"] = "bar"
  44. self.p.save()
  45. self.assertTrue(self.p.db.in_sync)
  46. self.assertTrue(self.p.db.closed)
  47. def add_revoked(self, *ids):
  48. for id in ids:
  49. self.p.db.setdefault("revoked", LimitedSet()).add(id)
  50. def test_merge(self, data=["foo", "bar", "baz"]):
  51. self.add_revoked(*data)
  52. self.p.merge(self.p.db)
  53. for item in data:
  54. self.assertIn(item, state.revoked)
  55. def test_sync(self, data1=["foo", "bar", "baz"],
  56. data2=["baz", "ini", "koz"]):
  57. self.add_revoked(*data1)
  58. for item in data2:
  59. state.revoked.add(item)
  60. self.p.sync(self.p.db)
  61. for item in data2:
  62. self.assertIn(item, self.p.db["revoked"])
  63. class SimpleReq(object):
  64. def __init__(self, name):
  65. self.name = name
  66. class test_state(StateResetCase):
  67. def test_accepted(self, requests=[SimpleReq("foo"),
  68. SimpleReq("bar"),
  69. SimpleReq("baz"),
  70. SimpleReq("baz")]):
  71. for request in requests:
  72. state.task_accepted(request)
  73. for req in requests:
  74. self.assertIn(req, state.active_requests)
  75. self.assertEqual(state.total_count["foo"], 1)
  76. self.assertEqual(state.total_count["bar"], 1)
  77. self.assertEqual(state.total_count["baz"], 2)
  78. def test_ready(self, requests=[SimpleReq("foo"),
  79. SimpleReq("bar")]):
  80. for request in requests:
  81. state.task_accepted(request)
  82. self.assertEqual(len(state.active_requests), 2)
  83. for request in requests:
  84. state.task_ready(request)
  85. self.assertEqual(len(state.active_requests), 0)