mongodb.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. """MongoDB backend for celery."""
  2. from datetime import datetime
  3. from billiard.serialization import pickle
  4. try:
  5. import pymongo
  6. except ImportError:
  7. pymongo = None
  8. from celery import conf
  9. from celery import states
  10. from celery.loaders import load_settings
  11. from celery.backends.base import BaseDictBackend
  12. from celery.exceptions import ImproperlyConfigured
  13. class Bunch:
  14. def __init__(self, **kw):
  15. self.__dict__.update(kw)
  16. class MongoBackend(BaseDictBackend):
  17. mongodb_host = "localhost"
  18. mongodb_port = 27017
  19. mongodb_user = None
  20. mongodb_password = None
  21. mongodb_database = "celery"
  22. mongodb_taskmeta_collection = "celery_taskmeta"
  23. def __init__(self, *args, **kwargs):
  24. """Initialize MongoDB backend instance.
  25. :raises celery.exceptions.ImproperlyConfigured: if
  26. module :mod:`pymongo` is not available.
  27. """
  28. if not pymongo:
  29. raise ImproperlyConfigured(
  30. "You need to install the pymongo library to use the "
  31. "MongoDB backend.")
  32. settings = load_settings()
  33. config = getattr(settings, "CELERY_MONGODB_BACKEND_SETTINGS", None)
  34. if config is not None:
  35. if not isinstance(config, dict):
  36. raise ImproperlyConfigured(
  37. "MongoDB backend settings should be grouped in a dict")
  38. self.mongodb_host = config.get("host", self.mongodb_host)
  39. self.mongodb_port = int(config.get("port", self.mongodb_port))
  40. self.mongodb_user = config.get("user", self.mongodb_user)
  41. self.mongodb_password = config.get(
  42. "password", self.mongodb_password)
  43. self.mongodb_database = config.get(
  44. "database", self.mongodb_database)
  45. self.mongodb_taskmeta_collection = config.get(
  46. "taskmeta_collection", self.mongodb_taskmeta_collection)
  47. super(MongoBackend, self).__init__(*args, **kwargs)
  48. self._connection = None
  49. self._database = None
  50. def _get_connection(self):
  51. """Connect to the MongoDB server."""
  52. if self._connection is None:
  53. from pymongo.connection import Connection
  54. self._connection = Connection(self.mongodb_host,
  55. self.mongodb_port)
  56. return self._connection
  57. def _get_database(self):
  58. """"Get database from MongoDB connection and perform authentication
  59. if necessary."""
  60. if self._database is None:
  61. conn = self._get_connection()
  62. db = conn[self.mongodb_database]
  63. if self.mongodb_user and self.mongodb_password:
  64. auth = db.authenticate(self.mongodb_user,
  65. self.mongodb_password)
  66. if not auth:
  67. raise ImproperlyConfigured(
  68. "Invalid MongoDB username or password.")
  69. self._database = db
  70. return self._database
  71. def process_cleanup(self):
  72. if self._connection is not None:
  73. # MongoDB connection will be closed automatically when object
  74. # goes out of scope
  75. self._connection = None
  76. def _store_result(self, task_id, result, status, traceback=None):
  77. """Store return value and status of an executed task."""
  78. from pymongo.binary import Binary
  79. meta = {"_id": task_id,
  80. "status": status,
  81. "result": Binary(pickle.dumps(result)),
  82. "date_done": datetime.now(),
  83. "traceback": Binary(pickle.dumps(traceback))}
  84. db = self._get_database()
  85. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  86. taskmeta_collection.save(meta, safe=True)
  87. def _get_task_meta_for(self, task_id):
  88. """Get task metadata for a task by id."""
  89. db = self._get_database()
  90. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  91. obj = taskmeta_collection.find_one({"_id": task_id})
  92. if not obj:
  93. return {"status": states.PENDING, "result": None}
  94. meta = {
  95. "task_id": obj["_id"],
  96. "status": obj["status"],
  97. "result": pickle.loads(str(obj["result"])),
  98. "date_done": obj["date_done"],
  99. "traceback": pickle.loads(str(obj["traceback"])),
  100. }
  101. return meta
  102. def cleanup(self):
  103. """Delete expired metadata."""
  104. db = self._get_database()
  105. taskmeta_collection = db[self.mongodb_taskmeta_collection]
  106. taskmeta_collection.remove({
  107. "date_done": {
  108. "$lt": datetime.now() - conf.TASK_RESULT_EXPIRES,
  109. }
  110. })