浏览代码

works for me

Piotr Maślanka 9 年之前
父节点
当前提交
0b294d5d84
共有 3 个文件被更改,包括 173 次插入108 次删除
  1. 1 0
      celery/backends/__init__.py
  2. 80 94
      celery/backends/new_cassandra.py
  3. 92 14
      docs/configuration.rst

+ 1 - 0
celery/backends/__init__.py

@@ -30,6 +30,7 @@ BACKEND_ALIASES = {
     'db': 'celery.backends.database:DatabaseBackend',
     'database': 'celery.backends.database:DatabaseBackend',
     'cassandra': 'celery.backends.cassandra:CassandraBackend',
+    'new_cassandra': 'celery.backends.new_cassandra:NewCassandraBackend',
     'couchbase': 'celery.backends.couchbase:CouchBaseBackend',
     'couchdb': 'celery.backends.couchdb:CouchDBBackend',
     'riak': 'celery.backends.riak:RiakBackend',

+ 80 - 94
celery/backends/new_cassandra.py

@@ -1,9 +1,9 @@
 # -* coding: utf-8 -*-
 """
-    celery.backends.cassandra
+    celery.backends.new_cassandra
     ~~~~~~~~~~~~~~~~~~~~~~~~~
 
-    Apache Cassandra result store backend.
+    Apache Cassandra result store backend using DataStax driver
 
 """
 from __future__ import absolute_import
@@ -13,11 +13,8 @@ try:  # pragma: no cover
 except ImportError:  # pragma: no cover
     cassandra = None   # noqa
 
-import time
-
 from celery import states
 from celery.exceptions import ImproperlyConfigured
-from celery.five import monotonic
 from celery.utils.log import get_logger
 
 from .base import BaseBackend
@@ -32,22 +29,19 @@ class NewCassandraBackend(BaseBackend):
 
     .. attribute:: servers
 
-        List of Cassandra servers with format: ``hostname:port`` or ``hostname``
+        List of Cassandra servers with format: ``hostname``
 
     :raises celery.exceptions.ImproperlyConfigured: if
-        module :mod:`pycassa` is not available.
+        module :mod:`cassandra` is not available.
 
     """
     servers = []
     keyspace = None
-    column_family = None
-    detailed_mode = False
-    _retry_timeout = 300
-    _retry_wait = 3
+    table = None
     supports_autoexpire = True
 
-    def __init__(self, servers=None, keyspace=None, column_family=None,
-                 cassandra_options=None, detailed_mode=False, port=9042, **kwargs):
+    def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
+                 port=9042, **kwargs):
         """Initialize Cassandra backend.
 
         Raises :class:`celery.exceptions.ImproperlyConfigured` if
@@ -70,14 +64,16 @@ class NewCassandraBackend(BaseBackend):
         self.keyspace = (keyspace or
                          conf.get('CASSANDRA_KEYSPACE') or
                          self.keyspace)
-        self.column_family = (column_family or
-                              conf.get('CASSANDRA_COLUMN_FAMILY') or
-                              self.column_family)
-        self.cassandra_options = dict(conf.get('CASSANDRA_OPTIONS') or {},
-                                      **cassandra_options or {})
-        self.detailed_mode = (detailed_mode or
-                              conf.get('CASSANDRA_DETAILED_MODE') or
-                              self.detailed_mode)
+        self.table = (table or
+                      conf.get('CASSANDRA_TABLE') or
+                      self.table)
+        expires = (entry_ttl or conf.get('CASSANDRA_ENTRY_TTL', None))
+
+        if expires is not None:
+            self.cqlexpires = ' USING TTL %s' % (expires, )
+        else:
+            self.cqlexpires = ''
+
         read_cons = conf.get('CASSANDRA_READ_CONSISTENCY') or 'LOCAL_QUORUM'
         write_cons = conf.get('CASSANDRA_WRITE_CONSISTENCY') or 'LOCAL_QUORUM'
         try:
@@ -91,101 +87,91 @@ class NewCassandraBackend(BaseBackend):
         except AttributeError:
             self.write_consistency = cassandra.ConsistencyLevel.LOCAL_QUORUM
 
-        if not self.servers or not self.keyspace or not self.column_family:
+        if not self.servers or not self.keyspace or not self.table:
             raise ImproperlyConfigured(
                 'Cassandra backend not configured.')
 
         self._connection = None
         self._session = None
-
-    def _get_connection(self):
+        self._write_stmt = None
+        self._read_stmt = None
+
+    def process_cleanup(self):
+        if self._connection is not None:
+            self._session.shutdown()
+            self._connection = None
+            self._session = None
+
+    def _get_connection(self, write=False):
+        # only writers can create the table to get rid of two processes
+        # creating table at same time and Cassandra choking on that
         if self._connection is None:
-            self._connection = cassandra.Cluster(self.servers, port=self.port)
+            self._connection = cassandra.cluster.Cluster(self.servers,
+                                                         port=self.port)
             self._session = self._connection.connect(self.keyspace)
 
-            self._write_stmt = self._session.prepare('''INSERT INTO '''+
-                self.column_family+''' (task_id,status, result,date_done,'''
-                '''traceback, children) VALUES (?, ?, ?, ?, ?, ?) '''
-                '''USING TTL '''+str(self.expires),
-                consistency_level=self.write_consistency)
-
-            self._make_stmt = self._session.prepare(
-                '''CREATE TABLE '''+self.column_family+''' (
-                    task_id text,
-                    status text,
-                    result text,
-                    date_done timestamp,
-                    traceback text,
-                    children text,
-                    PRIMARY KEY ((task_id), date_done)
-                ) WITH CLUSTERING ORDER BY (date_done DESC)
-                  WITH default_time_to_live = '''+str(self.expires)+';')
-
-            self._read_stmt = self._session.prepare(
-                '''SELECT task_id, status, result, date_done, traceback, children
-                   FROM '''+self.column_family+'''
-                   WHERE task_id=? LIMIT 1''',
-                   consistency_level=self.read_consistency)
-
-            try:
-                self._session.execute(self._make_stmt)
-            except cassandra.AlreadyExists:
-                pass
-
-    def _retry_on_error(self, fun, *args, **kwargs):
-        ts = monotonic() + self._retry_timeout
-        while 1:
-            try:
-                return fun(*args, **kwargs)
-            except (cassandra.Unavailable,
-                    cassandra.Timeout,
-                    cassandra.InvalidRequest) as exc:
-                if monotonic() > ts:
-                    raise
-                logger.warning('Cassandra error: %r. Retrying...', exc)
-                time.sleep(self._retry_wait)
+            self._write_stmt = cassandra.query.SimpleStatement(
+                'INSERT INTO '+self.table+' (task_id, status, result,'''
+                ''' date_done, traceback, children) VALUES'''
+                ' (%s, %s, %s, %s, %s, %s) '+self.cqlexpires+';')
+            self._write_stmt.consistency_level = self.write_consistency
+
+            self._read_stmt = cassandra.query.SimpleStatement(
+                '''SELECT status, result, date_done, traceback, children
+                   FROM '''+self.table+'''
+                   WHERE task_id=%s''')
+            self._read_stmt.consistency_level = self.read_consistency
+
+            if write:
+                self._make_stmt = cassandra.query.SimpleStatement(
+                    '''CREATE TABLE '''+self.table+''' (
+                        task_id text,
+                        status text,
+                        result blob,
+                        date_done text,
+                        traceback blob,
+                        children blob,
+                        PRIMARY KEY (task_id)
+                    );''')
+                self._make_stmt.consistency_level = self.write_consistency
+                try:
+                    self._session.execute(self._make_stmt)
+                except cassandra.AlreadyExists:
+                    pass
 
     def _store_result(self, task_id, result, status,
                       traceback=None, request=None, **kwargs):
         """Store return value and status of an executed task."""
+        self._get_connection(write=True)
 
-        def _do_store():
-            self._get_connection()
-            date_done = self.app.now()
-
-            self._session.execute(self._write_stmt, (
-                task_id, status, result,
-                self.app.now().strftime('%Y-%m-%dT%H:%M:%SZ'),
-                traceback, self.encode(self.current_task_children(request))
-            ))
-        return self._retry_on_error(_do_store)
+        self._session.execute(self._write_stmt, (
+            task_id, status, buffer(self.encode(result)),
+            self.app.now().strftime('%Y-%m-%dT%H:%M:%SZ'),
+            buffer(self.encode(traceback)), buffer(self.encode(self.current_task_children(request)))
+        ))
 
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
+        self._get_connection()
 
-        def _do_get():
-
-            res = self._session.execute(self._read_stmt, (task_id, ))
-            if not res:
-                return {'status': states.PENDING, 'result': None}
-
-            task_id, status, result, date_done, traceback, children = res[0]
+        res = self._session.execute(self._read_stmt, (task_id, ))
+        if not res:
+            return {'status': states.PENDING, 'result': None}
 
-            return self.meta_from_decoded({
-                        'task_id': task_id,
-                        'status': status,
-                        'result': self.decode(result),
-                        'date_done': date_done,
-                        'traceback': self.decode(traceback),
-                        'children': self.decode(children),
-            })
+        status, result, date_done, traceback, children = res[0]
 
-        return self._retry_on_error(_do_get)
+        return self.meta_from_decoded({
+                    'task_id': task_id,
+                    'status': str(status),
+                    'result': self.decode(str(result)),
+                    'date_done': date_done,
+                    'traceback': self.decode(str(traceback)),
+                    'children': self.decode(str(children)),
+        })
 
     def __reduce__(self, args=(), kwargs={}):
         kwargs.update(
             dict(servers=self.servers,
                  keyspace=self.keyspace,
-                 column_family=self.column_family,
-                 cassandra_options=self.cassandra_options))
+                 table=self.table))
         return super(NewCassandraBackend, self).__reduce__(args, kwargs)

+ 92 - 14
docs/configuration.rst

@@ -214,8 +214,8 @@ Can be one of the following:
     See :ref:`conf-cassandra-result-backend`.
 
 * new_cassandra
-    Use `Cassandra`_ to store the results, using other backend than _cassandra_.
-    See :ref:`conf-cassandra-result-backend`.
+    Use `new_cassandra`_ to store the results, using newer database driver than _cassandra_.
+    See :ref:`conf-new_cassandra-result-backend`.
 
 * ironcache
     Use `IronCache`_ to store the results.
@@ -532,30 +532,110 @@ Example configuration
         'taskmeta_collection': 'my_taskmeta_collection',
     }
 
-.. _conf-cassandra-result-backend:
+.. _conf-new_cassandra-result-backend:
 
-Cassandra backend settings
+
+new_cassandra backend settings
 --------------------------
 
 .. note::
 
-    The Cassandra backend requires the :mod:`pycassa` library:
-    http://pypi.python.org/pypi/pycassa/
+    This Cassandra backend driver requires :mod:`cassandra-driver`.
+    https://pypi.python.org/pypi/cassandra-driver
 
-    To install the pycassa package use `pip` or `easy_install`:
+    To install, use `pip` or `easy_install`:
 
     .. code-block:: bash
 
-        $ pip install pycassa
+        $ pip install cassandra-driver
 
-    If you are using new_cassandra, :mod:`cassandra-driver` is required instead:
-    https://pypi.python.org/pypi/cassandra-driver
+This backend requires the following configuration directives to be set.
+
+.. setting:: CASSANDRA_SERVERS
+
+CASSANDRA_SERVERS
+~~~~~~~~~~~~~~~~~
+
+List of ``host`` Cassandra servers. e.g.::
+
+    CASSANDRA_SERVERS = ['localhost']
 
-    To install, use `pip` or `easy_install`:
+
+.. setting:: CASSANDRA_PORT
+
+CASSANDRA_PORT
+~~~~~~~~~~~~~~
+
+Port to contact the Cassandra servers on. Default is 9042.
+
+.. setting:: CASSANDRA_KEYSPACE
+
+CASSANDRA_KEYSPACE
+~~~~~~~~~~~~~~~~~~
+
+The keyspace in which to store the results. e.g.::
+
+    CASSANDRA_KEYSPACE = 'tasks_keyspace'
+
+.. setting:: CASSANDRA_COLUMN_FAMILY
+
+CASSANDRA_TABLE
+~~~~~~~~~~~~~~~~~~~~~~~
+
+The table (column family) in which to store the results. e.g.::
+
+    CASSANDRA_TABLE = 'tasks'
+
+.. setting:: CASSANDRA_READ_CONSISTENCY
+
+CASSANDRA_READ_CONSISTENCY
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The read consistency used. Values can be ``ONE``, ``TWO``, ``THREE``, ``QUORUM``, ``ALL``,
+``LOCAL_QUORUM``, ``EACH_QUORUM``, ``LOCAL_ONE``.
+
+.. setting:: CASSANDRA_WRITE_CONSISTENCY
+
+CASSANDRA_WRITE_CONSISTENCY
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The write consistency used. Values can be ``ONE``, ``TWO``, ``THREE``, ``QUORUM``, ``ALL``,
+``LOCAL_QUORUM``, ``EACH_QUORUM``, ``LOCAL_ONE``.
+
+.. setting:: CASSANDRA_ENTRY_TTL
+
+CASSANDRA_ENTRY_TTL
+~~~~~~~~~~~~~~~~~~~
+
+Time-to-live for status entries. They will expire and be removed after that many seconds
+after adding. Default (None) means they will never expire.
+
+Example configuration
+~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: python
+
+    CASSANDRA_SERVERS = ['localhost']
+    CASSANDRA_KEYSPACE = 'celery'
+    CASSANDRA_COLUMN_FAMILY = 'task_results'
+    CASSANDRA_READ_CONSISTENCY = 'ONE'
+    CASSANDRA_WRITE_CONSISTENCY = 'ONE'
+    CASSANDRA_ENTRY_TTL = 86400
+
+
+Cassandra backend settings
+--------------------------
+
+.. note::
+
+    The Cassandra backend requires the :mod:`pycassa` library:
+    http://pypi.python.org/pypi/pycassa/
+
+    To install the pycassa package use `pip` or `easy_install`:
 
     .. code-block:: bash
 
-        $ pip install cassandra-driver
+        $ pip install pycassa
 
 
 This backend requires the following configuration directives to be set.
@@ -628,8 +708,6 @@ Options to be passed to the `pycassa connection pool`_ (optional).
 
 .. _`pycassa connection pool`: http://pycassa.github.com/pycassa/api/pycassa/pool.html
 
-Not used in new_cassandra
-
 Example configuration
 ~~~~~~~~~~~~~~~~~~~~~