Browse Source

needs preliminary tests

Piotr Maślanka 9 years ago
parent
commit
0da49c0c41
2 changed files with 72 additions and 32 deletions
  1. 50 32
      celery/backends/new_cassandra.py
  2. 22 0
      docs/configuration.rst

+ 50 - 32
celery/backends/new_cassandra.py

@@ -96,10 +96,41 @@ class NewCassandraBackend(BaseBackend):
                 'Cassandra backend not configured.')
 
         self._connection = None
+        self._session = None
 
     def _get_connection(self):
         if self._connection is None:
             self._connection = cassandra.Cluster(self.servers, port=self.port)
+            self._session = self._connection.connect(self.keyspace)
+
+            self._write_stmt = self._session.prepare('''INSERT INTO '''+
+                self.column_family+''' (task_id,status, result,date_done,'''
+                '''traceback, children) VALUES (?, ?, ?, ?, ?, ?) '''
+                '''USING TTL '''+str(self.expires),
+                consistency_level=self.write_consistency)
+
+            self._make_stmt = self._session.prepare(
+                '''CREATE TABLE '''+self.column_family+''' (
+                    task_id text,
+                    status text,
+                    result text,
+                    date_done timestamp,
+                    traceback text,
+                    children text,
+                    PRIMARY KEY ((task_id), date_done)
+                ) WITH CLUSTERING ORDER BY (date_done DESC)
+                  WITH default_time_to_live = '''+str(self.expires)+';')
+
+            self._read_stmt = self._session.prepare(
+                '''SELECT task_id, status, result, date_done, traceback, children
+                   FROM '''+self.column_family+'''
+                   WHERE task_id=? LIMIT 1''',
+                   consistency_level=self.read_consistency)
+
+            try:
+                self._session.execute(self._make_stmt)
+            except cassandra.AlreadyExists:
+                pass
 
     def _retry_on_error(self, fun, *args, **kwargs):
         ts = monotonic() + self._retry_timeout
@@ -122,46 +153,33 @@ class NewCassandraBackend(BaseBackend):
             self._get_connection()
             date_done = self.app.now()
 
-
-
-            meta = {'status': status,
-                    'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
-                    'traceback': self.encode(traceback),
-                    'result': self.encode(result),
-                    'children': self.encode(
-                        self.current_task_children(request),
-                    )}
-            if self.detailed_mode:
-                cf.insert(
-                    task_id, {date_done: self.encode(meta)}, ttl=self.expires,
-                )
-            else:
-                cf.insert(task_id, meta, ttl=self.expires)
-
+            self._session.execute(self._write_stmt, (
+                task_id, status, result,
+                self.app.now().strftime('%Y-%m-%dT%H:%M:%SZ'),
+                traceback, self.encode(self.current_task_children(request))
+            ))
         return self._retry_on_error(_do_store)
 
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
 
         def _do_get():
-            cf = self._get_column_family()
-            try:
-                if self.detailed_mode:
-                    row = cf.get(task_id, column_reversed=True, column_count=1)
-                    return self.decode(list(row.values())[0])
-                else:
-                    obj = cf.get(task_id)
-                    return self.meta_from_decoded({
-                        'task_id': task_id,
-                        'status': obj['status'],
-                        'result': self.decode(obj['result']),
-                        'date_done': obj['date_done'],
-                        'traceback': self.decode(obj['traceback']),
-                        'children': self.decode(obj['children']),
-                    })
-            except (KeyError, pycassa.NotFoundException):
+
+            res = self._session.execute(self._read_stmt, (task_id, ))
+            if not res:
                 return {'status': states.PENDING, 'result': None}
 
+            task_id, status, result, date_done, traceback, children = res[0]
+
+            return self.meta_from_decoded({
+                        'task_id': task_id,
+                        'status': status,
+                        'result': self.decode(result),
+                        'date_done': date_done,
+                        'traceback': self.decode(traceback),
+                        'children': self.decode(children),
+            })
+
         return self._retry_on_error(_do_get)
 
     def __reduce__(self, args=(), kwargs={}):

+ 22 - 0
docs/configuration.rst

@@ -213,6 +213,10 @@ Can be one of the following:
     Use `Cassandra`_ to store the results.
     See :ref:`conf-cassandra-result-backend`.
 
+* new_cassandra
+    Use `Cassandra`_ to store the results, using other backend than _cassandra_.
+    See :ref:`conf-cassandra-result-backend`.
+
 * ironcache
     Use `IronCache`_ to store the results.
     See :ref:`conf-ironcache-result-backend`.
@@ -544,6 +548,16 @@ Cassandra backend settings
 
         $ pip install pycassa
 
+    If you are using new_cassandra, :mod:`cassandra-driver` is required instead:
+    https://pypi.python.org/pypi/cassandra-driver
+
+    To install, use `pip` or `easy_install`:
+
+    .. code-block:: bash
+
+        $ pip install cassandra-driver
+
+
 This backend requires the following configuration directives to be set.
 
 .. setting:: CASSANDRA_SERVERS
@@ -555,6 +569,10 @@ List of ``host:port`` Cassandra servers. e.g.::
 
     CASSANDRA_SERVERS = ['localhost:9160']
 
+Omit the ``port`` part when using new_cassandra. e.g.::
+
+    CASSANDRA_SERVERS = ['localhost']
+
 .. setting:: CASSANDRA_KEYSPACE
 
 CASSANDRA_KEYSPACE
@@ -601,6 +619,8 @@ use the ``TimeUUID`` type as a comparator::
 
     create column family task_results with comparator = TimeUUIDType;
 
+new_cassandra uses detailed mode by default, and that cannot be disabled.
+
 CASSANDRA_OPTIONS
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
@@ -608,6 +628,8 @@ Options to be passed to the `pycassa connection pool`_ (optional).
 
 .. _`pycassa connection pool`: http://pycassa.github.com/pycassa/api/pycassa/pool.html
 
+Not used in new_cassandra
+
 Example configuration
 ~~~~~~~~~~~~~~~~~~~~~