# test_utils.py (2.8 KB)

  1. from __future__ import with_statement
  2. import sys
  3. import socket
  4. import unittest
  5. from billiard.utils.functional import wraps
  6. from celery import utils
  7. from celery.tests.utils import sleepdeprived
  8. class TestChunks(unittest.TestCase):
  9. def test_chunks(self):
  10. # n == 2
  11. x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
  12. self.assertEquals(list(x),
  13. [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]])
  14. # n == 3
  15. x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
  16. self.assertEquals(list(x),
  17. [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]])
  18. # n == 2 (exact)
  19. x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), 2)
  20. self.assertEquals(list(x),
  21. [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]])
  22. class TestGenUniqueId(unittest.TestCase):
  23. def test_gen_unique_id_without_ctypes(self):
  24. from celery.tests.utils import mask_modules
  25. old_utils = sys.modules.pop("celery.utils")
  26. try:
  27. with mask_modules("ctypes"):
  28. from celery.utils import ctypes, gen_unique_id
  29. self.assertTrue(ctypes is None)
  30. uuid = gen_unique_id()
  31. self.assertTrue(uuid)
  32. self.assertTrue(isinstance(uuid, basestring))
  33. finally:
  34. sys.modules["celery.utils"] = old_utils
  35. class TestDivUtils(unittest.TestCase):
  36. def test_repeatlast(self):
  37. items = range(6)
  38. it = utils.repeatlast(items)
  39. for i in items:
  40. self.assertEquals(it.next(), i)
  41. for j in items:
  42. self.assertEquals(it.next(), i)
  43. class TestRetryOverTime(unittest.TestCase):
  44. def test_returns_retval_on_success(self):
  45. def _fun(x, y):
  46. return x * y
  47. ret = utils.retry_over_time(_fun, (socket.error, ), args=[16, 16],
  48. max_retries=3)
  49. self.assertEquals(ret, 256)
  50. @sleepdeprived
  51. def test_raises_on_unlisted_exception(self):
  52. def _fun(x, y):
  53. raise KeyError("bar")
  54. self.assertRaises(KeyError, utils.retry_over_time, _fun,
  55. (socket.error, ), args=[32, 32], max_retries=3)
  56. @sleepdeprived
  57. def test_retries_on_failure(self):
  58. iterations = [0]
  59. def _fun(x, y):
  60. iterations[0] += 1
  61. if iterations[0] == 3:
  62. return x * y
  63. raise socket.error("foozbaz")
  64. ret = utils.retry_over_time(_fun, (socket.error, ), args=[32, 32],
  65. max_retries=None)
  66. self.assertEquals(iterations[0], 3)
  67. self.assertEquals(ret, 1024)
  68. self.assertRaises(socket.error, utils.retry_over_time,
  69. _fun, (socket.error, ), args=[32, 32], max_retries=1)