test_utils.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import sys
  2. import socket
  3. import unittest
  4. from billiard.utils.functional import wraps
  5. from celery import utils
  6. from testunits.utils import sleepdeprived, execute_context
  7. from testunits.utils import mask_modules
  class TestChunks(unittest.TestCase):
      """Tests for ``utils.chunks``."""

      def test_chunks(self):
          """Chunks hold ``n`` items each; a shorter remainder comes last."""
          # n == 2
          x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
          self.assertEqual(list(x),
                           [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]])

          # n == 3
          x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
          self.assertEqual(list(x),
                           [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]])

          # n == 2 (exact): input length divides evenly, so no short tail.
          x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), 2)
          self.assertEqual(list(x),
                           [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]])
  class TestGenUniqueId(unittest.TestCase):
      """Exercise ``gen_unique_id`` while the ``ctypes`` module is masked."""

      def test_gen_unique_id_without_ctypes(self):
          """A non-empty string id is still produced when ctypes is masked."""

          def with_ctypes_masked(_val):
              # Imported here so that celery.utils is loaded afresh while
              # the mask is active.
              from celery.utils import ctypes, gen_unique_id
              self.assertTrue(ctypes is None)
              unique_id = gen_unique_id()
              self.assertTrue(unique_id)
              self.assertTrue(isinstance(unique_id, basestring))

          # Drop the cached module so the import above executes it again;
          # the original module object is put back afterwards.
          saved_module = sys.modules.pop("celery.utils")
          try:
              execute_context(mask_modules("ctypes"), with_ctypes_masked)
          finally:
              sys.modules["celery.utils"] = saved_module
  class TestDivUtils(unittest.TestCase):
      """Tests for miscellaneous helpers in ``celery.utils``."""

      def test_repeatlast(self):
          """``repeatlast`` yields each item, then keeps yielding the last."""
          items = range(6)
          # Name the expected tail value explicitly instead of relying on a
          # loop variable that happens to stay bound after the first loop.
          last = items[-1]
          it = utils.repeatlast(items)

          # ``it.next()`` is the Python 2 iterator protocol this module uses.
          for expected in items:
              self.assertEqual(it.next(), expected)

          # Past the end of the input, every further value is the last item.
          for _ in items:
              self.assertEqual(it.next(), last)
  class TestRetryOverTime(unittest.TestCase):
      """Tests for ``utils.retry_over_time``."""

      def test_returns_retval_on_success(self):
          """The callable's return value is handed back to the caller."""

          def _fun(x, y):
              return x * y

          ret = utils.retry_over_time(_fun, (socket.error, ), args=[16, 16],
                                      max_retries=3)
          self.assertEqual(ret, 256)

      @sleepdeprived
      def test_raises_on_unlisted_exception(self):
          """An exception outside the given tuple propagates unchanged."""

          def _fun(x, y):
              raise KeyError("bar")

          self.assertRaises(KeyError, utils.retry_over_time, _fun,
                            (socket.error, ), args=[32, 32], max_retries=3)

      @sleepdeprived
      def test_retries_on_failure(self):
          """Listed errors are retried; running out of retries re-raises."""
          # One-element list used as a mutable cell: a Python 2 closure
          # cannot rebind a local of the enclosing function.
          iterations = [0]

          def _fun(x, y):
              iterations[0] += 1
              # Succeed on the third call only; fail on every other call.
              if iterations[0] == 3:
                  return x * y
              raise socket.error("foozbaz")

          ret = utils.retry_over_time(_fun, (socket.error, ), args=[32, 32],
                                      max_retries=None)
          self.assertEqual(iterations[0], 3)
          self.assertEqual(ret, 1024)

          # The counter is already at 3, so ``_fun`` now fails on every
          # call and the limited retry budget must end in socket.error.
          self.assertRaises(socket.error, utils.retry_over_time,
                            _fun, (socket.error, ), args=[32, 32], max_retries=1)