cassandra.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. # -*- coding: utf-8 -*-
  2. """celery.backends.cassandra"""
  3. from __future__ import absolute_import
  4. try:
  5. import pycassa
  6. from thrift import Thrift
  7. C = pycassa.cassandra.ttypes
  8. except ImportError:
  9. pycassa = None
  10. import socket
  11. import time
  12. from datetime import datetime
  13. from .. import states
  14. from ..exceptions import ImproperlyConfigured
  15. from ..utils.timeutils import maybe_timedelta, timedelta_seconds
  16. from .base import BaseDictBackend
  17. class CassandraBackend(BaseDictBackend):
  18. """Highly fault tolerant Cassandra backend.
  19. .. attribute:: servers
  20. List of Cassandra servers with format: "hostname:port".
  21. :raises celery.exceptions.ImproperlyConfigured: if
  22. module :mod:`pycassa` is not available.
  23. """
  24. servers = []
  25. keyspace = None
  26. column_family = None
  27. _retry_timeout = 300
  28. _retry_wait = 3
  29. def __init__(self, servers=None, keyspace=None, column_family=None,
  30. cassandra_options=None, **kwargs):
  31. """Initialize Cassandra backend.
  32. Raises :class:`celery.exceptions.ImproperlyConfigured` if
  33. the :setting:`CASSANDRA_SERVERS` setting is not set.
  34. """
  35. super(CassandraBackend, self).__init__(**kwargs)
  36. self.logger = self.app.log.setup_logger(
  37. name="celery.backends.cassandra")
  38. self.expires = kwargs.get("expires") or maybe_timedelta(
  39. self.app.conf.CELERY_TASK_RESULT_EXPIRES)
  40. if not pycassa:
  41. raise ImproperlyConfigured(
  42. "You need to install the pycassa library to use the "
  43. "Cassandra backend. See https://github.com/pycassa/pycassa")
  44. self.servers = servers or \
  45. self.app.conf.get("CASSANDRA_SERVERS", self.servers)
  46. self.keyspace = keyspace or \
  47. self.app.conf.get("CASSANDRA_KEYSPACE",
  48. self.keyspace)
  49. self.column_family = column_family or \
  50. self.app.conf.get("CASSANDRA_COLUMN_FAMILY",
  51. self.column_family)
  52. self.cassandra_options = dict(cassandra_options or {},
  53. **self.app.conf.get("CASSANDRA_OPTIONS",
  54. {}))
  55. read_cons = self.app.conf.get("CASSANDRA_READ_CONSISTENCY",
  56. "LOCAL_QUORUM")
  57. write_cons = self.app.conf.get("CASSANDRA_WRITE_CONSISTENCY",
  58. "LOCAL_QUORUM")
  59. try:
  60. self.read_consistency = getattr(pycassa.ConsistencyLevel,
  61. read_cons)
  62. except AttributeError:
  63. self.read_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
  64. try:
  65. self.write_consistency = getattr(pycassa.ConsistencyLevel,
  66. write_cons)
  67. except AttributeError:
  68. self.write_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
  69. if not self.servers or not self.keyspace or not self.column_family:
  70. raise ImproperlyConfigured(
  71. "Cassandra backend not configured.")
  72. self._column_family = None
  73. def _retry_on_error(self, fun, *args, **kwargs):
  74. ts = time.time() + self._retry_timeout
  75. while 1:
  76. try:
  77. return fun(*args, **kwargs)
  78. except (pycassa.InvalidRequestException,
  79. pycassa.TimedOutException,
  80. pycassa.UnavailableException,
  81. socket.error,
  82. socket.timeout,
  83. Thrift.TException), exc:
  84. if time.time() > ts:
  85. raise
  86. self.logger.warn('Cassandra error: %r. Retrying...', exc)
  87. time.sleep(self._retry_wait)
  88. def _get_column_family(self):
  89. if self._column_family is None:
  90. conn = pycassa.connect(self.keyspace, servers=self.servers,
  91. **self.cassandra_options)
  92. self._column_family = \
  93. pycassa.ColumnFamily(conn, self.column_family,
  94. read_consistency_level=self.read_consistency,
  95. write_consistency_level=self.write_consistency)
  96. return self._column_family
  97. def process_cleanup(self):
  98. if self._column_family is not None:
  99. self._column_family = None
  100. def _store_result(self, task_id, result, status, traceback=None):
  101. """Store return value and status of an executed task."""
  102. def _do_store():
  103. cf = self._get_column_family()
  104. date_done = datetime.utcnow()
  105. meta = {"status": status,
  106. "result": self.encode(result),
  107. "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
  108. "traceback": self.encode(traceback)}
  109. cf.insert(task_id, meta,
  110. ttl=timedelta_seconds(self.expires))
  111. return self._retry_on_error(_do_store)
  112. def _get_task_meta_for(self, task_id):
  113. """Get task metadata for a task by id."""
  114. def _do_get():
  115. cf = self._get_column_family()
  116. try:
  117. obj = cf.get(task_id)
  118. meta = {
  119. "task_id": task_id,
  120. "status": obj["status"],
  121. "result": self.decode(obj["result"]),
  122. "date_done": obj["date_done"],
  123. "traceback": self.decode(obj["traceback"]),
  124. }
  125. except (KeyError, pycassa.NotFoundException):
  126. meta = {"status": states.PENDING, "result": None}
  127. return meta
  128. return self._retry_on_error(_do_get)
  129. def __reduce__(self, args=(), kwargs={}):
  130. kwargs.update(
  131. dict(servers=self.servers,
  132. keyspace=self.keyspace,
  133. column_family=self.column_family,
  134. cassandra_options=self.cassandra_options))
  135. return super(CassandraBackend, self).__reduce__(args, kwargs)