Преглед на файлове

Merge branch 'dln/cassandra-backend'

Ask Solem преди 14 години
родител
ревизия
6e2c458b82
променени са 2 файла, в които са добавени 171 реда и са изтрити 0 реда
  1. 1 0
      celery/backends/__init__.py
  2. 170 0
      celery/backends/cassandra.py

+ 1 - 0
celery/backends/__init__.py

@@ -10,6 +10,7 @@ BACKEND_ALIASES = {
     "mongodb": "celery.backends.mongodb.MongoBackend",
     "tyrant": "celery.backends.tyrant.TyrantBackend",
     "database": "celery.backends.database.DatabaseBackend",
+    "cassandra": "celery.backends.cassandra.CassandraBackend",
 }
 
 _backend_cache = {}

+ 170 - 0
celery/backends/cassandra.py

@@ -0,0 +1,170 @@
+"""celery.backends.cassandra"""
+try:
+    import pycassa
+    from thrift import Thrift
+    C = __import__('cassandra').ttypes # FIXME: Namespace kludge
+except ImportError:
+    pycassa = None
+
+from datetime import datetime
+import itertools
+import random
+import socket
+import time
+
+from celery.backends.base import BaseDictBackend
+from celery import conf
+from celery.exceptions import ImproperlyConfigured
+from celery.loaders import load_settings
+from celery.log import setup_logger
+from celery.serialization import pickle
+from celery import states
+
+
+
+class CassandraBackend(BaseDictBackend):
+    """Highly fault tolerant Cassandra backend.
+
+    .. attribute:: servers
+
+        List of Cassandra servers with format: "hostname:port".
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`pycassa` is not available.
+
+    """
+    servers = []
+    keyspace = None
+    column_family = None
+    _retry_timeout = 300
+    _retry_wait = 3
+    _index_shards = 64
+    _index_keys = ["celery.results.index!%02x" % i for i in range(_index_shards)]
+
+    def __init__(self, servers=None, keyspace=None, column_family=None, cassandra_options=None, **kwargs):
+        """Initialize Cassandra backend.
+
+        Raises :class:`celery.exceptions.ImproperlyConfigured` if
+        :setting:`CASSANDRA_SERVERS` is not set.
+
+        """
+        self.logger = setup_logger("celery.backends.cassandra")
+
+        self.result_expires = kwargs.get("result_expires") or \
+                                conf.TASK_RESULT_EXPIRES
+
+        if not pycassa:
+            raise ImproperlyConfigured(
+                    "You need to install the pycassa library to use the "
+                    "Cassandra backend. See http://github.com/vomjom/pycassa")
+
+        settings = load_settings()
+
+        self.servers = servers or \
+                         getattr(settings, "CASSANDRA_SERVERS", self.servers)
+        self.keyspace = keyspace or \
+                          getattr(settings, "CASSANDRA_KEYSPACE",
+                                  self.keyspace)
+        self.column_family = column_family or \
+                               getattr(settings, "CASSANDRA_COLUMN_FAMILY",
+                                       self.column_family)
+        self.cassandra_options = dict(cassandra_options or {},
+                                   **getattr(settings, "CASSANDRA_OPTIONS", {}))
+        if not self.servers or not self.keyspace or not self.column_family:
+            raise ImproperlyConfigured(
+                    "Cassandra backend not configured.")
+
+        super(CassandraBackend, self).__init__()
+        self._column_family = None
+
+    def _retry_on_error(func):
+        def wrapper(*args, **kwargs):
+            self = args[0]
+            ts = time.time() + self._retry_timeout
+            while 1:
+                try:
+                    return func(*args, **kwargs)
+                except (pycassa.InvalidRequestException,
+                        pycassa.NoServerAvailable,
+                        pycassa.TimedOutException,
+                        pycassa.UnavailableException,
+                        socket.error,
+                        socket.timeout,
+                        Thrift.TException), exc:
+                    self.logger.warn('Cassandra error: %s. Retrying...' % exc)
+                    if time.time() > ts:
+                        raise
+                    time.sleep(self._retry_wait)
+        return wrapper
+
+    def _get_column_family(self):
+        if self._column_family is None:
+            conn = pycassa.connect(self.servers,
+                                   **self.cassandra_options)
+            self._column_family = \
+              pycassa.ColumnFamily(conn, self.keyspace,
+                                   self.column_family,
+                                   read_consistency_level=pycassa.ConsistencyLevel.DCQUORUM,
+                                   write_consistency_level=pycassa.ConsistencyLevel.DCQUORUM)
+        return self._column_family
+
+    def process_cleanup(self):
+        if self._column_family is not None:
+            self._column_family = None
+
+    @_retry_on_error
+    def _store_result(self, task_id, result, status, traceback=None):
+        """Store return value and status of an executed task."""
+        cf = self._get_column_family()
+        date_done = datetime.utcnow()
+        index_key = 'celery.results.index!%02x' % random.randrange(self._index_shards)
+        index_column_name = '%8x!%s' % (time.mktime(date_done.timetuple()), task_id)
+        meta = {"status": status,
+                "result": pickle.dumps(result),
+                "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
+                "traceback": pickle.dumps(traceback)}
+        cf.insert(task_id, meta)
+        cf.insert(index_key, {index_column_name: status})
+
+    @_retry_on_error
+    def _get_task_meta_for(self, task_id):
+        """Get task metadata for a task by id."""
+        cf = self._get_column_family()
+        try:
+            obj = cf.get(task_id)
+            meta = {
+                "task_id": task_id,
+                "status": obj["status"],
+                "result": pickle.loads(str(obj["result"])),
+                "date_done": obj["date_done"],
+                "traceback": pickle.loads(str(obj["traceback"])),
+            }
+        except (KeyError, pycassa.NotFoundException):
+            meta = {"status": states.PENDING, "result": None}
+        return meta
+
+    def cleanup(self):
+        """Delete expired metadata."""
+        self.logger.debug('Running cleanup...')
+        expires = datetime.utcnow() - self.result_expires
+        end_column = '%8x"' % (time.mktime(expires.timetuple()))
+
+        cf = self._get_column_family()
+        column_parent = C.ColumnParent(cf.column_family)
+        slice_pred = C.SlicePredicate(slice_range=C.SliceRange('', end_column,
+                                                               count=2**30))
+        columns = cf.client.multiget_slice(cf.keyspace, self._index_keys,
+                                           column_parent, slice_pred,
+                                           pycassa.ConsistencyLevel.DCQUORUM)
+
+        index_cols = [c.column.name for c in itertools.chain(*columns.values())]
+        for k in self._index_keys:
+            cf.remove(k, index_cols)
+
+        task_ids = [c[9:] for c in index_cols]
+        for k in task_ids:
+            cf.remove(k)
+
+        self.logger.debug('Cleaned %i expired results' % len(task_ids))
+
+