Piotr Maślanka vor 9 Jahren
Ursprung
Commit
db0e031441
2 geänderte Dateien mit 122 neuen und 6 gelöschten Zeilen
  1. 20 6
      celery/backends/new_cassandra.py
  2. 102 0
      celery/tests/backends/test_new_cassandra.py

+ 20 - 6
celery/backends/new_cassandra.py

@@ -25,7 +25,7 @@ logger = get_logger(__name__)
 
 
 
 
 class NewCassandraBackend(BaseBackend):
 class NewCassandraBackend(BaseBackend):
-    """New Cassandra backend utilizing DataStax's driver
+    """New Cassandra backend utilizing DataStax driver
 
 
     .. attribute:: servers
     .. attribute:: servers
 
 
@@ -38,7 +38,7 @@ class NewCassandraBackend(BaseBackend):
     servers = []
     servers = []
     keyspace = None
     keyspace = None
     table = None
     table = None
-    supports_autoexpire = True
+    supports_autoexpire = True      # autoexpire supported via entry_ttl
 
 
     def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
     def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
                  port=9042, **kwargs):
                  port=9042, **kwargs):
@@ -103,8 +103,11 @@ class NewCassandraBackend(BaseBackend):
             self._session = None
             self._session = None
 
 
     def _get_connection(self, write=False):
     def _get_connection(self, write=False):
-        # only writers can create the table to get rid of two processes
-        # creating table at same time and Cassandra choking on that
+        """
+        Prepare the connection for action
+
+        :param write: bool - are we a writer?
+        """
         if self._connection is None:
         if self._connection is None:
             self._connection = cassandra.cluster.Cluster(self.servers,
             self._connection = cassandra.cluster.Cluster(self.servers,
                                                          port=self.port)
                                                          port=self.port)
@@ -123,6 +126,14 @@ class NewCassandraBackend(BaseBackend):
             self._read_stmt.consistency_level = self.read_consistency
             self._read_stmt.consistency_level = self.read_consistency
 
 
             if write:
             if write:
+                # Only possible writers "workers" are allowed to issue
+                # CREATE TABLE. This is to prevent conflicting situations
+                # where both task-creator and task-executor would issue it
+                # at the same time.
+
+                # Anyway, if you are doing anything critical, you should
+                # have probably created this table in advance, in which case
+                # this query will be a no-op (instant fail with AlreadyExists)
                 self._make_stmt = cassandra.query.SimpleStatement(
                 self._make_stmt = cassandra.query.SimpleStatement(
                     '''CREATE TABLE '''+self.table+''' (
                     '''CREATE TABLE '''+self.table+''' (
                         task_id text,
                         task_id text,
@@ -145,9 +156,12 @@ class NewCassandraBackend(BaseBackend):
         self._get_connection(write=True)
         self._get_connection(write=True)
 
 
         self._session.execute(self._write_stmt, (
         self._session.execute(self._write_stmt, (
-            task_id, status, buffer(self.encode(result)),
+            task_id,
+            status,
+            buffer(self.encode(result)),
             self.app.now().strftime('%Y-%m-%dT%H:%M:%SZ'),
             self.app.now().strftime('%Y-%m-%dT%H:%M:%SZ'),
-            buffer(self.encode(traceback)), buffer(self.encode(self.current_task_children(request)))
+            buffer(self.encode(traceback)),
+            buffer(self.encode(self.current_task_children(request)))
         ))
         ))
 
 
     def _get_task_meta_for(self, task_id):
     def _get_task_meta_for(self, task_id):

+ 102 - 0
celery/tests/backends/test_new_cassandra.py

@@ -0,0 +1,102 @@
+from __future__ import absolute_import
+
+from pickle import loads, dumps
+
+from celery import states
+from celery.exceptions import ImproperlyConfigured
+from celery.tests.case import (
+    AppCase, Mock, mock_module, depends_on_current_app, MagicMock
+)
+
+class Object(object):
+    pass
+
+class test_NewCassandraBackend(AppCase):
+
+    def setup(self):
+        self.app.conf.update(
+            CASSANDRA_SERVERS=['example.com'],
+            CASSANDRA_KEYSPACE='celery',
+            CASSANDRA_TABLE='task_results',
+        )
+
+    def test_init_no_cassandra(self):
+        """
+        Tests behaviour when no python-driver is installed.
+        new_cassandra should raise ImproperlyConfigured
+        """
+        with mock_module('cassandra'):
+            from celery.backends import new_cassandra as mod
+            prev, mod.cassandra = mod.cassandra, None
+            try:
+                with self.assertRaises(ImproperlyConfigured):
+                    mod.NewCassandraBackend(app=self.app)
+            finally:
+                mod.cassandra = prev
+
+    def test_init_with_and_without_LOCAL_QUROM(self):
+        with mock_module('cassandra'):
+            from celery.backends import new_cassandra as mod
+            mod.cassandra = Mock()
+            cons = mod.cassandra.ConsistencyLevel = Object()
+            cons.LOCAL_QUORUM = 'foo'
+
+            self.app.conf.CASSANDRA_READ_CONSISTENCY = 'LOCAL_FOO'
+            self.app.conf.CASSANDRA_WRITE_CONSISTENCY = 'LOCAL_FOO'
+
+            mod.NewCassandraBackend(app=self.app)
+            cons.LOCAL_FOO = 'bar'
+            mod.NewCassandraBackend(app=self.app)
+
+            # no servers raises ImproperlyConfigured
+            with self.assertRaises(ImproperlyConfigured):
+                self.app.conf.CASSANDRA_SERVERS = None
+                mod.NewCassandraBackend(
+                    app=self.app, keyspace='b', column_family='c',
+                )
+
+    @depends_on_current_app
+    def test_reduce(self):
+        with mock_module('cassandra'):
+            from celery.backends.new_cassandra import NewCassandraBackend
+            self.assertTrue(loads(dumps(NewCassandraBackend(app=self.app))))
+
+    def test_get_task_meta_for(self):
+        with mock_module('cassandra'):
+            from celery.backends import new_cassandra as mod
+            mod.cassandra = Mock()
+            x = mod.NewCassandraBackend(app=self.app)
+            x._connection = True
+            session = x._session = Mock()
+            execute = session.execute = Mock()
+            execute.return_value = [
+                [states.SUCCESS, '1', 'date', '', None]
+            ]
+            x.decode = Mock()
+            meta = x._get_task_meta_for('task_id')
+            self.assertEqual(meta['status'], states.SUCCESS)
+
+            x._session.execute.return_value = []
+            meta = x._get_task_meta_for('task_id')
+            self.assertEqual(meta['status'], states.PENDING)
+
+
+    def test_store_result(self):
+        with mock_module('cassandra'):
+            from celery.backends import new_cassandra as mod
+            mod.cassandra = Mock()
+
+            x = mod.NewCassandraBackend(app=self.app)
+            x._connection = True
+            session = x._session = Mock()
+            execute = session.execute = Mock()
+            x._store_result('task_id', 'result', states.SUCCESS)
+
+    def test_process_cleanup(self):
+        with mock_module('cassandra'):
+            from celery.backends import new_cassandra as mod
+            x = mod.NewCassandraBackend(app=self.app)
+            x.process_cleanup()
+
+            self.assertIsNone(x._connection)
+            self.assertIsNone(x._session)