# test_threads.py (2.6 KB)

  1. from __future__ import absolute_import
  2. from celery.utils.threads import (
  3. _LocalStack,
  4. _FastLocalStack,
  5. LocalManager,
  6. Local,
  7. bgThread,
  8. )
  9. from celery.tests.case import Case, override_stdouts, patch
  10. class test_bgThread(Case):
  11. def test_crash(self):
  12. class T(bgThread):
  13. def body(self):
  14. raise KeyError()
  15. with patch('os._exit') as _exit:
  16. with override_stdouts():
  17. _exit.side_effect = ValueError()
  18. t = T()
  19. with self.assertRaises(ValueError):
  20. t.run()
  21. _exit.assert_called_with(1)
  22. def test_interface(self):
  23. x = bgThread()
  24. with self.assertRaises(NotImplementedError):
  25. x.body()
  26. class test_Local(Case):
  27. def test_iter(self):
  28. x = Local()
  29. x.foo = 'bar'
  30. ident = x.__ident_func__()
  31. self.assertIn((ident, {'foo': 'bar'}), list(iter(x)))
  32. delattr(x, 'foo')
  33. self.assertNotIn((ident, {'foo': 'bar'}), list(iter(x)))
  34. with self.assertRaises(AttributeError):
  35. delattr(x, 'foo')
  36. self.assertIsNotNone(x(lambda: 'foo'))
  37. class test_LocalStack(Case):
  38. def test_stack(self):
  39. x = _LocalStack()
  40. self.assertIsNone(x.pop())
  41. x.__release_local__()
  42. ident = x.__ident_func__
  43. x.__ident_func__ = ident
  44. with self.assertRaises(RuntimeError):
  45. x()[0]
  46. x.push(['foo'])
  47. self.assertEqual(x()[0], 'foo')
  48. x.pop()
  49. with self.assertRaises(RuntimeError):
  50. x()[0]
  51. class test_FastLocalStack(Case):
  52. def test_stack(self):
  53. x = _FastLocalStack()
  54. x.push(['foo'])
  55. x.push(['bar'])
  56. self.assertEqual(x.top, ['bar'])
  57. self.assertEqual(len(x), 2)
  58. x.pop()
  59. self.assertEqual(x.top, ['foo'])
  60. x.pop()
  61. self.assertIsNone(x.top)
  62. class test_LocalManager(Case):
  63. def test_init(self):
  64. x = LocalManager()
  65. self.assertListEqual(x.locals, [])
  66. self.assertTrue(x.ident_func)
  67. def ident():
  68. return 1
  69. loc = Local()
  70. x = LocalManager([loc], ident_func=ident)
  71. self.assertListEqual(x.locals, [loc])
  72. x = LocalManager(loc, ident_func=ident)
  73. self.assertListEqual(x.locals, [loc])
  74. self.assertIs(x.ident_func, ident)
  75. self.assertIs(x.locals[0].__ident_func__, ident)
  76. self.assertEqual(x.get_ident(), 1)
  77. with patch('celery.utils.threads.release_local') as release:
  78. x.cleanup()
  79. release.assert_called_with(loc)
  80. self.assertTrue(repr(x))