mongodb.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. """MongoDB backend for celery."""
  2. import random
  3. from datetime import datetime, timedelta
  4. from django.core.exceptions import ImproperlyConfigured
  5. from celery.serialization import pickle
  6. from celery.backends.base import BaseBackend
  7. from celery.loaders import settings
  8. from celery.conf import TASK_RESULT_EXPIRES
  9. from celery.registry import tasks
  10. try:
  11. import pymongo
  12. except ImportError:
  13. pymongo = None
  14. # taken from celery.managers.PeriodicTaskManager
  15. SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
  16. class Bunch:
  17. def __init__(self, **kw):
  18. self.__dict__.update(kw)
  19. class Backend(BaseBackend):
  20. capabilities = ("ResultStore", "PeriodicStatus")
  21. mongodb_host = 'localhost'
  22. mongodb_port = 27017
  23. mongodb_user = None
  24. mongodb_password = None
  25. mongodb_database = 'celery'
  26. mongodb_taskmeta_collection = 'celery_taskmeta'
  27. mongodb_periodictaskmeta_collection = 'celery_periodictaskmeta'
  28. def __init__(self, *args, **kwargs):
  29. """Initialize MongoDB backend instance.
  30. :raises django.core.exceptions.ImproperlyConfigured: if
  31. module :mod:`pymongo` is not available.
  32. """
  33. if not pymongo:
  34. raise ImproperlyConfigured(
  35. "You need to install the pymongo library to use the "
  36. "MongoDB backend.")
  37. conf = getattr(settings, "CELERY_MONGODB_BACKEND_SETTINGS", None)
  38. if conf is not None:
  39. if not isinstance(conf, dict):
  40. raise ImproperlyConfigured(
  41. "MongoDB backend settings should be grouped in a dict")
  42. self.mongodb_host = conf.get('host', self.mongodb_host)
  43. self.mongodb_port = int(conf.get('port', self.mongodb_port))
  44. self.mongodb_user = conf.get('user', self.mongodb_user)
  45. self.mongodb_password = conf.get(
  46. 'password', self.mongodb_password)
  47. self.mongodb_database = conf.get(
  48. 'database', self.mongodb_database)
  49. self.mongodb_taskmeta_collection = conf.get(
  50. 'taskmeta_collection', self.mongodb_taskmeta_collection)
  51. self.mongodb_collection_periodictaskmeta = conf.get(
  52. 'periodictaskmeta_collection',
  53. self.mongodb_periodictaskmeta_collection)
  54. super(Backend, self).__init__(*args, **kwargs)
  55. self._cache = {}
  56. self._connection = None
  57. self._database = None
  58. def _get_connection(self):
  59. """Connect to the MongoDB server."""
  60. if self._connection is None:
  61. from pymongo.connection import Connection
  62. self._connection = Connection(self.mongodb_host,
  63. self.mongodb_port)
  64. return self._connection
  65. def _get_database(self):
  66. """"Get database from MongoDB connection and perform authentication
  67. if necessary."""
  68. if self._database is None:
  69. conn = self._get_connection()
  70. db = conn[self.mongodb_database]
  71. if self.mongodb_user and self.mongodb_password:
  72. auth = db.authenticate(self.mongodb_user,
  73. self.mongodb_password)
  74. if not auth:
  75. raise ImproperlyConfigured(
  76. "Invalid MongoDB username or password.")
  77. self._database = db
  78. return self._database
  79. def process_cleanup(self):
  80. if self._connection is not None:
  81. # MongoDB connection will be closed automatically when object
  82. # goes out of scope
  83. self._connection = None
  84. def init_periodic_tasks(self):
  85. """Create collection for periodic tasks in database."""
  86. db = self._get_database()
  87. collection = db[self.mongodb_periodictaskmeta_collection]
  88. collection.ensure_index("name", pymongo.ASCENDING, unique=True)
  89. periodic_tasks = tasks.get_all_periodic()
  90. for task_name in periodic_tasks.keys():
  91. if not collection.find_one({"name": task_name}):
  92. collection.save({"name": task_name,
  93. "last_run_at": datetime.fromtimestamp(0),
  94. "total_run_count": 0}, safe=True)
  95. def run_periodic_tasks(self):
  96. """Run all waiting periodic tasks.
  97. :returns: a list of ``(task, task_id)`` tuples containing
  98. the task class and id for the resulting tasks applied.
  99. """
  100. db = self._get_database()
  101. collection = db[self.mongodb_periodictaskmeta_collection]
  102. waiting_tasks = self._get_waiting_tasks()
  103. task_id_tuples = []
  104. for waiting_task in waiting_tasks:
  105. task = tasks[waiting_task['name']]
  106. resp = task.delay()
  107. collection.update({'_id': waiting_task['_id']},
  108. {"$inc": {"total_run_count": 1}})
  109. task_meta = Bunch(name=waiting_task['name'],
  110. last_run_at=waiting_task['last_run_at'],
  111. total_run_count=waiting_task['total_run_count'])
  112. task_id_tuples.append((task_meta, resp.task_id))
  113. return task_id_tuples
  114. def _is_time(self, last_run_at, run_every):
  115. """Check if if it is time to run the periodic task.
  116. :param last_run_at: Last time the periodic task was run.
  117. :param run_every: How often to run the periodic task.
  118. :rtype bool:
  119. """
  120. # code taken from celery.managers.PeriodicTaskManager
  121. run_every_drifted = run_every + SERVER_DRIFT
  122. run_at = last_run_at + run_every_drifted
  123. if datetime.now() > run_at:
  124. return True
  125. return False
  126. def _get_waiting_tasks(self):
  127. """Get all waiting periodic tasks."""
  128. db = self._get_database()
  129. collection = db[self.mongodb_periodictaskmeta_collection]
  130. periodic_tasks = tasks.get_all_periodic()
  131. # find all periodic tasks to be run
  132. waiting = []
  133. for task_meta in collection.find():
  134. if task_meta['name'] in periodic_tasks:
  135. task = periodic_tasks[task_meta['name']]
  136. run_every = task.run_every
  137. if self._is_time(task_meta['last_run_at'], run_every):
  138. collection.update(
  139. {"name": task_meta['name'],
  140. "last_run_at": task_meta['last_run_at']},
  141. {"$set": {"last_run_at": datetime.utcnow()}})
  142. if db.last_status()['updatedExisting']:
  143. waiting.append(task_meta)
  144. return waiting
  145. def store_result(self, task_id, result, status, traceback=None):
  146. """Store return value and status of an executed task."""
  147. from pymongo.binary import Binary
  148. if status == 'DONE':
  149. result = self.prepare_result(result)
  150. elif status == 'FAILURE':
  151. result = self.prepare_exception(result)
  152. meta = {"_id": task_id,
  153. "status": status,
  154. "result": Binary(pickle.dumps(result)),
  155. "date_done": datetime.utcnow(),
  156. "traceback": Binary(pickle.dumps(traceback))}
  157. db = self._get_database()
  158. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  159. taskmeta_collection.save(meta, safe=True)
  160. def is_done(self, task_id):
  161. """Returns ``True`` if the task executed successfully."""
  162. return self.get_status(task_id) == "DONE"
  163. def get_status(self, task_id):
  164. """Get status of a task."""
  165. return self._get_task_meta_for(task_id)["status"]
  166. def get_traceback(self, task_id):
  167. """Get the traceback of a failed task."""
  168. meta = self._get_task_meta_for(task_id)
  169. return meta["traceback"]
  170. def get_result(self, task_id):
  171. """Get the result for a task."""
  172. meta = self._get_task_meta_for(task_id)
  173. if meta["status"] == "FAILURE":
  174. return self.exception_to_python(meta["result"])
  175. else:
  176. return meta["result"]
  177. def _get_task_meta_for(self, task_id):
  178. """Get task metadata for a task by id."""
  179. if task_id in self._cache:
  180. return self._cache[task_id]
  181. db = self._get_database()
  182. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  183. obj = taskmeta_collection.find_one({"_id": task_id})
  184. if not obj:
  185. return {"status": "PENDING", "result": None}
  186. meta = {
  187. "task_id": obj["_id"],
  188. "status": obj["status"],
  189. "result": pickle.loads(str(obj["result"])),
  190. "date_done": obj["date_done"],
  191. "traceback": pickle.loads(str(obj["traceback"])),
  192. }
  193. if meta["status"] == "DONE":
  194. self._cache[task_id] = meta
  195. return meta
  196. def cleanup(self):
  197. """Delete expired metadata."""
  198. db = self._get_database()
  199. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  200. taskmeta_collection.remove({
  201. "date_done": {
  202. "$lt": datetime.now() - TASK_RESULT_EXPIRES,
  203. }
  204. })