# Unit tests for the Cassandra result backend (celery.backends.cassandra).
- from __future__ import absolute_import
- import socket
- from mock import Mock
- from pickle import loads, dumps
- from celery import states
- from celery.exceptions import ImproperlyConfigured
- from celery.tests.case import AppCase, mock_module, depends_on_current_app
class Object(object):
    """Empty attribute holder; tests set arbitrary attributes on instances."""
def install_exceptions(mod):
    """Attach freshly defined stand-in exception classes to ``mod``.

    Each call creates six new ``Exception`` subclasses and stores every one
    on ``mod`` under the class's own name.

    :param mod: any object accepting attribute assignment (the tests in
        this file pass ``Mock`` instances).
    """
    # py3k: cannot catch exceptions not inheriting from BaseException.

    class NotFoundException(Exception):
        pass

    class TException(Exception):
        pass

    class InvalidRequestException(Exception):
        pass

    class UnavailableException(Exception):
        pass

    class TimedOutException(Exception):
        pass

    class AllServersUnavailable(Exception):
        pass

    installed = (
        NotFoundException,
        TException,
        InvalidRequestException,
        TimedOutException,
        UnavailableException,
        AllServersUnavailable,
    )
    for exc_type in installed:
        setattr(mod, exc_type.__name__, exc_type)
class test_CassandraBackend(AppCase):
    """Unit tests for ``celery.backends.cassandra.CassandraBackend``.

    Every test runs inside ``mock_module('pycassa')`` and, where needed,
    replaces ``mod.pycassa`` / ``mod.Thrift`` with ``Mock`` objects carrying
    the stand-in exception classes from ``install_exceptions``.
    """

    def setup(self):
        """Give the test app the Cassandra settings used by the tests."""
        self.app.conf.update(
            CASSANDRA_SERVERS=['example.com'],
            CASSANDRA_KEYSPACE='keyspace',
            CASSANDRA_COLUMN_FAMILY='columns',
        )

    def test_init_no_pycassa(self):
        """Constructing the backend with ``pycassa`` unset must fail."""
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            prev, mod.pycassa = mod.pycassa, None
            try:
                with self.assertRaises(ImproperlyConfigured):
                    mod.CassandraBackend(app=self.app)
            finally:
                # Always restore the module attribute for later tests.
                mod.pycassa = prev

    def test_init_with_and_without_LOCAL_QUROM(self):
        """Backend construction with an unknown/known consistency level."""
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            mod.pycassa = Mock()
            install_exceptions(mod.pycassa)
            # A plain object (not a Mock) so that missing attributes are
            # really missing: only LOCAL_QUORUM exists at first.
            cons = mod.pycassa.ConsistencyLevel = Object()
            cons.LOCAL_QUORUM = 'foo'

            self.app.conf.CASSANDRA_READ_CONSISTENCY = 'LOCAL_FOO'
            self.app.conf.CASSANDRA_WRITE_CONSISTENCY = 'LOCAL_FOO'

            # LOCAL_FOO is not defined on ConsistencyLevel yet ...
            mod.CassandraBackend(app=self.app)
            # ... and now it is.
            cons.LOCAL_FOO = 'bar'
            mod.CassandraBackend(app=self.app)

            # no servers raises ImproperlyConfigured
            with self.assertRaises(ImproperlyConfigured):
                self.app.conf.CASSANDRA_SERVERS = None
                mod.CassandraBackend(
                    app=self.app, keyspace='b', column_family='c',
                )

    @depends_on_current_app
    def test_reduce(self):
        """The backend survives a pickle round trip."""
        with mock_module('pycassa'):
            from celery.backends.cassandra import CassandraBackend
            self.assertTrue(loads(dumps(CassandraBackend(app=self.app))))

    def test_get_task_meta_for(self):
        """``_get_task_meta_for`` in plain/detailed mode and on errors."""
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            mod.pycassa = Mock()
            install_exceptions(mod.pycassa)
            mod.Thrift = Mock()
            install_exceptions(mod.Thrift)
            x = mod.CassandraBackend(app=self.app)
            Get_Column = x._get_column_family = Mock()
            get_column = Get_Column.return_value = Mock()
            get = get_column.get
            META = get.return_value = {
                'task_id': 'task_id',
                'status': states.SUCCESS,
                'result': '1',
                'date_done': 'date',
                'traceback': '',
                'children': None,
            }
            x.decode = Mock()

            # Non-detailed mode: the column family returns META directly.
            x.detailed_mode = False
            meta = x._get_task_meta_for('task_id')
            self.assertEqual(meta['status'], states.SUCCESS)

            # Detailed mode: the row is a Mock and META comes from decode().
            x.detailed_mode = True
            row = get.return_value = Mock()
            row.values.return_value = [Mock()]
            x.decode.return_value = META
            meta = x._get_task_meta_for('task_id')
            self.assertEqual(meta['status'], states.SUCCESS)
            x.decode.return_value = Mock()

            # A KeyError from the lookup is reported as PENDING.
            x.detailed_mode = False
            get.side_effect = KeyError()
            meta = x._get_task_meta_for('task_id')
            self.assertEqual(meta['status'], states.PENDING)

            # One-element lists so the closure can mutate the counters.
            calls = [0]
            end = [10]

            def work_eventually(*arg):
                # Raise socket.error until the call counter exceeds
                # ``end``; the counter is bumped on every invocation.
                try:
                    if calls[0] > end[0]:
                        return META
                    raise socket.error()
                finally:
                    calls[0] += 1
            get.side_effect = work_eventually

            # Generous timeout: the lookup eventually succeeds.
            x._retry_timeout = 10
            x._retry_wait = 0.01
            meta = x._get_task_meta_for('task')
            self.assertEqual(meta['status'], states.SUCCESS)

            # Short timeout and many failures: socket.error propagates.
            x._retry_timeout = 0.1
            calls[0], end[0] = 0, 100
            with self.assertRaises(socket.error):
                x._get_task_meta_for('task')

    def test_store_result(self):
        """``_store_result`` inserts into the column family in both modes."""
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            mod.pycassa = Mock()
            install_exceptions(mod.pycassa)
            mod.Thrift = Mock()
            install_exceptions(mod.Thrift)
            x = mod.CassandraBackend(app=self.app)
            Get_Column = x._get_column_family = Mock()
            cf = Get_Column.return_value = Mock()

            x.detailed_mode = False
            x._store_result('task_id', 'result', states.SUCCESS)
            self.assertTrue(cf.insert.called)

            # reset_mock() (not reset(), which is just an auto-created
            # child mock) clears ``called`` so the detailed-mode assertion
            # below cannot pass on the strength of the first call.
            cf.insert.reset_mock()
            x.detailed_mode = True
            x._store_result('task_id', 'result', states.SUCCESS)
            self.assertTrue(cf.insert.called)

    def test_process_cleanup(self):
        """``process_cleanup`` leaves ``_column_family`` set to ``None``."""
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            x = mod.CassandraBackend(app=self.app)
            # Must not fail when there is nothing to clean up.
            x._column_family = None
            x.process_cleanup()

            x._column_family = True
            x.process_cleanup()
            self.assertIsNone(x._column_family)

    def test_get_column_family(self):
        """``_get_column_family`` creates and then reuses the family."""
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            mod.pycassa = Mock()
            install_exceptions(mod.pycassa)
            x = mod.CassandraBackend(app=self.app)
            self.assertTrue(x._get_column_family())
            self.assertIsNotNone(x._column_family)
            # Second call returns the cached object.
            self.assertIs(x._get_column_family(), x._column_family)
|