mongodb.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. """MongoDB backend for celery."""
  2. from datetime import datetime
  3. from django.core.exceptions import ImproperlyConfigured
  4. from celery.serialization import pickle
  5. from celery.backends.base import BaseBackend
  6. from celery.loaders import settings
  7. from celery.conf import TASK_RESULT_EXPIRES
  8. try:
  9. import pymongo
  10. except ImportError:
  11. pymongo = None
  12. class Bunch:
  13. def __init__(self, **kw):
  14. self.__dict__.update(kw)
  15. class Backend(BaseBackend):
  16. capabilities = ["ResultStore"]
  17. mongodb_host = 'localhost'
  18. mongodb_port = 27017
  19. mongodb_user = None
  20. mongodb_password = None
  21. mongodb_database = 'celery'
  22. mongodb_taskmeta_collection = 'celery_taskmeta'
  23. def __init__(self, *args, **kwargs):
  24. """Initialize MongoDB backend instance.
  25. :raises django.core.exceptions.ImproperlyConfigured: if
  26. module :mod:`pymongo` is not available.
  27. """
  28. if not pymongo:
  29. raise ImproperlyConfigured(
  30. "You need to install the pymongo library to use the "
  31. "MongoDB backend.")
  32. conf = getattr(settings, "CELERY_MONGODB_BACKEND_SETTINGS", None)
  33. if conf is not None:
  34. if not isinstance(conf, dict):
  35. raise ImproperlyConfigured(
  36. "MongoDB backend settings should be grouped in a dict")
  37. self.mongodb_host = conf.get('host', self.mongodb_host)
  38. self.mongodb_port = int(conf.get('port', self.mongodb_port))
  39. self.mongodb_user = conf.get('user', self.mongodb_user)
  40. self.mongodb_password = conf.get(
  41. 'password', self.mongodb_password)
  42. self.mongodb_database = conf.get(
  43. 'database', self.mongodb_database)
  44. self.mongodb_taskmeta_collection = conf.get(
  45. 'taskmeta_collection', self.mongodb_taskmeta_collection)
  46. super(Backend, self).__init__(*args, **kwargs)
  47. self._cache = {}
  48. self._connection = None
  49. self._database = None
  50. def _get_connection(self):
  51. """Connect to the MongoDB server."""
  52. if self._connection is None:
  53. from pymongo.connection import Connection
  54. self._connection = Connection(self.mongodb_host,
  55. self.mongodb_port)
  56. return self._connection
  57. def _get_database(self):
  58. """"Get database from MongoDB connection and perform authentication
  59. if necessary."""
  60. if self._database is None:
  61. conn = self._get_connection()
  62. db = conn[self.mongodb_database]
  63. if self.mongodb_user and self.mongodb_password:
  64. auth = db.authenticate(self.mongodb_user,
  65. self.mongodb_password)
  66. if not auth:
  67. raise ImproperlyConfigured(
  68. "Invalid MongoDB username or password.")
  69. self._database = db
  70. return self._database
  71. def process_cleanup(self):
  72. if self._connection is not None:
  73. # MongoDB connection will be closed automatically when object
  74. # goes out of scope
  75. self._connection = None
  76. def store_result(self, task_id, result, status, traceback=None):
  77. """Store return value and status of an executed task."""
  78. from pymongo.binary import Binary
  79. result = self.encode_result(result, status)
  80. meta = {"_id": task_id,
  81. "status": status,
  82. "result": Binary(pickle.dumps(result)),
  83. "date_done": datetime.now(),
  84. "traceback": Binary(pickle.dumps(traceback))}
  85. db = self._get_database()
  86. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  87. taskmeta_collection.save(meta, safe=True)
  88. def is_successful(self, task_id):
  89. """Returns ``True`` if the task executed successfully."""
  90. return self.get_status(task_id) == "SUCCESS"
  91. def get_status(self, task_id):
  92. """Get status of a task."""
  93. return self._get_task_meta_for(task_id)["status"]
  94. def get_traceback(self, task_id):
  95. """Get the traceback of a failed task."""
  96. meta = self._get_task_meta_for(task_id)
  97. return meta["traceback"]
  98. def get_result(self, task_id):
  99. """Get the result for a task."""
  100. meta = self._get_task_meta_for(task_id)
  101. if meta["status"] == "FAILURE":
  102. return self.exception_to_python(meta["result"])
  103. else:
  104. return meta["result"]
  105. def _get_task_meta_for(self, task_id):
  106. """Get task metadata for a task by id."""
  107. if task_id in self._cache:
  108. return self._cache[task_id]
  109. db = self._get_database()
  110. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  111. obj = taskmeta_collection.find_one({"_id": task_id})
  112. if not obj:
  113. return {"status": "PENDING", "result": None}
  114. meta = {
  115. "task_id": obj["_id"],
  116. "status": obj["status"],
  117. "result": pickle.loads(str(obj["result"])),
  118. "date_done": obj["date_done"],
  119. "traceback": pickle.loads(str(obj["traceback"])),
  120. }
  121. if meta["status"] == "SUCCESS":
  122. self._cache[task_id] = meta
  123. return meta
  124. def cleanup(self):
  125. """Delete expired metadata."""
  126. db = self._get_database()
  127. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  128. taskmeta_collection.remove({
  129. "date_done": {
  130. "$lt": datetime.now() - TASK_RESULT_EXPIRES,
  131. }
  132. })