mongodb.py

# -*- coding: utf-8 -*-
"""
    celery.backends.mongodb
    ~~~~~~~~~~~~~~~~~~~~~~~

    MongoDB result store backend.

"""
from __future__ import absolute_import

from datetime import datetime, timedelta

try:
    import pymongo
except ImportError:  # pragma: no cover
    pymongo = None   # noqa

if pymongo:
    try:
        from bson.binary import Binary
    except ImportError:                     # pragma: no cover
        from pymongo.binary import Binary   # noqa
    from pymongo.errors import InvalidDocument  # noqa
else:                                       # pragma: no cover
    Binary = None                           # noqa
    InvalidDocument = None                  # noqa

from kombu.syn import detect_environment
from kombu.utils import cached_property
from kombu.exceptions import EncodeError

from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.five import string_t, items

from .base import BaseBackend

__all__ = ['MongoBackend']
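
# Usage sketch (not part of the backend itself): an application typically
# selects this backend through its result-backend URL, which is what arrives
# as ``url`` in MongoBackend.__init__() below.  The host, credentials and
# database name here are illustrative assumptions, not values this module
# requires.
#
#     from celery import Celery
#
#     app = Celery('proj',
#                  broker='amqp://',
#                  backend='mongodb://user:password@localhost:27017/celery')
#
# The URL is parsed with pymongo.uri_parser.parse_uri().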


class Bunch(object):

    def __init__(self, **kw):
        self.__dict__.update(kw)


class MongoBackend(BaseBackend):

    mongo_host = None
    host = 'localhost'
    port = 27017
    user = None
    password = None
    database_name = 'celery'
    taskmeta_collection = 'celery_taskmeta'
    groupmeta_collection = 'celery_groupmeta'
    max_pool_size = 10
    options = None

    supports_autoexpire = False

    _connection = None

    def __init__(self, app=None, url=None, **kwargs):
        """Initialize MongoDB backend instance.

        :raises celery.exceptions.ImproperlyConfigured: if
            module :mod:`pymongo` is not available.

        """
        self.options = {}

        super(MongoBackend, self).__init__(app, **kwargs)

        if not pymongo:
            raise ImproperlyConfigured(
                'You need to install the pymongo library to use the '
                'MongoDB backend.')

        self.url = url

        # Set option defaults
        for key, value in items(self._prepare_client_options()):
            self.options.setdefault(key, value)

        # update conf with mongo uri data, only if uri was given
        if self.url:
            uri_data = pymongo.uri_parser.parse_uri(self.url)
            # build the hosts list to create a mongo connection
            hostslist = [
                "{0}:{1}".format(x[0], x[1]) for x in uri_data['nodelist']
            ]
            self.user = uri_data['username']
            self.password = uri_data['password']
            self.mongo_host = hostslist
            if uri_data['database']:
                # if no database is provided in the uri, use default
                self.database_name = uri_data['database']

            self.options.update(uri_data['options'])

        # update conf with specific settings
        config = self.app.conf.get('CELERY_MONGODB_BACKEND_SETTINGS')
        if config is not None:
            if not isinstance(config, dict):
                raise ImproperlyConfigured(
                    'MongoDB backend settings should be grouped in a dict')
            config = dict(config)  # do not modify original

            if 'host' in config or 'port' in config:
                # these should take over uri conf
                self.mongo_host = None

            self.host = config.pop('host', self.host)
            self.port = config.pop('port', self.port)
            self.mongo_host = config.pop('mongo_host', self.mongo_host)
            self.user = config.pop('user', self.user)
            self.password = config.pop('password', self.password)
            self.database_name = config.pop('database', self.database_name)
            self.taskmeta_collection = config.pop(
                'taskmeta_collection', self.taskmeta_collection,
            )
            self.groupmeta_collection = config.pop(
                'groupmeta_collection', self.groupmeta_collection,
            )

            self.options.update(config.pop('options', {}))
            self.options.update(config)
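
    # Settings sketch: the keys popped above can be supplied through the
    # CELERY_MONGODB_BACKEND_SETTINGS dict; anything left over (plus the
    # 'options' sub-dict) is forwarded to MongoClient.  The values shown are
    # illustrative assumptions, not requirements.
    #
    #     CELERY_MONGODB_BACKEND_SETTINGS = {
    #         'host': 'localhost',
    #         'port': 27017,
    #         'user': 'celeryuser',
    #         'password': 'secret',
    #         'database': 'celery',
    #         'taskmeta_collection': 'celery_taskmeta',
    #         'groupmeta_collection': 'celery_groupmeta',
    #         'options': {'maxPoolSize': 10},
    #     }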

    def _prepare_client_options(self):
        if pymongo.version_tuple >= (3,):
            return {'maxPoolSize': self.max_pool_size}
        else:  # pragma: no cover
            return {'max_pool_size': self.max_pool_size,
                    'auto_start_request': False}

    def _get_connection(self):
        """Connect to the MongoDB server."""
        if self._connection is None:
            from pymongo import MongoClient

            host = self.mongo_host
            if not host:
                # The first pymongo.Connection() argument (host) can be
                # a list of ['host:port'] elements or a mongodb connection
                # URI.  If this is the case, don't use self.port
                # but let pymongo get the port(s) from the URI instead.
                # This enables the use of replica sets and sharding.
                # See pymongo.Connection() for more info.
                host = self.host
                if isinstance(host, string_t) \
                        and not host.startswith('mongodb://'):
                    host = 'mongodb://{0}:{1}'.format(host, self.port)

                if host == 'mongodb://':
                    host += 'localhost'

            # don't change self.options
            conf = dict(self.options)
            conf['host'] = host

            if detect_environment() != 'default':
                conf['use_greenlets'] = True

            self._connection = MongoClient(**conf)

        return self._connection
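
    # Host-resolution sketch for _get_connection() above (hostnames are
    # illustrative assumptions):
    #
    #     mongo_host = ['db1:27017', 'db2:27017']  -> MongoClient(host=[...])
    #     host = 'example.com', port = 27017       -> 'mongodb://example.com:27017'
    #     host = 'mongodb://rs0.example.com/?replicaSet=rs0'
    #                                              -> passed through unchanged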

    def process_cleanup(self):
        if self._connection is not None:
            # MongoDB connection will be closed automatically when object
            # goes out of scope
            del(self.collection)
            del(self.database)
            self._connection = None

    def encode(self, data):
        if self.serializer == 'bson':
            # mongodb handles serialization
            return data
        return super(MongoBackend, self).encode(data)

    def decode(self, data):
        if self.serializer == 'bson':
            return data
        return super(MongoBackend, self).decode(data)

    def _store_result(self, task_id, result, status,
                      traceback=None, request=None, **kwargs):
        """Store return value and status of an executed task."""
        meta = {'_id': task_id,
                'status': status,
                'result': self.encode(result),
                'date_done': datetime.utcnow(),
                'traceback': self.encode(traceback),
                'children': self.encode(
                    self.current_task_children(request),
                )}

        try:
            self.collection.save(meta)
        except InvalidDocument as exc:
            raise EncodeError(exc)

        return result
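
    # Document sketch: a stored task ends up in the taskmeta collection as a
    # document of roughly this shape (values are illustrative; 'result',
    # 'traceback' and 'children' are raw only with the 'bson' serializer,
    # otherwise they hold the encoded payload):
    #
    #     {'_id': 'd7f9a630-...',
    #      'status': 'SUCCESS',
    #      'result': 42,
    #      'date_done': datetime(2015, 1, 1, 12, 0, 0),
    #      'traceback': None,
    #      'children': []}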

    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id."""
        obj = self.collection.find_one({'_id': task_id})
        if obj:
            return self.meta_from_decoded({
                'task_id': obj['_id'],
                'status': obj['status'],
                'result': self.decode(obj['result']),
                'date_done': obj['date_done'],
                'traceback': self.decode(obj['traceback']),
                'children': self.decode(obj['children']),
            })
        return {'status': states.PENDING, 'result': None}

    def _save_group(self, group_id, result):
        """Save the group result."""
        task_ids = [i.id for i in result]
        meta = {'_id': group_id,
                'result': self.encode(task_ids),
                'date_done': datetime.utcnow()}
        self.group_collection.save(meta)
        return result

    def _restore_group(self, group_id):
        """Get the result for a group by id."""
        obj = self.group_collection.find_one({'_id': group_id})
        if obj:
            tasks = [self.app.AsyncResult(task)
                     for task in self.decode(obj['result'])]
            return {
                'task_id': obj['_id'],
                'result': tasks,
                'date_done': obj['date_done'],
            }
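
    # Group documents saved by _save_group() follow the same pattern but only
    # keep the member task ids (shape is illustrative):
    #
    #     {'_id': '<group id>',
    #      'result': ['<task id 1>', '<task id 2>'],
    #      'date_done': datetime(2015, 1, 1, 12, 0, 0)}
    #
    # _restore_group() then rebuilds an AsyncResult for each stored id.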

    def _delete_group(self, group_id):
        """Delete a group by id."""
        self.group_collection.remove({'_id': group_id})

    def _forget(self, task_id):
        """Remove result from MongoDB.

        :raises celery.exceptions.OperationsError:
            if the task_id could not be removed.

        """
        # By using safe=True, this will wait until it receives a response from
        # the server.  Likewise, it will raise an OperationsError if the
        # response was unable to be completed.
        self.collection.remove({'_id': task_id})

    def cleanup(self):
        """Delete expired metadata."""
        self.collection.remove(
            {'date_done': {'$lt': self.app.now() - self.expires_delta}},
        )
        self.group_collection.remove(
            {'date_done': {'$lt': self.app.now() - self.expires_delta}},
        )

    def __reduce__(self, args=(), kwargs={}):
        return super(MongoBackend, self).__reduce__(
            args, dict(kwargs, expires=self.expires, url=self.url),
        )

    def _get_database(self):
        conn = self._get_connection()
        db = conn[self.database_name]
        if self.user and self.password:
            if not db.authenticate(self.user, self.password):
                raise ImproperlyConfigured(
                    'Invalid MongoDB username or password.')
        return db

    @cached_property
    def database(self):
        """Get database from MongoDB connection and perform authentication
        if necessary."""
        return self._get_database()

    @cached_property
    def collection(self):
        """Get the metadata task collection."""
        collection = self.database[self.taskmeta_collection]

        # Ensure an index on date_done is there, if not process the index
        # in the background.  Once completed cleanup will be much faster
        collection.ensure_index('date_done', background='true')
        return collection

    @cached_property
    def group_collection(self):
        """Get the group metadata collection."""
        collection = self.database[self.groupmeta_collection]

        # Ensure an index on date_done is there, if not process the index
        # in the background.  Once completed cleanup will be much faster
        collection.ensure_index('date_done', background='true')
        return collection

    @cached_property
    def expires_delta(self):
        return timedelta(seconds=self.expires)
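
# Expiry sketch: ``expires`` is set by the base backend (normally from the
# CELERY_TASK_RESULT_EXPIRES setting, in seconds), so with the assumed value
# below cleanup() would remove any meta document older than one day:
#
#     CELERY_TASK_RESULT_EXPIRES = 86400   # expires_delta == timedelta(days=1)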