mongodb.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.mongodb
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. MongoDB result store backend.
  6. """
  7. from __future__ import absolute_import
  8. from datetime import datetime
  9. try:
  10. import pymongo
  11. except ImportError: # pragma: no cover
  12. pymongo = None # noqa
  13. if pymongo:
  14. try:
  15. from bson.binary import Binary
  16. except ImportError: # pragma: no cover
  17. from pymongo.binary import Binary # noqa
  18. else: # pragma: no cover
  19. Binary = None # noqa
  20. from kombu.syn import detect_environment
  21. from kombu.utils import cached_property
  22. from celery import states
  23. from celery.exceptions import ImproperlyConfigured
  24. from celery.five import string_t
  25. from celery.utils.timeutils import maybe_timedelta
  26. from .base import BaseBackend
  27. __all__ = ['MongoBackend']
  28. class Bunch(object):
  29. def __init__(self, **kw):
  30. self.__dict__.update(kw)
  31. class MongoBackend(BaseBackend):
  32. host = 'localhost'
  33. port = 27017
  34. user = None
  35. password = None
  36. database_name = 'celery'
  37. taskmeta_collection = 'celery_taskmeta'
  38. max_pool_size = 10
  39. options = None
  40. supports_autoexpire = False
  41. _connection = None
  42. def __init__(self, app=None, url=None, **kwargs):
  43. """Initialize MongoDB backend instance.
  44. :raises celery.exceptions.ImproperlyConfigured: if
  45. module :mod:`pymongo` is not available.
  46. """
  47. self.options = {}
  48. super(MongoBackend, self).__init__(app, **kwargs)
  49. self.expires = kwargs.get('expires') or maybe_timedelta(
  50. self.app.conf.CELERY_TASK_RESULT_EXPIRES)
  51. if not pymongo:
  52. raise ImproperlyConfigured(
  53. 'You need to install the pymongo library to use the '
  54. 'MongoDB backend.')
  55. config = self.app.conf.get('CELERY_MONGODB_BACKEND_SETTINGS')
  56. if config is not None:
  57. if not isinstance(config, dict):
  58. raise ImproperlyConfigured(
  59. 'MongoDB backend settings should be grouped in a dict')
  60. config = dict(config) # do not modify original
  61. self.host = config.pop('host', self.host)
  62. self.port = int(config.pop('port', self.port))
  63. self.user = config.pop('user', self.user)
  64. self.password = config.pop('password', self.password)
  65. self.database_name = config.pop('database', self.database_name)
  66. self.taskmeta_collection = config.pop(
  67. 'taskmeta_collection', self.taskmeta_collection,
  68. )
  69. self.options = dict(config, **config.pop('options', None) or {})
  70. # Set option defaults
  71. if pymongo.version_tuple >= (3, ):
  72. self.options.setdefault('maxPoolSize', self.max_pool_size)
  73. else:
  74. self.options.setdefault('max_pool_size', self.max_pool_size)
  75. self.options.setdefault('auto_start_request', False)
  76. self.url = url
  77. if self.url:
  78. # Specifying backend as an URL
  79. self.host = self.url
  80. def _get_connection(self):
  81. """Connect to the MongoDB server."""
  82. if self._connection is None:
  83. from pymongo import MongoClient
  84. # The first pymongo.Connection() argument (host) can be
  85. # a list of ['host:port'] elements or a mongodb connection
  86. # URI. If this is the case, don't use self.port
  87. # but let pymongo get the port(s) from the URI instead.
  88. # This enables the use of replica sets and sharding.
  89. # See pymongo.Connection() for more info.
  90. url = self.host
  91. if isinstance(url, string_t) \
  92. and not url.startswith('mongodb://'):
  93. url = 'mongodb://{0}:{1}'.format(url, self.port)
  94. if url == 'mongodb://':
  95. url = url + 'localhost'
  96. if detect_environment() != 'default':
  97. self.options['use_greenlets'] = True
  98. self._connection = MongoClient(host=url, **self.options)
  99. return self._connection
  100. def process_cleanup(self):
  101. if self._connection is not None:
  102. # MongoDB connection will be closed automatically when object
  103. # goes out of scope
  104. del(self.collection)
  105. del(self.database)
  106. self._connection = None
  107. def _store_result(self, task_id, result, status,
  108. traceback=None, request=None, **kwargs):
  109. """Store return value and status of an executed task."""
  110. meta = {'_id': task_id,
  111. 'status': status,
  112. 'result': Binary(self.encode(result)),
  113. 'date_done': datetime.utcnow(),
  114. 'traceback': Binary(self.encode(traceback)),
  115. 'children': Binary(self.encode(
  116. self.current_task_children(request),
  117. ))}
  118. self.collection.save(meta)
  119. return result
  120. def _get_task_meta_for(self, task_id):
  121. """Get task metadata for a task by id."""
  122. obj = self.collection.find_one({'_id': task_id})
  123. if not obj:
  124. return {'status': states.PENDING, 'result': None}
  125. meta = {
  126. 'task_id': obj['_id'],
  127. 'status': obj['status'],
  128. 'result': self.decode(obj['result']),
  129. 'date_done': obj['date_done'],
  130. 'traceback': self.decode(obj['traceback']),
  131. 'children': self.decode(obj['children']),
  132. }
  133. return meta
  134. def _save_group(self, group_id, result):
  135. """Save the group result."""
  136. meta = {'_id': group_id,
  137. 'result': Binary(self.encode(result)),
  138. 'date_done': datetime.utcnow()}
  139. self.collection.save(meta)
  140. return result
  141. def _restore_group(self, group_id):
  142. """Get the result for a group by id."""
  143. obj = self.collection.find_one({'_id': group_id})
  144. if not obj:
  145. return
  146. meta = {
  147. 'task_id': obj['_id'],
  148. 'result': self.decode(obj['result']),
  149. 'date_done': obj['date_done'],
  150. }
  151. return meta
  152. def _delete_group(self, group_id):
  153. """Delete a group by id."""
  154. self.collection.remove({'_id': group_id})
  155. def _forget(self, task_id):
  156. """
  157. Remove result from MongoDB.
  158. :raises celery.exceptions.OperationsError: if the task_id could not be
  159. removed.
  160. """
  161. # By using safe=True, this will wait until it receives a response from
  162. # the server. Likewise, it will raise an OperationsError if the
  163. # response was unable to be completed.
  164. self.collection.remove({'_id': task_id})
  165. def cleanup(self):
  166. """Delete expired metadata."""
  167. self.collection.remove(
  168. {'date_done': {'$lt': self.app.now() - self.expires}},
  169. )
  170. def __reduce__(self, args=(), kwargs={}):
  171. return super(MongoBackend, self).__reduce__(
  172. args, dict(kwargs, expires=self.expires, url=self.url),
  173. )
  174. def _get_database(self):
  175. conn = self._get_connection()
  176. db = conn[self.database_name]
  177. if self.user and self.password:
  178. if not db.authenticate(self.user,
  179. self.password):
  180. raise ImproperlyConfigured(
  181. 'Invalid MongoDB username or password.')
  182. return db
  183. @cached_property
  184. def database(self):
  185. """Get database from MongoDB connection and perform authentication
  186. if necessary."""
  187. return self._get_database()
  188. @cached_property
  189. def collection(self):
  190. """Get the metadata task collection."""
  191. collection = self.database[self.taskmeta_collection]
  192. # Ensure an index on date_done is there, if not process the index
  193. # in the background. Once completed cleanup will be much faster
  194. collection.ensure_index('date_done', background='true')
  195. return collection