cassandra.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
# -*- coding: utf-8 -*-
  2. """Apache Cassandra result store backend using the DataStax driver."""
  3. from __future__ import absolute_import, unicode_literals
  4. import sys
  5. try: # pragma: no cover
  6. import cassandra
  7. import cassandra.auth
  8. import cassandra.cluster
  9. except ImportError: # pragma: no cover
  10. cassandra = None # noqa
  11. from celery import states
  12. from celery.exceptions import ImproperlyConfigured
  13. from celery.utils.log import get_logger
  14. from .base import BaseBackend
__all__ = ['CassandraBackend']

logger = get_logger(__name__)

# Message for the ImproperlyConfigured error raised when the
# ``cassandra`` driver package could not be imported.
E_NO_CASSANDRA = """
You need to install the cassandra-driver library to
use the Cassandra backend. See https://github.com/datastax/python-driver
"""

# Message for the ImproperlyConfigured error raised when the configured
# auth provider name is not an attribute of ``cassandra.auth``.
E_NO_SUCH_CASSANDRA_AUTH_PROVIDER = """
CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
"""

# CQL template for storing one task result.  ``{table}`` and ``{expires}``
# are filled in with ``str.format``; the ``%s`` markers are left in place
# and bound to values when the statement is executed.
Q_INSERT_RESULT = """
INSERT INTO {table} (
task_id, status, result, date_done, traceback, children) VALUES (
%s, %s, %s, %s, %s, %s) {expires};
"""

# CQL template for reading back a single row for a task id.  ``{table}``
# is filled in with ``str.format``; ``%s`` is bound at execution time.
Q_SELECT_RESULT = """
SELECT status, result, date_done, traceback, children
FROM {table}
WHERE task_id=%s
LIMIT 1
"""

# CQL template for creating the result table.  Rows are partitioned by
# ``task_id`` and clustered by ``date_done`` in descending order.
Q_CREATE_RESULT_TABLE = """
CREATE TABLE {table} (
task_id text,
status text,
result blob,
date_done timestamp,
traceback blob,
children blob,
PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
"""

# Optional clause substituted for ``{expires}`` in Q_INSERT_RESULT;
# ``{0}`` receives the configured TTL value.
Q_EXPIRES = """
USING TTL {0}
"""
  50. if sys.version_info[0] == 3:
  51. def buf_t(x):
  52. return bytes(x, 'utf8')
  53. else:
  54. buf_t = buffer # noqa
  55. class CassandraBackend(BaseBackend):
  56. """Cassandra backend utilizing DataStax driver
  57. Raises:
  58. celery.exceptions.ImproperlyConfigured:
  59. if module :pypi:`cassandra-driver` is not available,
  60. or if the :setting:`cassandra_servers` setting is not set.
  61. """
  62. #: List of Cassandra servers with format: ``hostname``.
  63. servers = None
  64. supports_autoexpire = True # autoexpire supported via entry_ttl
  65. def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
  66. port=9042, **kwargs):
  67. super(CassandraBackend, self).__init__(**kwargs)
  68. if not cassandra:
  69. raise ImproperlyConfigured(E_NO_CASSANDRA)
  70. conf = self.app.conf
  71. self.servers = servers or conf.get('cassandra_servers', None)
  72. self.port = port or conf.get('cassandra_port', None)
  73. self.keyspace = keyspace or conf.get('cassandra_keyspace', None)
  74. self.table = table or conf.get('cassandra_table', None)
  75. if not self.servers or not self.keyspace or not self.table:
  76. raise ImproperlyConfigured('Cassandra backend not configured.')
  77. expires = entry_ttl or conf.get('cassandra_entry_ttl', None)
  78. self.cqlexpires = (
  79. Q_EXPIRES.format(expires) if expires is not None else '')
  80. read_cons = conf.get('cassandra_read_consistency') or 'LOCAL_QUORUM'
  81. write_cons = conf.get('cassandra_write_consistency') or 'LOCAL_QUORUM'
  82. self.read_consistency = getattr(
  83. cassandra.ConsistencyLevel, read_cons,
  84. cassandra.ConsistencyLevel.LOCAL_QUORUM)
  85. self.write_consistency = getattr(
  86. cassandra.ConsistencyLevel, write_cons,
  87. cassandra.ConsistencyLevel.LOCAL_QUORUM)
  88. self.auth_provider = None
  89. auth_provider = conf.get('cassandra_auth_provider', None)
  90. auth_kwargs = conf.get('cassandra_auth_kwargs', None)
  91. if auth_provider and auth_kwargs:
  92. auth_provider_class = getattr(cassandra.auth, auth_provider, None)
  93. if not auth_provider_class:
  94. raise ImproperlyConfigured(E_NO_SUCH_CASSANDRA_AUTH_PROVIDER)
  95. self.auth_provider = auth_provider_class(**auth_kwargs)
  96. self._connection = None
  97. self._session = None
  98. self._write_stmt = None
  99. self._read_stmt = None
  100. self._make_stmt = None
  101. def process_cleanup(self):
  102. if self._connection is not None:
  103. self._connection.shutdown() # also shuts down _session
  104. self._connection = None
  105. self._session = None
  106. def _get_connection(self, write=False):
  107. """Prepare the connection for action
  108. Arguments:
  109. write (bool): are we a writer?
  110. """
  111. if self._connection is not None:
  112. return
  113. try:
  114. self._connection = cassandra.cluster.Cluster(
  115. self.servers, port=self.port,
  116. auth_provider=self.auth_provider)
  117. self._session = self._connection.connect(self.keyspace)
  118. # We're forced to do concatenation below, as formatting would
  119. # blow up on superficial %s that'll be processed by Cassandra
  120. self._write_stmt = cassandra.query.SimpleStatement(
  121. Q_INSERT_RESULT.format(
  122. table=self.table, expires=self.cqlexpires),
  123. )
  124. self._write_stmt.consistency_level = self.write_consistency
  125. self._read_stmt = cassandra.query.SimpleStatement(
  126. Q_SELECT_RESULT.format(table=self.table),
  127. )
  128. self._read_stmt.consistency_level = self.read_consistency
  129. if write:
  130. # Only possible writers "workers" are allowed to issue
  131. # CREATE TABLE. This is to prevent conflicting situations
  132. # where both task-creator and task-executor would issue it
  133. # at the same time.
  134. # Anyway; if you're doing anything critical, you should
  135. # have created this table in advance, in which case
  136. # this query will be a no-op (AlreadyExists)
  137. self._make_stmt = cassandra.query.SimpleStatement(
  138. Q_CREATE_RESULT_TABLE.format(table=self.table),
  139. )
  140. self._make_stmt.consistency_level = self.write_consistency
  141. try:
  142. self._session.execute(self._make_stmt)
  143. except cassandra.AlreadyExists:
  144. pass
  145. except cassandra.OperationTimedOut:
  146. # a heavily loaded or gone Cassandra cluster failed to respond.
  147. # leave this class in a consistent state
  148. if self._connection is not None:
  149. self._connection.shutdown() # also shuts down _session
  150. self._connection = None
  151. self._session = None
  152. raise # we did fail after all - reraise
  153. def _store_result(self, task_id, result, state,
  154. traceback=None, request=None, **kwargs):
  155. """Store return value and state of an executed task."""
  156. self._get_connection(write=True)
  157. self._session.execute(self._write_stmt, (
  158. task_id,
  159. state,
  160. buf_t(self.encode(result)),
  161. self.app.now(),
  162. buf_t(self.encode(traceback)),
  163. buf_t(self.encode(self.current_task_children(request)))
  164. ))
  165. def as_uri(self, include_password=True):
  166. return 'cassandra://'
  167. def _get_task_meta_for(self, task_id):
  168. """Get task meta-data for a task by id."""
  169. self._get_connection()
  170. res = self._session.execute(self._read_stmt, (task_id, ))
  171. if not res:
  172. return {'status': states.PENDING, 'result': None}
  173. status, result, date_done, traceback, children = res[0]
  174. return self.meta_from_decoded({
  175. 'task_id': task_id,
  176. 'status': status,
  177. 'result': self.decode(result),
  178. 'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
  179. 'traceback': self.decode(traceback),
  180. 'children': self.decode(children),
  181. })
  182. def __reduce__(self, args=(), kwargs={}):
  183. kwargs.update(
  184. dict(servers=self.servers,
  185. keyspace=self.keyspace,
  186. table=self.table))
  187. return super(CassandraBackend, self).__reduce__(args, kwargs)