test_abortable.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. from __future__ import absolute_import
  2. from celery.contrib.abortable import AbortableTask, AbortableAsyncResult
  3. from celery.tests.case import AppCase
  4. class test_AbortableTask(AppCase):
  5. def setup(self):
  6. @self.app.task(base=AbortableTask, shared=False)
  7. def abortable():
  8. return True
  9. self.abortable = abortable
  10. def test_async_result_is_abortable(self):
  11. result = self.abortable.apply_async()
  12. tid = result.id
  13. self.assertIsInstance(
  14. self.abortable.AsyncResult(tid), AbortableAsyncResult,
  15. )
  16. def test_is_not_aborted(self):
  17. self.abortable.push_request()
  18. try:
  19. result = self.abortable.apply_async()
  20. tid = result.id
  21. self.assertFalse(self.abortable.is_aborted(task_id=tid))
  22. finally:
  23. self.abortable.pop_request()
  24. def test_is_aborted_not_abort_result(self):
  25. self.abortable.AsyncResult = self.app.AsyncResult
  26. self.abortable.push_request()
  27. try:
  28. self.abortable.request.id = 'foo'
  29. self.assertFalse(self.abortable.is_aborted())
  30. finally:
  31. self.abortable.pop_request()
  32. def test_abort_yields_aborted(self):
  33. self.abortable.push_request()
  34. try:
  35. result = self.abortable.apply_async()
  36. result.abort()
  37. tid = result.id
  38. self.assertTrue(self.abortable.is_aborted(task_id=tid))
  39. finally:
  40. self.abortable.pop_request()