mongodb.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. """MongoDB backend for celery."""
  2. from datetime import datetime
  3. try:
  4. import pymongo
  5. except ImportError:
  6. pymongo = None
  7. from celery import states
  8. from celery.backends.base import BaseDictBackend
  9. from celery.exceptions import ImproperlyConfigured
  10. from celery.serialization import pickle
  11. class Bunch:
  12. def __init__(self, **kw):
  13. self.__dict__.update(kw)
  14. class MongoBackend(BaseDictBackend):
  15. mongodb_host = "localhost"
  16. mongodb_port = 27017
  17. mongodb_user = None
  18. mongodb_password = None
  19. mongodb_database = "celery"
  20. mongodb_taskmeta_collection = "celery_taskmeta"
  21. def __init__(self, *args, **kwargs):
  22. """Initialize MongoDB backend instance.
  23. :raises celery.exceptions.ImproperlyConfigured: if
  24. module :mod:`pymongo` is not available.
  25. """
  26. self.result_expires = kwargs.get("result_expires") or \
  27. self.app.conf.CELERY_TASK_RESULT_EXPIRES
  28. if not pymongo:
  29. raise ImproperlyConfigured(
  30. "You need to install the pymongo library to use the "
  31. "MongoDB backend.")
  32. config = self.app.conf.get("CELERY_MONGODB_BACKEND_SETTINGS", None)
  33. if config is not None:
  34. if not isinstance(config, dict):
  35. raise ImproperlyConfigured(
  36. "MongoDB backend settings should be grouped in a dict")
  37. self.mongodb_host = config.get("host", self.mongodb_host)
  38. self.mongodb_port = int(config.get("port", self.mongodb_port))
  39. self.mongodb_user = config.get("user", self.mongodb_user)
  40. self.mongodb_password = config.get(
  41. "password", self.mongodb_password)
  42. self.mongodb_database = config.get(
  43. "database", self.mongodb_database)
  44. self.mongodb_taskmeta_collection = config.get(
  45. "taskmeta_collection", self.mongodb_taskmeta_collection)
  46. super(MongoBackend, self).__init__(*args, **kwargs)
  47. self._connection = None
  48. self._database = None
  49. def _get_connection(self):
  50. """Connect to the MongoDB server."""
  51. if self._connection is None:
  52. from pymongo.connection import Connection
  53. self._connection = Connection(self.mongodb_host,
  54. self.mongodb_port)
  55. return self._connection
  56. def _get_database(self):
  57. """"Get database from MongoDB connection and perform authentication
  58. if necessary."""
  59. if self._database is None:
  60. conn = self._get_connection()
  61. db = conn[self.mongodb_database]
  62. if self.mongodb_user and self.mongodb_password:
  63. auth = db.authenticate(self.mongodb_user,
  64. self.mongodb_password)
  65. if not auth:
  66. raise ImproperlyConfigured(
  67. "Invalid MongoDB username or password.")
  68. self._database = db
  69. return self._database
  70. def process_cleanup(self):
  71. if self._connection is not None:
  72. # MongoDB connection will be closed automatically when object
  73. # goes out of scope
  74. self._connection = None
  75. def _store_result(self, task_id, result, status, traceback=None):
  76. """Store return value and status of an executed task."""
  77. from pymongo.binary import Binary
  78. meta = {"_id": task_id,
  79. "status": status,
  80. "result": Binary(pickle.dumps(result)),
  81. "date_done": datetime.now(),
  82. "traceback": Binary(pickle.dumps(traceback))}
  83. db = self._get_database()
  84. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  85. taskmeta_collection.save(meta, safe=True)
  86. return result
  87. def _get_task_meta_for(self, task_id):
  88. """Get task metadata for a task by id."""
  89. db = self._get_database()
  90. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  91. obj = taskmeta_collection.find_one({"_id": task_id})
  92. if not obj:
  93. return {"status": states.PENDING, "result": None}
  94. meta = {
  95. "task_id": obj["_id"],
  96. "status": obj["status"],
  97. "result": pickle.loads(str(obj["result"])),
  98. "date_done": obj["date_done"],
  99. "traceback": pickle.loads(str(obj["traceback"])),
  100. }
  101. return meta
  102. def cleanup(self):
  103. """Delete expired metadata."""
  104. db = self._get_database()
  105. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  106. taskmeta_collection.remove({
  107. "date_done": {
  108. "$lt": datetime.now() - self.result_expires,
  109. }
  110. })