123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- import sys
- import socket
- import unittest
- from billiard.utils.functional import wraps
- from celery import utils
- from testunits.utils import sleepdeprived, execute_context
- from testunits.utils import mask_modules
- class TestChunks(unittest.TestCase):
- def test_chunks(self):
- # n == 2
- x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
- self.assertEquals(list(x),
- [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]])
- # n == 3
- x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
- self.assertEquals(list(x),
- [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]])
- # n == 2 (exact)
- x = utils.chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), 2)
- self.assertEquals(list(x),
- [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]])
- class TestGenUniqueId(unittest.TestCase):
- def test_gen_unique_id_without_ctypes(self):
- old_utils = sys.modules.pop("celery.utils")
- def with_ctypes_masked(_val):
- from celery.utils import ctypes, gen_unique_id
- self.assertTrue(ctypes is None)
- uuid = gen_unique_id()
- self.assertTrue(uuid)
- self.assertTrue(isinstance(uuid, basestring))
- try:
- context = mask_modules("ctypes")
- execute_context(context, with_ctypes_masked)
- finally:
- sys.modules["celery.utils"] = old_utils
- class TestDivUtils(unittest.TestCase):
- def test_repeatlast(self):
- items = range(6)
- it = utils.repeatlast(items)
- for i in items:
- self.assertEquals(it.next(), i)
- for j in items:
- self.assertEquals(it.next(), i)
- class TestRetryOverTime(unittest.TestCase):
- def test_returns_retval_on_success(self):
- def _fun(x, y):
- return x * y
- ret = utils.retry_over_time(_fun, (socket.error, ), args=[16, 16],
- max_retries=3)
- self.assertEquals(ret, 256)
- @sleepdeprived
- def test_raises_on_unlisted_exception(self):
- def _fun(x, y):
- raise KeyError("bar")
- self.assertRaises(KeyError, utils.retry_over_time, _fun,
- (socket.error, ), args=[32, 32], max_retries=3)
- @sleepdeprived
- def test_retries_on_failure(self):
- iterations = [0]
- def _fun(x, y):
- iterations[0] += 1
- if iterations[0] == 3:
- return x * y
- raise socket.error("foozbaz")
- ret = utils.retry_over_time(_fun, (socket.error, ), args=[32, 32],
- max_retries=None)
- self.assertEquals(iterations[0], 3)
- self.assertEquals(ret, 1024)
- self.assertRaises(socket.error, utils.retry_over_time,
- _fun, (socket.error, ), args=[32, 32], max_retries=1)
|