test_utils.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import sys
  2. import socket
  3. import unittest
  4. from billiard.utils.functional import wraps
  5. from celery import utils
  6. class TestChunks(unittest.TestCase):
  7. def test_chunks(self):
  8. # n == 2
  9. x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
  10. self.assertEquals(list(x),
  11. [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]])
  12. # n == 3
  13. x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
  14. self.assertEquals(list(x),
  15. [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]])
  16. # n == 2 (exact)
  17. x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), 2)
  18. self.assertEquals(list(x),
  19. [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]])
  20. class TestGenUniqueId(unittest.TestCase):
  21. def test_gen_unique_id_without_ctypes(self):
  22. from celery.tests.utils import mask_modules
  23. old_utils = sys.modules.pop("celery.utils")
  24. try:
  25. with mask_modules("ctypes"):
  26. from celery.utils import ctypes, gen_unique_id
  27. self.assertTrue(ctypes is None)
  28. uuid = gen_unique_id()
  29. self.assertTrue(uuid)
  30. self.assertTrue(isinstance(uuid, basestring))
  31. finally:
  32. sys.modules["celery.utils"] = old_utils
  33. class TestDivUtils(unittest.TestCase):
  34. def test_repeatlast(self):
  35. items = range(6)
  36. it = utils.repeatlast(items)
  37. for i in items:
  38. self.assertEquals(it.next(), i)
  39. for j in items:
  40. self.assertEquals(it.next(), i)
  41. def sleepdeprived(fun):
  42. @wraps(fun)
  43. def _sleepdeprived(*args, **kwargs):
  44. import time
  45. old_sleep = time.sleep
  46. time.sleep = utils.noop
  47. try:
  48. return fun(*args, **kwargs)
  49. finally:
  50. time.sleep = old_sleep
  51. return _sleepdeprived
  52. class TestRetryOverTime(unittest.TestCase):
  53. def test_returns_retval_on_success(self):
  54. def _fun(x, y):
  55. return x * y
  56. ret = utils.retry_over_time(_fun, (socket.error, ), args=[16, 16],
  57. max_retries=3)
  58. self.assertEquals(ret, 256)
  59. @sleepdeprived
  60. def test_raises_on_unlisted_exception(self):
  61. def _fun(x, y):
  62. raise KeyError("bar")
  63. self.assertRaises(KeyError, utils.retry_over_time, _fun,
  64. (socket.error, ), args=[32, 32], max_retries=3)
  65. @sleepdeprived
  66. def test_retries_on_failure(self):
  67. iterations = [0]
  68. def _fun(x, y):
  69. iterations[0] += 1
  70. if iterations[0] == 3:
  71. return x * y
  72. raise socket.error("foozbaz")
  73. ret = utils.retry_over_time(_fun, (socket.error, ), args=[32, 32],
  74. max_retries=None)
  75. self.assertEquals(iterations[0], 3)
  76. self.assertEquals(ret, 1024)
  77. self.assertRaises(socket.error, utils.retry_over_time,
  78. _fun, (socket.error, ), args=[32, 32], max_retries=1)