test_abortable.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from celery.contrib.abortable import AbortableTask, AbortableAsyncResult
  2. class test_AbortableTask:
  3. def setup(self):
  4. @self.app.task(base=AbortableTask, shared=False)
  5. def abortable():
  6. return True
  7. self.abortable = abortable
  8. def test_async_result_is_abortable(self):
  9. result = self.abortable.apply_async()
  10. tid = result.id
  11. assert isinstance(
  12. self.abortable.AsyncResult(tid), AbortableAsyncResult)
  13. def test_is_not_aborted(self):
  14. self.abortable.push_request()
  15. try:
  16. result = self.abortable.apply_async()
  17. tid = result.id
  18. assert not self.abortable.is_aborted(task_id=tid)
  19. finally:
  20. self.abortable.pop_request()
  21. def test_is_aborted_not_abort_result(self):
  22. self.abortable.AsyncResult = self.app.AsyncResult
  23. self.abortable.push_request()
  24. try:
  25. self.abortable.request.id = 'foo'
  26. assert not self.abortable.is_aborted()
  27. finally:
  28. self.abortable.pop_request()
  29. def test_abort_yields_aborted(self):
  30. self.abortable.push_request()
  31. try:
  32. result = self.abortable.apply_async()
  33. result.abort()
  34. tid = result.id
  35. assert self.abortable.is_aborted(task_id=tid)
  36. finally:
  37. self.abortable.pop_request()