mongodb.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.mongodb
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. MongoDB result store backend.
  6. """
  7. from __future__ import absolute_import
  8. from datetime import datetime
  9. try:
  10. import pymongo
  11. except ImportError: # pragma: no cover
  12. pymongo = None # noqa
  13. if pymongo:
  14. try:
  15. from bson.binary import Binary
  16. except ImportError: # pragma: no cover
  17. from pymongo.binary import Binary # noqa
  18. else: # pragma: no cover
  19. Binary = None # noqa
  20. from kombu.utils import cached_property
  21. from celery import states
  22. from celery.exceptions import ImproperlyConfigured
  23. from celery.five import string_t
  24. from celery.utils.timeutils import maybe_timedelta
  25. from .base import BaseBackend
  class Bunch(object):
      """Lightweight attribute container.

      Every keyword argument given to the constructor is stored directly in
      the instance ``__dict__``, so ``Bunch(a=1).a == 1``.
      """

      def __init__(self, **fields):
          instance_dict = vars(self)
          instance_dict.update(fields)
  class MongoBackend(BaseBackend):
      """MongoDB result store backend.

      Task and group results are stored as documents in a single collection
      (``mongodb_taskmeta_collection``), keyed by ``_id``.  The class
      attributes below are the defaults; they can be overridden with the
      ``CELERY_MONGODB_BACKEND_SETTINGS`` dict (keys ``host``, ``port``,
      ``user``, ``password``, ``database``, ``taskmeta_collection``,
      ``max_pool_size`` and ``options``).
      """

      mongodb_host = 'localhost'
      mongodb_port = 27017
      mongodb_user = None
      mongodb_password = None
      mongodb_database = 'celery'
      mongodb_taskmeta_collection = 'celery_taskmeta'
      mongodb_max_pool_size = 10
      # Extra keyword arguments passed to the pymongo connection.  Declared
      # at class level so that _get_connection() also works when no settings
      # dict is configured (it previously failed with AttributeError then).
      mongodb_options = None

      supports_autoexpire = False

      def __init__(self, *args, **kwargs):
          """Initialize MongoDB backend instance.

          :keyword expires: result expiry; when missing or falsy it falls
              back to ``maybe_timedelta(CELERY_TASK_RESULT_EXPIRES)``.
          :raises celery.exceptions.ImproperlyConfigured: if
              module :mod:`pymongo` is not available, or if
              ``CELERY_MONGODB_BACKEND_SETTINGS`` is set but is not a dict.
          """
          super(MongoBackend, self).__init__(*args, **kwargs)
          self.expires = kwargs.get('expires') or maybe_timedelta(
              self.app.conf.CELERY_TASK_RESULT_EXPIRES)

          if not pymongo:
              raise ImproperlyConfigured(
                  'You need to install the pymongo library to use the '
                  'MongoDB backend.')

          config = self.app.conf.get('CELERY_MONGODB_BACKEND_SETTINGS', None)
          if config is not None:
              if not isinstance(config, dict):
                  raise ImproperlyConfigured(
                      'MongoDB backend settings should be grouped in a dict')

              self.mongodb_host = config.get('host', self.mongodb_host)
              self.mongodb_port = int(config.get('port', self.mongodb_port))
              self.mongodb_user = config.get('user', self.mongodb_user)
              self.mongodb_options = config.get('options', {})
              self.mongodb_password = config.get(
                  'password', self.mongodb_password)
              self.mongodb_database = config.get(
                  'database', self.mongodb_database)
              self.mongodb_taskmeta_collection = config.get(
                  'taskmeta_collection', self.mongodb_taskmeta_collection)
              self.mongodb_max_pool_size = config.get(
                  'max_pool_size', self.mongodb_max_pool_size)

          self._connection = None

      def _get_connection(self):
          """Connect to the MongoDB server.

          The connection is created on first use and cached on
          ``self._connection``; later calls return the cached object.
          """
          if self._connection is None:
              from pymongo.connection import Connection

              # The first pymongo.Connection() argument (host) can be
              # a list of ['host:port'] elements or a mongodb connection
              # URI. If this is the case, don't use self.mongodb_port
              # but let pymongo get the port(s) from the URI instead.
              # This enables the use of replica sets and sharding.
              # See pymongo.Connection() for more info.
              args = [self.mongodb_host]
              kwargs = {
                  'max_pool_size': self.mongodb_max_pool_size,
                  # NOTE(review): reuses the broker's SSL flag for the
                  # result backend connection.
                  'ssl': self.app.conf.BROKER_USE_SSL,
              }
              if isinstance(self.mongodb_host, string_t) \
                      and not self.mongodb_host.startswith('mongodb://'):
                  args.append(self.mongodb_port)

              # User-supplied ``options`` override the defaults above.
              self._connection = Connection(
                  *args, **dict(kwargs, **self.mongodb_options or {})
              )

          return self._connection

      def process_cleanup(self):
          """Drop the reference to the cached MongoDB connection."""
          if self._connection is not None:
              # MongoDB connection will be closed automatically when object
              # goes out of scope.
              # NOTE(review): the cached ``database``/``collection``
              # properties still reference the old connection, so it stays
              # alive (and in use) while those caches exist.
              self._connection = None

      def _store_result(self, task_id, result, status, traceback=None):
          """Store return value and status of an executed task.

          ``result``, ``traceback`` and the current task's children are
          serialized with ``self.encode`` and wrapped in ``Binary``.  The
          document is written with ``collection.save`` keyed on ``_id``,
          so an existing document for the same task is replaced.

          :returns: ``result`` unchanged.
          """
          meta = {'_id': task_id,
                  'status': status,
                  'result': Binary(self.encode(result)),
                  'date_done': datetime.utcnow(),
                  'traceback': Binary(self.encode(traceback)),
                  'children': Binary(self.encode(
                      self.current_task_children()))}
          self.collection.save(meta, safe=True)

          return result

      def _get_task_meta_for(self, task_id):
          """Get task metadata for a task by id.

          :returns: dict with ``task_id``, ``status``, ``result``,
              ``date_done``, ``traceback`` and ``children``, or
              ``{'status': PENDING, 'result': None}`` if no document exists.
          """
          obj = self.collection.find_one({'_id': task_id})
          if not obj:
              return {'status': states.PENDING, 'result': None}

          meta = {
              'task_id': obj['_id'],
              'status': obj['status'],
              'result': self.decode(obj['result']),
              'date_done': obj['date_done'],
              'traceback': self.decode(obj['traceback']),
              'children': self.decode(obj['children']),
          }

          return meta

      def _save_group(self, group_id, result):
          """Save the group result.

          :returns: ``result`` unchanged.
          """
          meta = {'_id': group_id,
                  'result': Binary(self.encode(result)),
                  'date_done': datetime.utcnow()}
          self.collection.save(meta, safe=True)

          return result

      def _restore_group(self, group_id):
          """Get the result for a group by id.

          :returns: dict with ``task_id``, ``result`` and ``date_done``,
              or ``None`` if no document exists for ``group_id``.
          """
          obj = self.collection.find_one({'_id': group_id})
          if not obj:
              return

          meta = {
              'task_id': obj['_id'],
              'result': self.decode(obj['result']),
              'date_done': obj['date_done'],
          }

          return meta

      def _delete_group(self, group_id):
          """Delete a group by id."""
          self.collection.remove({'_id': group_id})

      def _forget(self, task_id):
          """Remove result from MongoDB.

          :raises pymongo.errors.OperationFailure: if the server reports
              that the task document could not be removed.
          """
          # By using safe=True, this will wait until it receives a response
          # from the server, and pymongo raises an error if the server
          # reports that the removal failed.
          self.collection.remove({'_id': task_id}, safe=True)

      def cleanup(self):
          """Delete expired metadata.

          Removes every document whose ``date_done`` is older than
          ``self.app.now() - self.expires``.
          """
          self.collection.remove(
              {'date_done': {'$lt': self.app.now() - self.expires}},
          )

      def __reduce__(self, args=(), kwargs=None):
          """Pickle support: forward ``expires`` to the reconstructed backend."""
          # Build a new dict instead of calling ``kwargs.update``: the old
          # mutable ``{}`` default was shared (and mutated) across calls,
          # and a caller's dict should not be modified either.
          kwargs = dict(kwargs or {}, expires=self.expires)
          return super(MongoBackend, self).__reduce__(args, kwargs)

      def _get_database(self):
          """Return the configured database, authenticating if credentials
          are set.

          :raises celery.exceptions.ImproperlyConfigured: if authentication
              with ``mongodb_user``/``mongodb_password`` fails.
          """
          conn = self._get_connection()
          db = conn[self.mongodb_database]
          if self.mongodb_user and self.mongodb_password:
              if not db.authenticate(self.mongodb_user,
                                     self.mongodb_password):
                  raise ImproperlyConfigured(
                      'Invalid MongoDB username or password.')
          return db

      @cached_property
      def database(self):
          """Get database from MongoDB connection and perform authentication
          if necessary."""
          return self._get_database()

      @cached_property
      def collection(self):
          """Get the metadata task collection."""
          collection = self.database[self.mongodb_taskmeta_collection]

          # Ensure an index on date_done is there, if not process the index
          # in the background. Once completed cleanup will be much faster.
          # ``background`` is a boolean option (was the string 'true').
          collection.ensure_index('date_done', background=True)
          return collection