# test_cassandra.py (5.6 KB)

  1. from __future__ import absolute_import
  2. from pickle import loads, dumps
  3. from datetime import datetime
  4. from celery import states
  5. from celery.exceptions import ImproperlyConfigured
  6. from celery.tests.case import (
  7. AppCase, Mock, mock_module, depends_on_current_app
  8. )
  9. CASSANDRA_MODULES = ['cassandra', 'cassandra.cluster']
  10. class Object(object):
  11. pass
  12. class test_CassandraBackend(AppCase):
  13. def setup(self):
  14. self.app.conf.update(
  15. cassandra_servers=['example.com'],
  16. cassandra_keyspace='celery',
  17. cassandra_table='task_results',
  18. )
  19. def test_init_no_cassandra(self):
  20. """should raise ImproperlyConfigured when no python-driver
  21. installed."""
  22. with mock_module(*CASSANDRA_MODULES):
  23. from celery.backends import cassandra as mod
  24. prev, mod.cassandra = mod.cassandra, None
  25. try:
  26. with self.assertRaises(ImproperlyConfigured):
  27. mod.CassandraBackend(app=self.app)
  28. finally:
  29. mod.cassandra = prev
  30. def test_init_with_and_without_LOCAL_QUROM(self):
  31. with mock_module(*CASSANDRA_MODULES):
  32. from celery.backends import cassandra as mod
  33. mod.cassandra = Mock()
  34. cons = mod.cassandra.ConsistencyLevel = Object()
  35. cons.LOCAL_QUORUM = 'foo'
  36. self.app.conf.cassandra_read_consistency = 'LOCAL_FOO'
  37. self.app.conf.cassandra_write_consistency = 'LOCAL_FOO'
  38. mod.CassandraBackend(app=self.app)
  39. cons.LOCAL_FOO = 'bar'
  40. mod.CassandraBackend(app=self.app)
  41. # no servers raises ImproperlyConfigured
  42. with self.assertRaises(ImproperlyConfigured):
  43. self.app.conf.cassandra_servers = None
  44. mod.CassandraBackend(
  45. app=self.app, keyspace='b', column_family='c',
  46. )
  47. @depends_on_current_app
  48. def test_reduce(self):
  49. with mock_module(*CASSANDRA_MODULES):
  50. from celery.backends.cassandra import CassandraBackend
  51. self.assertTrue(loads(dumps(CassandraBackend(app=self.app))))
  52. def test_get_task_meta_for(self):
  53. with mock_module(*CASSANDRA_MODULES):
  54. from celery.backends import cassandra as mod
  55. mod.cassandra = Mock()
  56. x = mod.CassandraBackend(app=self.app)
  57. x._connection = True
  58. session = x._session = Mock()
  59. execute = session.execute = Mock()
  60. execute.return_value = [
  61. [states.SUCCESS, '1', datetime.now(), b'', b'']
  62. ]
  63. x.decode = Mock()
  64. meta = x._get_task_meta_for('task_id')
  65. self.assertEqual(meta['status'], states.SUCCESS)
  66. x._session.execute.return_value = []
  67. meta = x._get_task_meta_for('task_id')
  68. self.assertEqual(meta['status'], states.PENDING)
  69. def test_store_result(self):
  70. with mock_module(*CASSANDRA_MODULES):
  71. from celery.backends import cassandra as mod
  72. mod.cassandra = Mock()
  73. x = mod.CassandraBackend(app=self.app)
  74. x._connection = True
  75. session = x._session = Mock()
  76. session.execute = Mock()
  77. x._store_result('task_id', 'result', states.SUCCESS)
  78. def test_process_cleanup(self):
  79. with mock_module(*CASSANDRA_MODULES):
  80. from celery.backends import cassandra as mod
  81. x = mod.CassandraBackend(app=self.app)
  82. x.process_cleanup()
  83. self.assertIsNone(x._connection)
  84. self.assertIsNone(x._session)
  85. def test_timeouting_cluster(self):
  86. """
  87. Tests behaviour when Cluster.connect raises cassandra.OperationTimedOut
  88. """
  89. with mock_module(*CASSANDRA_MODULES):
  90. from celery.backends import cassandra as mod
  91. class OTOExc(Exception):
  92. pass
  93. class VeryFaultyCluster(object):
  94. def __init__(self, *args, **kwargs):
  95. pass
  96. def connect(self, *args, **kwargs):
  97. raise OTOExc()
  98. def shutdown(self):
  99. pass
  100. mod.cassandra = Mock()
  101. mod.cassandra.OperationTimedOut = OTOExc
  102. mod.cassandra.cluster = Mock()
  103. mod.cassandra.cluster.Cluster = VeryFaultyCluster
  104. x = mod.CassandraBackend(app=self.app)
  105. with self.assertRaises(OTOExc):
  106. x._store_result('task_id', 'result', states.SUCCESS)
  107. self.assertIsNone(x._connection)
  108. self.assertIsNone(x._session)
  109. x.process_cleanup() # should not raise
  110. def test_please_free_memory(self):
  111. """
  112. Ensure that Cluster object IS shut down.
  113. """
  114. with mock_module(*CASSANDRA_MODULES):
  115. from celery.backends import cassandra as mod
  116. class RAMHoggingCluster(object):
  117. objects_alive = 0
  118. def __init__(self, *args, **kwargs):
  119. pass
  120. def connect(self, *args, **kwargs):
  121. RAMHoggingCluster.objects_alive += 1
  122. return Mock()
  123. def shutdown(self):
  124. RAMHoggingCluster.objects_alive -= 1
  125. mod.cassandra = Mock()
  126. mod.cassandra.cluster = Mock()
  127. mod.cassandra.cluster.Cluster = RAMHoggingCluster
  128. for x in range(0, 10):
  129. x = mod.CassandraBackend(app=self.app)
  130. x._store_result('task_id', 'result', states.SUCCESS)
  131. x.process_cleanup()
  132. self.assertEquals(RAMHoggingCluster.objects_alive, 0)