test_threads.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from mock import Mock
  4. from celery.concurrency.threads import NullDict, TaskPool, apply_target
  5. from celery.tests.utils import Case, mask_modules, mock_module
  6. class test_NullDict(Case):
  7. def test_setitem(self):
  8. x = NullDict()
  9. x['foo'] = 1
  10. with self.assertRaises(KeyError):
  11. x['foo']
  12. class test_TaskPool(Case):
  13. def test_without_threadpool(self):
  14. with mask_modules('threadpool'):
  15. with self.assertRaises(ImportError):
  16. TaskPool()
  17. def test_with_threadpool(self):
  18. with mock_module('threadpool'):
  19. x = TaskPool()
  20. self.assertTrue(x.ThreadPool)
  21. self.assertTrue(x.WorkRequest)
  22. def test_on_start(self):
  23. with mock_module('threadpool'):
  24. x = TaskPool()
  25. x.on_start()
  26. self.assertTrue(x._pool)
  27. self.assertIsInstance(x._pool.workRequests, NullDict)
  28. def test_on_stop(self):
  29. with mock_module('threadpool'):
  30. x = TaskPool()
  31. x.on_start()
  32. x.on_stop()
  33. x._pool.dismissWorkers.assert_called_with(x.limit, do_join=True)
  34. def test_on_apply(self):
  35. with mock_module('threadpool'):
  36. x = TaskPool()
  37. x.on_start()
  38. callback = Mock()
  39. accept_callback = Mock()
  40. target = Mock()
  41. req = x.on_apply(target, args=(1, 2), kwargs={'a': 10},
  42. callback=callback,
  43. accept_callback=accept_callback)
  44. x.WorkRequest.assert_called_with(
  45. apply_target,
  46. (target, (1, 2), {'a': 10}, callback, accept_callback),
  47. )
  48. x._pool.putRequest.assert_called_with(req)
  49. x._pool._results_queue.queue.clear.assert_called_with()