test_worker_revoke.py 331 B

12345678910111213
  1. from __future__ import absolute_import
  2. from celery.worker import state
  3. from celery.tests.utils import Case
  4. class test_revoked(Case):
  5. def test_is_working(self):
  6. state.revoked.add("foo")
  7. self.assertIn("foo", state.revoked)
  8. state.revoked.pop_value("foo")
  9. self.assertNotIn("foo", state.revoked)