cassandra.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. # -* coding: utf-8 -*-
  2. """
  3. celery.backends.cassandra
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~
  5. Apache Cassandra result store backend using DataStax driver
  6. """
  7. from __future__ import absolute_import
  8. import sys
  9. try: # pragma: no cover
  10. import cassandra
  11. import cassandra.cluster
  12. except ImportError: # pragma: no cover
  13. cassandra = None # noqa
  14. from celery import states
  15. from celery.exceptions import ImproperlyConfigured
  16. from celery.utils.log import get_logger
  17. from .base import BaseBackend
  18. __all__ = ['CassandraBackend']
  19. logger = get_logger(__name__)
  20. E_NO_CASSANDRA = """
  21. You need to install the cassandra-driver library to
  22. use the Cassandra backend. See https://github.com/datastax/python-driver
  23. """
  24. Q_INSERT_RESULT = """
  25. INSERT INTO {table} (
  26. task_id, status, result, date_done, traceback, children) VALUES (
  27. %s, %s, %s, %s, %s, %s) {expires};
  28. """
  29. Q_SELECT_RESULT = """
  30. SELECT status, result, date_done, traceback, children
  31. FROM {table}
  32. WHERE task_id=%s
  33. LIMIT 1
  34. """
  35. Q_CREATE_RESULT_TABLE = """
  36. CREATE TABLE {table} (
  37. task_id text,
  38. status text,
  39. result blob,
  40. date_done timestamp,
  41. traceback blob,
  42. children blob,
  43. PRIMARY KEY ((task_id), date_done)
  44. ) WITH CLUSTERING ORDER BY (date_done DESC);
  45. """
  46. Q_EXPIRES = """
  47. USING TTL {0}
  48. """
  49. if sys.version_info[0] == 3:
  50. def buf_t(x):
  51. return bytes(x, 'utf8')
  52. else:
  53. buf_t = buffer # noqa
  54. class CassandraBackend(BaseBackend):
  55. """Cassandra backend utilizing DataStax driver
  56. :raises celery.exceptions.ImproperlyConfigured: if
  57. module :mod:`cassandra` is not available.
  58. """
  59. #: List of Cassandra servers with format: ``hostname``.
  60. servers = None
  61. supports_autoexpire = True # autoexpire supported via entry_ttl
  62. def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
  63. port=9042, **kwargs):
  64. """Initialize Cassandra backend.
  65. Raises :class:`celery.exceptions.ImproperlyConfigured` if
  66. the :setting:`cassandra_servers` setting is not set.
  67. """
  68. super(CassandraBackend, self).__init__(**kwargs)
  69. if not cassandra:
  70. raise ImproperlyConfigured(E_NO_CASSANDRA)
  71. conf = self.app.conf
  72. self.servers = (servers or
  73. conf.get('cassandra_servers', None))
  74. self.port = (port or
  75. conf.get('cassandra_port', None))
  76. self.keyspace = (keyspace or
  77. conf.get('cassandra_keyspace', None))
  78. self.table = (table or
  79. conf.get('cassandra_table', None))
  80. if not self.servers or not self.keyspace or not self.table:
  81. raise ImproperlyConfigured('Cassandra backend not configured.')
  82. expires = (entry_ttl or conf.get('cassandra_entry_ttl', None))
  83. self.cqlexpires = (Q_EXPIRES.format(expires)
  84. if expires is not None else '')
  85. read_cons = conf.get('cassandra_read_consistency') or 'LOCAL_QUORUM'
  86. write_cons = conf.get('cassandra_write_consistency') or 'LOCAL_QUORUM'
  87. self.read_consistency = getattr(
  88. cassandra.ConsistencyLevel, read_cons,
  89. cassandra.ConsistencyLevel.LOCAL_QUORUM,
  90. )
  91. self.write_consistency = getattr(
  92. cassandra.ConsistencyLevel, write_cons,
  93. cassandra.ConsistencyLevel.LOCAL_QUORUM,
  94. )
  95. self._connection = None
  96. self._session = None
  97. self._write_stmt = None
  98. self._read_stmt = None
  99. def process_cleanup(self):
  100. if self._connection is not None:
  101. self._connection = None
  102. if self._session is not None:
  103. self._session.shutdown()
  104. self._session = None
  105. def _get_connection(self, write=False):
  106. """Prepare the connection for action
  107. :param write: bool - are we a writer?
  108. """
  109. if self._connection is None:
  110. try:
  111. self._connection = cassandra.cluster.Cluster(self.servers,
  112. port=self.port)
  113. self._session = self._connection.connect(self.keyspace)
  114. # We are forced to do concatenation below, as formatting would
  115. # blow up on superficial %s that will be processed by Cassandra
  116. self._write_stmt = cassandra.query.SimpleStatement(
  117. Q_INSERT_RESULT.format(
  118. table=self.table, expires=self.cqlexpires),
  119. )
  120. self._write_stmt.consistency_level = self.write_consistency
  121. self._read_stmt = cassandra.query.SimpleStatement(
  122. Q_SELECT_RESULT.format(table=self.table),
  123. )
  124. self._read_stmt.consistency_level = self.read_consistency
  125. if write:
  126. # Only possible writers "workers" are allowed to issue
  127. # CREATE TABLE. This is to prevent conflicting situations
  128. # where both task-creator and task-executor would issue it
  129. # at the same time.
  130. # Anyway; if you're doing anything critical, you should
  131. # have created this table in advance, in which case
  132. # this query will be a no-op (AlreadyExists)
  133. self._make_stmt = cassandra.query.SimpleStatement(
  134. Q_CREATE_RESULT_TABLE.format(table=self.table),
  135. )
  136. self._make_stmt.consistency_level = self.write_consistency
  137. try:
  138. self._session.execute(self._make_stmt)
  139. except cassandra.AlreadyExists:
  140. pass
  141. except cassandra.OperationTimedOut:
  142. # a heavily loaded or gone Cassandra cluster failed to respond.
  143. # leave this class in a consistent state
  144. self._connection = None
  145. if self._session is not None:
  146. self._session.shutdown()
  147. raise # we did fail after all - reraise
  148. def _store_result(self, task_id, result, status,
  149. traceback=None, request=None, **kwargs):
  150. """Store return value and status of an executed task."""
  151. self._get_connection(write=True)
  152. self._session.execute(self._write_stmt, (
  153. task_id,
  154. status,
  155. buf_t(self.encode(result)),
  156. self.app.now(),
  157. buf_t(self.encode(traceback)),
  158. buf_t(self.encode(self.current_task_children(request)))
  159. ))
  160. def _get_task_meta_for(self, task_id):
  161. """Get task metadata for a task by id."""
  162. self._get_connection()
  163. res = self._session.execute(self._read_stmt, (task_id, ))
  164. if not res:
  165. return {'status': states.PENDING, 'result': None}
  166. status, result, date_done, traceback, children = res[0]
  167. return self.meta_from_decoded({
  168. 'task_id': task_id,
  169. 'status': status,
  170. 'result': self.decode(result),
  171. 'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
  172. 'traceback': self.decode(traceback),
  173. 'children': self.decode(children),
  174. })
  175. def __reduce__(self, args=(), kwargs={}):
  176. kwargs.update(
  177. dict(servers=self.servers,
  178. keyspace=self.keyspace,
  179. table=self.table))
  180. return super(CassandraBackend, self).__reduce__(args, kwargs)