# mongodb.py (10 KB)
# -*- coding: utf-8 -*-
"""
``celery.backends.mongodb``
~~~~~~~~~~~~~~~~~~~~~~~~~~~

MongoDB result store backend.

"""
  7. from __future__ import absolute_import, unicode_literals
  8. from datetime import datetime, timedelta
  9. from kombu.utils import cached_property
  10. from kombu.utils.url import maybe_sanitize_url
  11. from kombu.exceptions import EncodeError
  12. from celery import states
  13. from celery.exceptions import ImproperlyConfigured
  14. from celery.five import string_t, items
  15. from .base import BaseBackend
  16. try:
  17. import pymongo
  18. except ImportError: # pragma: no cover
  19. pymongo = None # noqa
  20. if pymongo:
  21. try:
  22. from bson.binary import Binary
  23. except ImportError: # pragma: no cover
  24. from pymongo.binary import Binary # noqa
  25. from pymongo.errors import InvalidDocument # noqa
  26. else: # pragma: no cover
  27. Binary = None # noqa
  28. class InvalidDocument(Exception): # noqa
  29. pass
  30. __all__ = ['MongoBackend']
  31. class MongoBackend(BaseBackend):
  32. """MongoDB result backend.
  33. :raises celery.exceptions.ImproperlyConfigured: if
  34. module :pypi:`pymongo` is not available.
  35. """
  36. mongo_host = None
  37. host = 'localhost'
  38. port = 27017
  39. user = None
  40. password = None
  41. database_name = 'celery'
  42. taskmeta_collection = 'celery_taskmeta'
  43. groupmeta_collection = 'celery_groupmeta'
  44. max_pool_size = 10
  45. options = None
  46. supports_autoexpire = False
  47. _connection = None
  48. def __init__(self, app=None, **kwargs):
  49. self.options = {}
  50. super(MongoBackend, self).__init__(app, **kwargs)
  51. if not pymongo:
  52. raise ImproperlyConfigured(
  53. 'You need to install the pymongo library to use the '
  54. 'MongoDB backend.')
  55. # Set option defaults
  56. for key, value in items(self._prepare_client_options()):
  57. self.options.setdefault(key, value)
  58. # update conf with mongo uri data, only if uri was given
  59. if self.url:
  60. if self.url == 'mongodb://':
  61. self.url += 'localhost'
  62. uri_data = pymongo.uri_parser.parse_uri(self.url)
  63. # build the hosts list to create a mongo connection
  64. hostslist = [
  65. '{0}:{1}'.format(x[0], x[1]) for x in uri_data['nodelist']
  66. ]
  67. self.user = uri_data['username']
  68. self.password = uri_data['password']
  69. self.mongo_host = hostslist
  70. if uri_data['database']:
  71. # if no database is provided in the uri, use default
  72. self.database_name = uri_data['database']
  73. self.options.update(uri_data['options'])
  74. # update conf with specific settings
  75. config = self.app.conf.get('mongodb_backend_settings')
  76. if config is not None:
  77. if not isinstance(config, dict):
  78. raise ImproperlyConfigured(
  79. 'MongoDB backend settings should be grouped in a dict')
  80. config = dict(config) # do not modify original
  81. if 'host' in config or 'port' in config:
  82. # these should take over uri conf
  83. self.mongo_host = None
  84. self.host = config.pop('host', self.host)
  85. self.port = config.pop('port', self.port)
  86. self.mongo_host = config.pop('mongo_host', self.mongo_host)
  87. self.user = config.pop('user', self.user)
  88. self.password = config.pop('password', self.password)
  89. self.database_name = config.pop('database', self.database_name)
  90. self.taskmeta_collection = config.pop(
  91. 'taskmeta_collection', self.taskmeta_collection,
  92. )
  93. self.groupmeta_collection = config.pop(
  94. 'groupmeta_collection', self.groupmeta_collection,
  95. )
  96. self.options.update(config.pop('options', {}))
  97. self.options.update(config)
  98. def _prepare_client_options(self):
  99. if pymongo.version_tuple >= (3,):
  100. return {'maxPoolSize': self.max_pool_size}
  101. else: # pragma: no cover
  102. return {'max_pool_size': self.max_pool_size,
  103. 'auto_start_request': False}
  104. def _get_connection(self):
  105. """Connect to the MongoDB server."""
  106. if self._connection is None:
  107. from pymongo import MongoClient
  108. host = self.mongo_host
  109. if not host:
  110. # The first pymongo.Connection() argument (host) can be
  111. # a list of ['host:port'] elements or a mongodb connection
  112. # URI. If this is the case, don't use self.port
  113. # but let pymongo get the port(s) from the URI instead.
  114. # This enables the use of replica sets and sharding.
  115. # See pymongo.Connection() for more info.
  116. host = self.host
  117. if isinstance(host, string_t) \
  118. and not host.startswith('mongodb://'):
  119. host = 'mongodb://{0}:{1}'.format(host, self.port)
  120. # don't change self.options
  121. conf = dict(self.options)
  122. conf['host'] = host
  123. self._connection = MongoClient(**conf)
  124. return self._connection
  125. def encode(self, data):
  126. if self.serializer == 'bson':
  127. # mongodb handles serialization
  128. return data
  129. return super(MongoBackend, self).encode(data)
  130. def decode(self, data):
  131. if self.serializer == 'bson':
  132. return data
  133. return super(MongoBackend, self).decode(data)
  134. def _store_result(self, task_id, result, state,
  135. traceback=None, request=None, **kwargs):
  136. """Store return value and state of an executed task."""
  137. meta = {'_id': task_id,
  138. 'status': state,
  139. 'result': self.encode(result),
  140. 'date_done': datetime.utcnow(),
  141. 'traceback': self.encode(traceback),
  142. 'children': self.encode(
  143. self.current_task_children(request),
  144. )}
  145. try:
  146. self.collection.save(meta)
  147. except InvalidDocument as exc:
  148. raise EncodeError(exc)
  149. return result
  150. def _get_task_meta_for(self, task_id):
  151. """Get task meta-data for a task by id."""
  152. obj = self.collection.find_one({'_id': task_id})
  153. if obj:
  154. return self.meta_from_decoded({
  155. 'task_id': obj['_id'],
  156. 'status': obj['status'],
  157. 'result': self.decode(obj['result']),
  158. 'date_done': obj['date_done'],
  159. 'traceback': self.decode(obj['traceback']),
  160. 'children': self.decode(obj['children']),
  161. })
  162. return {'status': states.PENDING, 'result': None}
  163. def _save_group(self, group_id, result):
  164. """Save the group result."""
  165. task_ids = [i.id for i in result]
  166. meta = {'_id': group_id,
  167. 'result': self.encode(task_ids),
  168. 'date_done': datetime.utcnow()}
  169. self.group_collection.save(meta)
  170. return result
  171. def _restore_group(self, group_id):
  172. """Get the result for a group by id."""
  173. obj = self.group_collection.find_one({'_id': group_id})
  174. if obj:
  175. tasks = [self.app.AsyncResult(task)
  176. for task in self.decode(obj['result'])]
  177. return {
  178. 'task_id': obj['_id'],
  179. 'result': tasks,
  180. 'date_done': obj['date_done'],
  181. }
  182. def _delete_group(self, group_id):
  183. """Delete a group by id."""
  184. self.group_collection.remove({'_id': group_id})
  185. def _forget(self, task_id):
  186. """Remove result from MongoDB.
  187. :raises celery.exceptions.OperationsError:
  188. if the task_id could not be removed.
  189. """
  190. # By using safe=True, this will wait until it receives a response from
  191. # the server. Likewise, it will raise an OperationsError if the
  192. # response was unable to be completed.
  193. self.collection.remove({'_id': task_id})
  194. def cleanup(self):
  195. """Delete expired meta-data."""
  196. self.collection.remove(
  197. {'date_done': {'$lt': self.app.now() - self.expires_delta}},
  198. )
  199. self.group_collection.remove(
  200. {'date_done': {'$lt': self.app.now() - self.expires_delta}},
  201. )
  202. def __reduce__(self, args=(), kwargs={}):
  203. return super(MongoBackend, self).__reduce__(
  204. args, dict(kwargs, expires=self.expires, url=self.url),
  205. )
  206. def _get_database(self):
  207. conn = self._get_connection()
  208. db = conn[self.database_name]
  209. if self.user and self.password:
  210. if not db.authenticate(self.user,
  211. self.password):
  212. raise ImproperlyConfigured(
  213. 'Invalid MongoDB username or password.')
  214. return db
  215. @cached_property
  216. def database(self):
  217. """Get database from MongoDB connection and perform authentication
  218. if necessary."""
  219. return self._get_database()
  220. @cached_property
  221. def collection(self):
  222. """Get the meta-data task collection."""
  223. collection = self.database[self.taskmeta_collection]
  224. # Ensure an index on date_done is there, if not process the index
  225. # in the background. Once completed cleanup will be much faster
  226. collection.ensure_index('date_done', background='true')
  227. return collection
  228. @cached_property
  229. def group_collection(self):
  230. """Get the meta-data task collection."""
  231. collection = self.database[self.groupmeta_collection]
  232. # Ensure an index on date_done is there, if not process the index
  233. # in the background. Once completed cleanup will be much faster
  234. collection.ensure_index('date_done', background='true')
  235. return collection
  236. @cached_property
  237. def expires_delta(self):
  238. return timedelta(seconds=self.expires)
  239. def as_uri(self, include_password=False):
  240. """Return the backend as an URI.
  241. :keyword include_password: Censor passwords.
  242. """
  243. if not self.url:
  244. return 'mongodb://'
  245. if include_password:
  246. return self.url
  247. if ',' not in self.url:
  248. return maybe_sanitize_url(self.url)
  249. uri1, remainder = self.url.split(',', 1)
  250. return ','.join([maybe_sanitize_url(uri1), remainder])