test_cassandra.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. from __future__ import absolute_import
  2. import socket
  3. from mock import Mock
  4. from pickle import loads, dumps
  5. from celery import states
  6. from celery.exceptions import ImproperlyConfigured
  7. from celery.tests.case import AppCase, mock_module, depends_on_current_app
  8. class Object(object):
  9. pass
  10. def install_exceptions(mod):
  11. # py3k: cannot catch exceptions not ineheriting from BaseException.
  12. class NotFoundException(Exception):
  13. pass
  14. class TException(Exception):
  15. pass
  16. class InvalidRequestException(Exception):
  17. pass
  18. class UnavailableException(Exception):
  19. pass
  20. class TimedOutException(Exception):
  21. pass
  22. class AllServersUnavailable(Exception):
  23. pass
  24. mod.NotFoundException = NotFoundException
  25. mod.TException = TException
  26. mod.InvalidRequestException = InvalidRequestException
  27. mod.TimedOutException = TimedOutException
  28. mod.UnavailableException = UnavailableException
  29. mod.AllServersUnavailable = AllServersUnavailable
  30. class test_CassandraBackend(AppCase):
  31. def setup(self):
  32. self.app.conf.update(
  33. CASSANDRA_SERVERS=['example.com'],
  34. CASSANDRA_KEYSPACE='keyspace',
  35. CASSANDRA_COLUMN_FAMILY='columns',
  36. )
  37. def test_init_no_pycassa(self):
  38. with mock_module('pycassa'):
  39. from celery.backends import cassandra as mod
  40. prev, mod.pycassa = mod.pycassa, None
  41. try:
  42. with self.assertRaises(ImproperlyConfigured):
  43. mod.CassandraBackend(app=self.app)
  44. finally:
  45. mod.pycassa = prev
  46. def test_init_with_and_without_LOCAL_QUROM(self):
  47. with mock_module('pycassa'):
  48. from celery.backends import cassandra as mod
  49. mod.pycassa = Mock()
  50. install_exceptions(mod.pycassa)
  51. cons = mod.pycassa.ConsistencyLevel = Object()
  52. cons.LOCAL_QUORUM = 'foo'
  53. self.app.conf.CASSANDRA_READ_CONSISTENCY = 'LOCAL_FOO'
  54. self.app.conf.CASSANDRA_WRITE_CONSISTENCY = 'LOCAL_FOO'
  55. mod.CassandraBackend(app=self.app)
  56. cons.LOCAL_FOO = 'bar'
  57. mod.CassandraBackend(app=self.app)
  58. # no servers raises ImproperlyConfigured
  59. with self.assertRaises(ImproperlyConfigured):
  60. self.app.conf.CASSANDRA_SERVERS = None
  61. mod.CassandraBackend(
  62. app=self.app, keyspace='b', column_family='c',
  63. )
  64. @depends_on_current_app
  65. def test_reduce(self):
  66. with mock_module('pycassa'):
  67. from celery.backends.cassandra import CassandraBackend
  68. self.assertTrue(loads(dumps(CassandraBackend(app=self.app))))
  69. def test_get_task_meta_for(self):
  70. with mock_module('pycassa'):
  71. from celery.backends import cassandra as mod
  72. mod.pycassa = Mock()
  73. install_exceptions(mod.pycassa)
  74. mod.Thrift = Mock()
  75. install_exceptions(mod.Thrift)
  76. x = mod.CassandraBackend(app=self.app)
  77. Get_Column = x._get_column_family = Mock()
  78. get_column = Get_Column.return_value = Mock()
  79. get = get_column.get
  80. META = get.return_value = {
  81. 'task_id': 'task_id',
  82. 'status': states.SUCCESS,
  83. 'result': '1',
  84. 'date_done': 'date',
  85. 'traceback': '',
  86. 'children': None,
  87. }
  88. x.decode = Mock()
  89. x.detailed_mode = False
  90. meta = x._get_task_meta_for('task_id')
  91. self.assertEqual(meta['status'], states.SUCCESS)
  92. x.detailed_mode = True
  93. row = get.return_value = Mock()
  94. row.values.return_value = [Mock()]
  95. x.decode.return_value = META
  96. meta = x._get_task_meta_for('task_id')
  97. self.assertEqual(meta['status'], states.SUCCESS)
  98. x.decode.return_value = Mock()
  99. x.detailed_mode = False
  100. get.side_effect = KeyError()
  101. meta = x._get_task_meta_for('task_id')
  102. self.assertEqual(meta['status'], states.PENDING)
  103. calls = [0]
  104. end = [10]
  105. def work_eventually(*arg):
  106. try:
  107. if calls[0] > end[0]:
  108. return META
  109. raise socket.error()
  110. finally:
  111. calls[0] += 1
  112. get.side_effect = work_eventually
  113. x._retry_timeout = 10
  114. x._retry_wait = 0.01
  115. meta = x._get_task_meta_for('task')
  116. self.assertEqual(meta['status'], states.SUCCESS)
  117. x._retry_timeout = 0.1
  118. calls[0], end[0] = 0, 100
  119. with self.assertRaises(socket.error):
  120. x._get_task_meta_for('task')
  121. def test_store_result(self):
  122. with mock_module('pycassa'):
  123. from celery.backends import cassandra as mod
  124. mod.pycassa = Mock()
  125. install_exceptions(mod.pycassa)
  126. mod.Thrift = Mock()
  127. install_exceptions(mod.Thrift)
  128. x = mod.CassandraBackend(app=self.app)
  129. Get_Column = x._get_column_family = Mock()
  130. cf = Get_Column.return_value = Mock()
  131. x.detailed_mode = False
  132. x._store_result('task_id', 'result', states.SUCCESS)
  133. self.assertTrue(cf.insert.called)
  134. cf.insert.reset()
  135. x.detailed_mode = True
  136. x._store_result('task_id', 'result', states.SUCCESS)
  137. self.assertTrue(cf.insert.called)
  138. def test_process_cleanup(self):
  139. with mock_module('pycassa'):
  140. from celery.backends import cassandra as mod
  141. x = mod.CassandraBackend(app=self.app)
  142. x._column_family = None
  143. x.process_cleanup()
  144. x._column_family = True
  145. x.process_cleanup()
  146. self.assertIsNone(x._column_family)
  147. def test_get_column_family(self):
  148. with mock_module('pycassa'):
  149. from celery.backends import cassandra as mod
  150. mod.pycassa = Mock()
  151. install_exceptions(mod.pycassa)
  152. x = mod.CassandraBackend(app=self.app)
  153. self.assertTrue(x._get_column_family())
  154. self.assertIsNotNone(x._column_family)
  155. self.assertIs(x._get_column_family(), x._column_family)