# test_new_cassandra.py

from __future__ import absolute_import

from datetime import datetime
from pickle import loads, dumps

import six

from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.tests.case import (
    AppCase, Mock, mock_module, depends_on_current_app
)


  10. class Object(object):
  11. pass
  12. class test_NewCassandraBackend(AppCase):
  13. def setup(self):
  14. self.app.conf.update(
  15. CASSANDRA_SERVERS=['example.com'],
  16. CASSANDRA_KEYSPACE='celery',
  17. CASSANDRA_TABLE='task_results',
  18. )
  19. def test_init_no_cassandra(self):
  20. """
  21. Tests behaviour when no python-driver is installed.
  22. new_cassandra should raise ImproperlyConfigured
  23. """
  24. with mock_module('cassandra'):
  25. from celery.backends import new_cassandra as mod
  26. prev, mod.cassandra = mod.cassandra, None
  27. try:
  28. with self.assertRaises(ImproperlyConfigured):
  29. mod.NewCassandraBackend(app=self.app)
  30. finally:
  31. mod.cassandra = prev
  32. def test_init_with_and_without_LOCAL_QUROM(self):
  33. with mock_module('cassandra'):
  34. from celery.backends import new_cassandra as mod
  35. mod.cassandra = Mock()
  36. cons = mod.cassandra.ConsistencyLevel = Object()
  37. cons.LOCAL_QUORUM = 'foo'
  38. self.app.conf.CASSANDRA_READ_CONSISTENCY = 'LOCAL_FOO'
  39. self.app.conf.CASSANDRA_WRITE_CONSISTENCY = 'LOCAL_FOO'
  40. mod.NewCassandraBackend(app=self.app)
  41. cons.LOCAL_FOO = 'bar'
  42. mod.NewCassandraBackend(app=self.app)
  43. # no servers raises ImproperlyConfigured
  44. with self.assertRaises(ImproperlyConfigured):
  45. self.app.conf.CASSANDRA_SERVERS = None
  46. mod.NewCassandraBackend(
  47. app=self.app, keyspace='b', column_family='c',
  48. )
  49. @depends_on_current_app
  50. def test_reduce(self):
  51. with mock_module('cassandra'):
  52. from celery.backends.new_cassandra import NewCassandraBackend
  53. self.assertTrue(loads(dumps(NewCassandraBackend(app=self.app))))
  54. def test_get_task_meta_for(self):
  55. with mock_module('cassandra'):
  56. from celery.backends import new_cassandra as mod
  57. mod.cassandra = Mock()
  58. x = mod.NewCassandraBackend(app=self.app)
  59. x._connection = True
  60. session = x._session = Mock()
  61. execute = session.execute = Mock()
  62. execute.return_value = [
  63. [states.SUCCESS, '1', datetime.now(), b'', b'']
  64. ]
  65. x.decode = Mock()
  66. meta = x._get_task_meta_for('task_id')
  67. self.assertEqual(meta['status'], states.SUCCESS)
  68. x._session.execute.return_value = []
  69. meta = x._get_task_meta_for('task_id')
  70. self.assertEqual(meta['status'], states.PENDING)
  71. def test_store_result(self):
  72. with mock_module('cassandra'):
  73. from celery.backends import new_cassandra as mod
  74. mod.cassandra = Mock()
  75. x = mod.NewCassandraBackend(app=self.app)
  76. x._connection = True
  77. session = x._session = Mock()
  78. session.execute = Mock()
  79. x._store_result('task_id', 'result', states.SUCCESS)
  80. def test_process_cleanup(self):
  81. with mock_module('cassandra'):
  82. from celery.backends import new_cassandra as mod
  83. x = mod.NewCassandraBackend(app=self.app)
  84. x.process_cleanup()
  85. self.assertIsNone(x._connection)
  86. self.assertIsNone(x._session)