test_worker_revoke.py 315 B

123456789101112
  1. from celery.tests.utils import unittest
  2. from celery.worker import state
  3. class TestRevokeRegistry(unittest.TestCase):
  4. def test_is_working(self):
  5. state.revoked.add("foo")
  6. self.assertIn("foo", state.revoked)
  7. state.revoked.pop_value("foo")
  8. self.assertNotIn("foo", state.revoked)