# test_cassandra.py (5.7 KB)

  1. import pytest
  2. from pickle import loads, dumps
  3. from datetime import datetime
  4. from case import Mock, mock
  5. from celery import states
  6. from celery.exceptions import ImproperlyConfigured
  7. from celery.utils.objects import Bunch
  8. CASSANDRA_MODULES = ['cassandra', 'cassandra.auth', 'cassandra.cluster']
  9. @mock.module(*CASSANDRA_MODULES)
  10. class test_CassandraBackend:
  11. def setup(self):
  12. self.app.conf.update(
  13. cassandra_servers=['example.com'],
  14. cassandra_keyspace='celery',
  15. cassandra_table='task_results',
  16. )
  17. def test_init_no_cassandra(self, *modules):
  18. # should raise ImproperlyConfigured when no python-driver
  19. # installed.
  20. from celery.backends import cassandra as mod
  21. prev, mod.cassandra = mod.cassandra, None
  22. try:
  23. with pytest.raises(ImproperlyConfigured):
  24. mod.CassandraBackend(app=self.app)
  25. finally:
  26. mod.cassandra = prev
  27. def test_init_with_and_without_LOCAL_QUROM(self, *modules):
  28. from celery.backends import cassandra as mod
  29. mod.cassandra = Mock()
  30. cons = mod.cassandra.ConsistencyLevel = Bunch(
  31. LOCAL_QUORUM='foo',
  32. )
  33. self.app.conf.cassandra_read_consistency = 'LOCAL_FOO'
  34. self.app.conf.cassandra_write_consistency = 'LOCAL_FOO'
  35. mod.CassandraBackend(app=self.app)
  36. cons.LOCAL_FOO = 'bar'
  37. mod.CassandraBackend(app=self.app)
  38. # no servers raises ImproperlyConfigured
  39. with pytest.raises(ImproperlyConfigured):
  40. self.app.conf.cassandra_servers = None
  41. mod.CassandraBackend(
  42. app=self.app, keyspace='b', column_family='c',
  43. )
  44. @pytest.mark.usefixtures('depends_on_current_app')
  45. def test_reduce(self, *modules):
  46. from celery.backends.cassandra import CassandraBackend
  47. assert loads(dumps(CassandraBackend(app=self.app)))
  48. def test_get_task_meta_for(self, *modules):
  49. from celery.backends import cassandra as mod
  50. mod.cassandra = Mock()
  51. x = mod.CassandraBackend(app=self.app)
  52. x._connection = True
  53. session = x._session = Mock()
  54. execute = session.execute = Mock()
  55. execute.return_value = [
  56. [states.SUCCESS, '1', datetime.now(), b'', b'']
  57. ]
  58. x.decode = Mock()
  59. meta = x._get_task_meta_for('task_id')
  60. assert meta['status'] == states.SUCCESS
  61. x._session.execute.return_value = []
  62. meta = x._get_task_meta_for('task_id')
  63. assert meta['status'] == states.PENDING
  64. def test_store_result(self, *modules):
  65. from celery.backends import cassandra as mod
  66. mod.cassandra = Mock()
  67. x = mod.CassandraBackend(app=self.app)
  68. x._connection = True
  69. session = x._session = Mock()
  70. session.execute = Mock()
  71. x._store_result('task_id', 'result', states.SUCCESS)
  72. def test_process_cleanup(self, *modules):
  73. from celery.backends import cassandra as mod
  74. x = mod.CassandraBackend(app=self.app)
  75. x.process_cleanup()
  76. assert x._connection is None
  77. assert x._session is None
  78. def test_timeouting_cluster(self):
  79. # Tests behavior when Cluster.connect raises
  80. # cassandra.OperationTimedOut.
  81. from celery.backends import cassandra as mod
  82. class OTOExc(Exception):
  83. pass
  84. class VeryFaultyCluster:
  85. def __init__(self, *args, **kwargs):
  86. pass
  87. def connect(self, *args, **kwargs):
  88. raise OTOExc()
  89. def shutdown(self):
  90. pass
  91. mod.cassandra = Mock()
  92. mod.cassandra.OperationTimedOut = OTOExc
  93. mod.cassandra.cluster = Mock()
  94. mod.cassandra.cluster.Cluster = VeryFaultyCluster
  95. x = mod.CassandraBackend(app=self.app)
  96. with pytest.raises(OTOExc):
  97. x._store_result('task_id', 'result', states.SUCCESS)
  98. assert x._connection is None
  99. assert x._session is None
  100. x.process_cleanup() # shouldn't raise
  101. def test_please_free_memory(self):
  102. # Ensure that Cluster object IS shut down.
  103. from celery.backends import cassandra as mod
  104. class RAMHoggingCluster:
  105. objects_alive = 0
  106. def __init__(self, *args, **kwargs):
  107. pass
  108. def connect(self, *args, **kwargs):
  109. RAMHoggingCluster.objects_alive += 1
  110. return Mock()
  111. def shutdown(self):
  112. RAMHoggingCluster.objects_alive -= 1
  113. mod.cassandra = Mock()
  114. mod.cassandra.cluster = Mock()
  115. mod.cassandra.cluster.Cluster = RAMHoggingCluster
  116. for x in range(0, 10):
  117. x = mod.CassandraBackend(app=self.app)
  118. x._store_result('task_id', 'result', states.SUCCESS)
  119. x.process_cleanup()
  120. assert RAMHoggingCluster.objects_alive == 0
  121. def test_auth_provider(self):
  122. # Ensure valid auth_provider works properly, and invalid one raises
  123. # ImproperlyConfigured exception.
  124. from celery.backends import cassandra as mod
  125. class DummyAuth:
  126. ValidAuthProvider = Mock()
  127. mod.cassandra = Mock()
  128. mod.cassandra.auth = DummyAuth
  129. # Valid auth_provider
  130. self.app.conf.cassandra_auth_provider = 'ValidAuthProvider'
  131. self.app.conf.cassandra_auth_kwargs = {
  132. 'username': 'stuff'
  133. }
  134. mod.CassandraBackend(app=self.app)
  135. # Invalid auth_provider
  136. self.app.conf.cassandra_auth_provider = 'SpiderManAuth'
  137. self.app.conf.cassandra_auth_kwargs = {
  138. 'username': 'Jack'
  139. }
  140. with pytest.raises(ImproperlyConfigured):
  141. mod.CassandraBackend(app=self.app)