cassandra.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.cassandra
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~
  5. Apache Cassandra result store backend.
  6. """
  7. from __future__ import absolute_import
  8. try: # pragma: no cover
  9. import pycassa
  10. from thrift import Thrift
  11. C = pycassa.cassandra.ttypes
  12. except ImportError: # pragma: no cover
  13. pycassa = None # noqa
  14. import socket
  15. import time
  16. from celery import states
  17. from celery.exceptions import ImproperlyConfigured
  18. from celery.utils.log import get_logger
  19. from celery.utils.timeutils import maybe_timedelta, timedelta_seconds
  20. from .base import BaseBackend
  21. logger = get_logger(__name__)
  22. class CassandraBackend(BaseBackend):
  23. """Highly fault tolerant Cassandra backend.
  24. .. attribute:: servers
  25. List of Cassandra servers with format: ``hostname:port``.
  26. :raises celery.exceptions.ImproperlyConfigured: if
  27. module :mod:`pycassa` is not available.
  28. """
  29. servers = []
  30. keyspace = None
  31. column_family = None
  32. detailed_mode = False
  33. _retry_timeout = 300
  34. _retry_wait = 3
  35. def __init__(self, servers=None, keyspace=None, column_family=None,
  36. cassandra_options=None, detailed_mode=False, **kwargs):
  37. """Initialize Cassandra backend.
  38. Raises :class:`celery.exceptions.ImproperlyConfigured` if
  39. the :setting:`CASSANDRA_SERVERS` setting is not set.
  40. """
  41. super(CassandraBackend, self).__init__(**kwargs)
  42. self.expires = kwargs.get('expires') or maybe_timedelta(
  43. self.app.conf.CELERY_TASK_RESULT_EXPIRES)
  44. if not pycassa:
  45. raise ImproperlyConfigured(
  46. 'You need to install the pycassa library to use the '
  47. 'Cassandra backend. See https://github.com/pycassa/pycassa')
  48. conf = self.app.conf
  49. self.servers = (servers or
  50. conf.get('CASSANDRA_SERVERS') or
  51. self.servers)
  52. self.keyspace = (keyspace or
  53. conf.get('CASSANDRA_KEYSPACE') or
  54. self.keyspace)
  55. self.column_family = (column_family or
  56. conf.get('CASSANDRA_COLUMN_FAMILY') or
  57. self.column_family)
  58. self.cassandra_options = dict(conf.get('CASSANDRA_OPTIONS') or {},
  59. **cassandra_options or {})
  60. self.detailed_mode = (detailed_mode or
  61. conf.get('CASSANDRA_DETAILED_MODE') or
  62. self.detailed_mode)
  63. read_cons = conf.get('CASSANDRA_READ_CONSISTENCY') or 'LOCAL_QUORUM'
  64. write_cons = conf.get('CASSANDRA_WRITE_CONSISTENCY') or 'LOCAL_QUORUM'
  65. try:
  66. self.read_consistency = getattr(pycassa.ConsistencyLevel,
  67. read_cons)
  68. except AttributeError:
  69. self.read_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
  70. try:
  71. self.write_consistency = getattr(pycassa.ConsistencyLevel,
  72. write_cons)
  73. except AttributeError:
  74. self.write_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
  75. if not self.servers or not self.keyspace or not self.column_family:
  76. raise ImproperlyConfigured(
  77. 'Cassandra backend not configured.')
  78. self._column_family = None
  79. def _retry_on_error(self, fun, *args, **kwargs):
  80. ts = time.time() + self._retry_timeout
  81. while 1:
  82. try:
  83. return fun(*args, **kwargs)
  84. except (pycassa.InvalidRequestException,
  85. pycassa.TimedOutException,
  86. pycassa.UnavailableException,
  87. socket.error,
  88. socket.timeout,
  89. Thrift.TException), exc:
  90. if time.time() > ts:
  91. raise
  92. logger.warn('Cassandra error: %r. Retrying...', exc)
  93. time.sleep(self._retry_wait)
  94. def _get_column_family(self):
  95. if self._column_family is None:
  96. conn = pycassa.ConnectionPool(self.keyspace,
  97. server_list=self.servers,
  98. **self.cassandra_options)
  99. self._column_family = \
  100. pycassa.ColumnFamily(conn, self.column_family,
  101. read_consistency_level=self.read_consistency,
  102. write_consistency_level=self.write_consistency)
  103. return self._column_family
  104. def process_cleanup(self):
  105. if self._column_family is not None:
  106. self._column_family = None
  107. def _store_result(self, task_id, result, status, traceback=None):
  108. """Store return value and status of an executed task."""
  109. def _do_store():
  110. cf = self._get_column_family()
  111. date_done = self.app.now()
  112. meta = {'status': status,
  113. 'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
  114. 'traceback': self.encode(traceback),
  115. 'children': self.encode(self.current_task_children())}
  116. if self.detailed_mode:
  117. meta['result'] = result
  118. cf.insert(task_id, {date_done: self.encode(meta)},
  119. ttl=timedelta_seconds(self.expires))
  120. else:
  121. meta['result'] = self.encode(result)
  122. cf.insert(task_id, meta,
  123. ttl=timedelta_seconds(self.expires))
  124. return self._retry_on_error(_do_store)
  125. def _get_task_meta_for(self, task_id):
  126. """Get task metadata for a task by id."""
  127. def _do_get():
  128. cf = self._get_column_family()
  129. try:
  130. if self.detailed_mode:
  131. row = cf.get(task_id, column_reversed=True, column_count=1)
  132. meta = self.decode(row.values()[0])
  133. meta['task_id'] = task_id
  134. else:
  135. obj = cf.get(task_id)
  136. meta = {
  137. 'task_id': task_id,
  138. 'status': obj['status'],
  139. 'result': self.decode(obj['result']),
  140. 'date_done': obj['date_done'],
  141. 'traceback': self.decode(obj['traceback']),
  142. 'children': self.decode(obj['children']),
  143. }
  144. except (KeyError, pycassa.NotFoundException):
  145. meta = {'status': states.PENDING, 'result': None}
  146. return meta
  147. return self._retry_on_error(_do_get)
  148. def __reduce__(self, args=(), kwargs={}):
  149. kwargs.update(
  150. dict(servers=self.servers,
  151. keyspace=self.keyspace,
  152. column_family=self.column_family,
  153. cassandra_options=self.cassandra_options))
  154. return super(CassandraBackend, self).__reduce__(args, kwargs)