# test_new_cassandra.py (3.6 KB)
  1. from __future__ import absolute_import
  2. from pickle import loads, dumps
  3. from datetime import datetime
  4. from celery import states
  5. from celery.exceptions import ImproperlyConfigured
  6. from celery.tests.case import (
  7. AppCase, Mock, mock_module, depends_on_current_app
  8. )
  9. class Object(object):
  10. pass
  11. class test_NewCassandraBackend(AppCase):
  12. def setup(self):
  13. self.app.conf.update(
  14. CASSANDRA_SERVERS=['example.com'],
  15. CASSANDRA_KEYSPACE='celery',
  16. CASSANDRA_TABLE='task_results',
  17. )
  18. def test_init_no_cassandra(self):
  19. """
  20. Tests behaviour when no python-driver is installed.
  21. new_cassandra should raise ImproperlyConfigured
  22. """
  23. with mock_module('cassandra'):
  24. from celery.backends import new_cassandra as mod
  25. prev, mod.cassandra = mod.cassandra, None
  26. try:
  27. with self.assertRaises(ImproperlyConfigured):
  28. mod.NewCassandraBackend(app=self.app)
  29. finally:
  30. mod.cassandra = prev
  31. def test_init_with_and_without_LOCAL_QUROM(self):
  32. with mock_module('cassandra'):
  33. from celery.backends import new_cassandra as mod
  34. mod.cassandra = Mock()
  35. cons = mod.cassandra.ConsistencyLevel = Object()
  36. cons.LOCAL_QUORUM = 'foo'
  37. self.app.conf.CASSANDRA_READ_CONSISTENCY = 'LOCAL_FOO'
  38. self.app.conf.CASSANDRA_WRITE_CONSISTENCY = 'LOCAL_FOO'
  39. mod.NewCassandraBackend(app=self.app)
  40. cons.LOCAL_FOO = 'bar'
  41. mod.NewCassandraBackend(app=self.app)
  42. # no servers raises ImproperlyConfigured
  43. with self.assertRaises(ImproperlyConfigured):
  44. self.app.conf.CASSANDRA_SERVERS = None
  45. mod.NewCassandraBackend(
  46. app=self.app, keyspace='b', column_family='c',
  47. )
  48. @depends_on_current_app
  49. def test_reduce(self):
  50. with mock_module('cassandra'):
  51. from celery.backends.new_cassandra import NewCassandraBackend
  52. self.assertTrue(loads(dumps(NewCassandraBackend(app=self.app))))
  53. def test_get_task_meta_for(self):
  54. with mock_module('cassandra'):
  55. from celery.backends import new_cassandra as mod
  56. mod.cassandra = Mock()
  57. x = mod.NewCassandraBackend(app=self.app)
  58. x._connection = True
  59. session = x._session = Mock()
  60. execute = session.execute = Mock()
  61. execute.return_value = [
  62. [states.SUCCESS, '1', datetime.now(), b'', b'']
  63. ]
  64. x.decode = Mock()
  65. meta = x._get_task_meta_for('task_id')
  66. self.assertEqual(meta['status'], states.SUCCESS)
  67. x._session.execute.return_value = []
  68. meta = x._get_task_meta_for('task_id')
  69. self.assertEqual(meta['status'], states.PENDING)
  70. def test_store_result(self):
  71. with mock_module('cassandra'):
  72. from celery.backends import new_cassandra as mod
  73. mod.cassandra = Mock()
  74. x = mod.NewCassandraBackend(app=self.app)
  75. x._connection = True
  76. session = x._session = Mock()
  77. session.execute = Mock()
  78. x._store_result('task_id', 'result', states.SUCCESS)
  79. def test_process_cleanup(self):
  80. with mock_module('cassandra'):
  81. from celery.backends import new_cassandra as mod
  82. x = mod.NewCassandraBackend(app=self.app)
  83. x.process_cleanup()
  84. self.assertIsNone(x._connection)
  85. self.assertIsNone(x._session)