test_cassandra.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. from __future__ import absolute_import, unicode_literals
  2. from datetime import datetime
  3. from pickle import dumps, loads
  4. import pytest
  5. from case import Mock, mock
  6. from celery import states
  7. from celery.exceptions import ImproperlyConfigured
  8. from celery.utils.objects import Bunch
  9. CASSANDRA_MODULES = ['cassandra', 'cassandra.auth', 'cassandra.cluster']
  10. @mock.module(*CASSANDRA_MODULES)
  11. class test_CassandraBackend:
  12. def setup(self):
  13. self.app.conf.update(
  14. cassandra_servers=['example.com'],
  15. cassandra_keyspace='celery',
  16. cassandra_table='task_results',
  17. )
  18. def test_init_no_cassandra(self, *modules):
  19. # should raise ImproperlyConfigured when no python-driver
  20. # installed.
  21. from celery.backends import cassandra as mod
  22. prev, mod.cassandra = mod.cassandra, None
  23. try:
  24. with pytest.raises(ImproperlyConfigured):
  25. mod.CassandraBackend(app=self.app)
  26. finally:
  27. mod.cassandra = prev
  28. def test_init_with_and_without_LOCAL_QUROM(self, *modules):
  29. from celery.backends import cassandra as mod
  30. mod.cassandra = Mock()
  31. cons = mod.cassandra.ConsistencyLevel = Bunch(
  32. LOCAL_QUORUM='foo',
  33. )
  34. self.app.conf.cassandra_read_consistency = 'LOCAL_FOO'
  35. self.app.conf.cassandra_write_consistency = 'LOCAL_FOO'
  36. mod.CassandraBackend(app=self.app)
  37. cons.LOCAL_FOO = 'bar'
  38. mod.CassandraBackend(app=self.app)
  39. # no servers raises ImproperlyConfigured
  40. with pytest.raises(ImproperlyConfigured):
  41. self.app.conf.cassandra_servers = None
  42. mod.CassandraBackend(
  43. app=self.app, keyspace='b', column_family='c',
  44. )
  45. @pytest.mark.usefixtures('depends_on_current_app')
  46. def test_reduce(self, *modules):
  47. from celery.backends.cassandra import CassandraBackend
  48. assert loads(dumps(CassandraBackend(app=self.app)))
  49. def test_get_task_meta_for(self, *modules):
  50. from celery.backends import cassandra as mod
  51. mod.cassandra = Mock()
  52. x = mod.CassandraBackend(app=self.app)
  53. x._connection = True
  54. session = x._session = Mock()
  55. execute = session.execute = Mock()
  56. execute.return_value = [
  57. [states.SUCCESS, '1', datetime.now(), b'', b'']
  58. ]
  59. x.decode = Mock()
  60. meta = x._get_task_meta_for('task_id')
  61. assert meta['status'] == states.SUCCESS
  62. x._session.execute.return_value = []
  63. meta = x._get_task_meta_for('task_id')
  64. assert meta['status'] == states.PENDING
  65. def test_store_result(self, *modules):
  66. from celery.backends import cassandra as mod
  67. mod.cassandra = Mock()
  68. x = mod.CassandraBackend(app=self.app)
  69. x._connection = True
  70. session = x._session = Mock()
  71. session.execute = Mock()
  72. x._store_result('task_id', 'result', states.SUCCESS)
  73. def test_process_cleanup(self, *modules):
  74. from celery.backends import cassandra as mod
  75. x = mod.CassandraBackend(app=self.app)
  76. x.process_cleanup()
  77. assert x._connection is None
  78. assert x._session is None
  79. def test_timeouting_cluster(self):
  80. # Tests behavior when Cluster.connect raises
  81. # cassandra.OperationTimedOut.
  82. from celery.backends import cassandra as mod
  83. class OTOExc(Exception):
  84. pass
  85. class VeryFaultyCluster(object):
  86. def __init__(self, *args, **kwargs):
  87. pass
  88. def connect(self, *args, **kwargs):
  89. raise OTOExc()
  90. def shutdown(self):
  91. pass
  92. mod.cassandra = Mock()
  93. mod.cassandra.OperationTimedOut = OTOExc
  94. mod.cassandra.cluster = Mock()
  95. mod.cassandra.cluster.Cluster = VeryFaultyCluster
  96. x = mod.CassandraBackend(app=self.app)
  97. with pytest.raises(OTOExc):
  98. x._store_result('task_id', 'result', states.SUCCESS)
  99. assert x._connection is None
  100. assert x._session is None
  101. x.process_cleanup() # shouldn't raise
  102. def test_please_free_memory(self):
  103. # Ensure that Cluster object IS shut down.
  104. from celery.backends import cassandra as mod
  105. class RAMHoggingCluster(object):
  106. objects_alive = 0
  107. def __init__(self, *args, **kwargs):
  108. pass
  109. def connect(self, *args, **kwargs):
  110. RAMHoggingCluster.objects_alive += 1
  111. return Mock()
  112. def shutdown(self):
  113. RAMHoggingCluster.objects_alive -= 1
  114. mod.cassandra = Mock()
  115. mod.cassandra.cluster = Mock()
  116. mod.cassandra.cluster.Cluster = RAMHoggingCluster
  117. for x in range(0, 10):
  118. x = mod.CassandraBackend(app=self.app)
  119. x._store_result('task_id', 'result', states.SUCCESS)
  120. x.process_cleanup()
  121. assert RAMHoggingCluster.objects_alive == 0
  122. def test_auth_provider(self):
  123. # Ensure valid auth_provider works properly, and invalid one raises
  124. # ImproperlyConfigured exception.
  125. from celery.backends import cassandra as mod
  126. class DummyAuth(object):
  127. ValidAuthProvider = Mock()
  128. mod.cassandra = Mock()
  129. mod.cassandra.auth = DummyAuth
  130. # Valid auth_provider
  131. self.app.conf.cassandra_auth_provider = 'ValidAuthProvider'
  132. self.app.conf.cassandra_auth_kwargs = {
  133. 'username': 'stuff'
  134. }
  135. mod.CassandraBackend(app=self.app)
  136. # Invalid auth_provider
  137. self.app.conf.cassandra_auth_provider = 'SpiderManAuth'
  138. self.app.conf.cassandra_auth_kwargs = {
  139. 'username': 'Jack'
  140. }
  141. with pytest.raises(ImproperlyConfigured):
  142. mod.CassandraBackend(app=self.app)
  143. def test_options(self):
  144. # Ensure valid options works properly
  145. from celery.backends import cassandra as mod
  146. mod.cassandra = Mock()
  147. # Valid options
  148. self.app.conf.cassandra_options = {
  149. 'cql_version': '3.2.1',
  150. 'protocol_version': 3
  151. }
  152. mod.CassandraBackend(app=self.app)