mongodb.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. """MongoDB backend for celery."""
  2. from datetime import datetime
  3. from django.core.exceptions import ImproperlyConfigured
  4. from billiard.serialization import pickle
  5. try:
  6. import pymongo
  7. except ImportError:
  8. pymongo = None
  9. from celery.backends.base import BaseBackend
  10. from celery.loaders import load_settings
  11. from celery.conf import TASK_RESULT_EXPIRES
  12. class Bunch:
  13. def __init__(self, **kw):
  14. self.__dict__.update(kw)
  15. class MongoBackend(BaseBackend):
  16. capabilities = ["ResultStore"]
  17. mongodb_host = 'localhost'
  18. mongodb_port = 27017
  19. mongodb_user = None
  20. mongodb_password = None
  21. mongodb_database = 'celery'
  22. mongodb_taskmeta_collection = 'celery_taskmeta'
  23. def __init__(self, *args, **kwargs):
  24. """Initialize MongoDB backend instance.
  25. :raises django.core.exceptions.ImproperlyConfigured: if
  26. module :mod:`pymongo` is not available.
  27. """
  28. if not pymongo:
  29. raise ImproperlyConfigured(
  30. "You need to install the pymongo library to use the "
  31. "MongoDB backend.")
  32. settings = load_settings()
  33. conf = getattr(settings, "CELERY_MONGODB_BACKEND_SETTINGS", None)
  34. if conf is not None:
  35. if not isinstance(conf, dict):
  36. raise ImproperlyConfigured(
  37. "MongoDB backend settings should be grouped in a dict")
  38. self.mongodb_host = conf.get('host', self.mongodb_host)
  39. self.mongodb_port = int(conf.get('port', self.mongodb_port))
  40. self.mongodb_user = conf.get('user', self.mongodb_user)
  41. self.mongodb_password = conf.get(
  42. 'password', self.mongodb_password)
  43. self.mongodb_database = conf.get(
  44. 'database', self.mongodb_database)
  45. self.mongodb_taskmeta_collection = conf.get(
  46. 'taskmeta_collection', self.mongodb_taskmeta_collection)
  47. super(MongoBackend, self).__init__(*args, **kwargs)
  48. self._cache = {}
  49. self._connection = None
  50. self._database = None
  51. def _get_connection(self):
  52. """Connect to the MongoDB server."""
  53. if self._connection is None:
  54. from pymongo.connection import Connection
  55. self._connection = Connection(self.mongodb_host,
  56. self.mongodb_port)
  57. return self._connection
  58. def _get_database(self):
  59. """"Get database from MongoDB connection and perform authentication
  60. if necessary."""
  61. if self._database is None:
  62. conn = self._get_connection()
  63. db = conn[self.mongodb_database]
  64. if self.mongodb_user and self.mongodb_password:
  65. auth = db.authenticate(self.mongodb_user,
  66. self.mongodb_password)
  67. if not auth:
  68. raise ImproperlyConfigured(
  69. "Invalid MongoDB username or password.")
  70. self._database = db
  71. return self._database
  72. def process_cleanup(self):
  73. if self._connection is not None:
  74. # MongoDB connection will be closed automatically when object
  75. # goes out of scope
  76. self._connection = None
  77. def store_result(self, task_id, result, status, traceback=None):
  78. """Store return value and status of an executed task."""
  79. from pymongo.binary import Binary
  80. result = self.encode_result(result, status)
  81. meta = {"_id": task_id,
  82. "status": status,
  83. "result": Binary(pickle.dumps(result)),
  84. "date_done": datetime.now(),
  85. "traceback": Binary(pickle.dumps(traceback))}
  86. db = self._get_database()
  87. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  88. taskmeta_collection.save(meta, safe=True)
  89. def is_successful(self, task_id):
  90. """Returns ``True`` if the task executed successfully."""
  91. return self.get_status(task_id) == "SUCCESS"
  92. def get_status(self, task_id):
  93. """Get status of a task."""
  94. return self._get_task_meta_for(task_id)["status"]
  95. def get_traceback(self, task_id):
  96. """Get the traceback of a failed task."""
  97. meta = self._get_task_meta_for(task_id)
  98. return meta["traceback"]
  99. def get_result(self, task_id):
  100. """Get the result for a task."""
  101. meta = self._get_task_meta_for(task_id)
  102. if meta["status"] == "FAILURE":
  103. return self.exception_to_python(meta["result"])
  104. else:
  105. return meta["result"]
  106. def _get_task_meta_for(self, task_id):
  107. """Get task metadata for a task by id."""
  108. if task_id in self._cache:
  109. return self._cache[task_id]
  110. db = self._get_database()
  111. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  112. obj = taskmeta_collection.find_one({"_id": task_id})
  113. if not obj:
  114. return {"status": "PENDING", "result": None}
  115. meta = {
  116. "task_id": obj["_id"],
  117. "status": obj["status"],
  118. "result": pickle.loads(str(obj["result"])),
  119. "date_done": obj["date_done"],
  120. "traceback": pickle.loads(str(obj["traceback"])),
  121. }
  122. if meta["status"] == "SUCCESS":
  123. self._cache[task_id] = meta
  124. return meta
  125. def cleanup(self):
  126. """Delete expired metadata."""
  127. db = self._get_database()
  128. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  129. taskmeta_collection.remove({
  130. "date_done": {
  131. "$lt": datetime.now() - TASK_RESULT_EXPIRES,
  132. }
  133. })