cassandra.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. # -*- coding: utf-8 -*-
  2. """celery.backends.cassandra"""
  3. from __future__ import absolute_import
  4. try:
  5. import pycassa
  6. from thrift import Thrift
  7. C = pycassa.cassandra.ttypes
  8. except ImportError:
  9. pycassa = None
  10. import socket
  11. import time
  12. from .. import states
  13. from ..exceptions import ImproperlyConfigured
  14. from ..utils.timeutils import maybe_timedelta, timedelta_seconds
  15. from .base import BaseDictBackend
  16. class CassandraBackend(BaseDictBackend):
  17. """Highly fault tolerant Cassandra backend.
  18. .. attribute:: servers
  19. List of Cassandra servers with format: "hostname:port".
  20. :raises celery.exceptions.ImproperlyConfigured: if
  21. module :mod:`pycassa` is not available.
  22. """
  23. servers = []
  24. keyspace = None
  25. column_family = None
  26. detailed_mode = False
  27. _retry_timeout = 300
  28. _retry_wait = 3
  29. def __init__(self, servers=None, keyspace=None, column_family=None,
  30. cassandra_options=None, detailed_mode=False, **kwargs):
  31. """Initialize Cassandra backend.
  32. Raises :class:`celery.exceptions.ImproperlyConfigured` if
  33. the :setting:`CASSANDRA_SERVERS` setting is not set.
  34. """
  35. super(CassandraBackend, self).__init__(**kwargs)
  36. self.logger = self.app.log.setup_logger(
  37. name="celery.backends.cassandra")
  38. self.expires = kwargs.get("expires") or maybe_timedelta(
  39. self.app.conf.CELERY_TASK_RESULT_EXPIRES)
  40. if not pycassa:
  41. raise ImproperlyConfigured(
  42. "You need to install the pycassa library to use the "
  43. "Cassandra backend. See https://github.com/pycassa/pycassa")
  44. conf = self.app.conf
  45. self.servers = (servers or
  46. conf.get("CASSANDRA_SERVERS") or
  47. self.servers)
  48. self.keyspace = (keyspace or
  49. conf.get("CASSANDRA_KEYSPACE") or
  50. self.keyspace)
  51. self.column_family = (column_family or
  52. conf.get("CASSANDRA_COLUMN_FAMILY") or
  53. self.column_family)
  54. self.cassandra_options = dict(conf.get("CASSANDRA_OPTIONS") or {},
  55. **cassandra_options or {})
  56. self.detailed_mode = (detailed_mode or
  57. conf.get("CASSANDRA_DETAILED_MODE") or
  58. self.detailed_mode)
  59. read_cons = conf.get("CASSANDRA_READ_CONSISTENCY") or "LOCAL_QUORUM"
  60. write_cons = conf.get("CASSANDRA_WRITE_CONSISTENCY") or "LOCAL_QUORUM"
  61. try:
  62. self.read_consistency = getattr(pycassa.ConsistencyLevel,
  63. read_cons)
  64. except AttributeError:
  65. self.read_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
  66. try:
  67. self.write_consistency = getattr(pycassa.ConsistencyLevel,
  68. write_cons)
  69. except AttributeError:
  70. self.write_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
  71. if not self.servers or not self.keyspace or not self.column_family:
  72. raise ImproperlyConfigured(
  73. "Cassandra backend not configured.")
  74. self._column_family = None
  75. def _retry_on_error(self, fun, *args, **kwargs):
  76. ts = time.time() + self._retry_timeout
  77. while 1:
  78. try:
  79. return fun(*args, **kwargs)
  80. except (pycassa.InvalidRequestException,
  81. pycassa.TimedOutException,
  82. pycassa.UnavailableException,
  83. socket.error,
  84. socket.timeout,
  85. Thrift.TException), exc:
  86. if time.time() > ts:
  87. raise
  88. self.logger.warn('Cassandra error: %r. Retrying...', exc)
  89. time.sleep(self._retry_wait)
  90. def _get_column_family(self):
  91. if self._column_family is None:
  92. conn = pycassa.ConnectionPool(self.keyspace,
  93. server_list=self.servers,
  94. **self.cassandra_options)
  95. self._column_family = \
  96. pycassa.ColumnFamily(conn, self.column_family,
  97. read_consistency_level=self.read_consistency,
  98. write_consistency_level=self.write_consistency)
  99. return self._column_family
  100. def process_cleanup(self):
  101. if self._column_family is not None:
  102. self._column_family = None
  103. def _store_result(self, task_id, result, status, traceback=None):
  104. """Store return value and status of an executed task."""
  105. def _do_store():
  106. cf = self._get_column_family()
  107. date_done = self.app.now()
  108. meta = {"status": status,
  109. "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
  110. "traceback": self.encode(traceback)}
  111. if self.detailed_mode:
  112. meta["result"] = result
  113. cf.insert(task_id, {date_done: self.encode(meta)},
  114. ttl=timedelta_seconds(self.expires))
  115. else:
  116. meta["result"] = self.encode(result)
  117. cf.insert(task_id, meta,
  118. ttl=timedelta_seconds(self.expires))
  119. return self._retry_on_error(_do_store)
  120. def _get_task_meta_for(self, task_id):
  121. """Get task metadata for a task by id."""
  122. def _do_get():
  123. cf = self._get_column_family()
  124. try:
  125. if self.detailed_mode:
  126. row = cf.get(task_id, column_reversed=True, column_count=1)
  127. meta = self.decode(row.values()[0])
  128. meta["task_id"] = task_id
  129. else:
  130. obj = cf.get(task_id)
  131. meta = {
  132. "task_id": task_id,
  133. "status": obj["status"],
  134. "result": self.decode(obj["result"]),
  135. "date_done": obj["date_done"],
  136. "traceback": self.decode(obj["traceback"]),
  137. }
  138. except (KeyError, pycassa.NotFoundException):
  139. meta = {"status": states.PENDING, "result": None}
  140. return meta
  141. return self._retry_on_error(_do_get)
  142. def __reduce__(self, args=(), kwargs={}):
  143. kwargs.update(
  144. dict(servers=self.servers,
  145. keyspace=self.keyspace,
  146. column_family=self.column_family,
  147. cassandra_options=self.cassandra_options))
  148. return super(CassandraBackend, self).__reduce__(args, kwargs)