# database.py
from datetime import datetime, timedelta

from celery.backends.base import BaseDictBackend
from celery.db.models import Task, TaskSet
from celery.db.session import ResultSession
from celery.exceptions import ImproperlyConfigured
  6. def _sqlalchemy_installed():
  7. try:
  8. import sqlalchemy
  9. except ImportError:
  10. raise ImproperlyConfigured(
  11. "The database result backend requires SQLAlchemy to be installed."
  12. "See http://pypi.python.org/pypi/SQLAlchemy")
  13. return sqlalchemy
  14. _sqlalchemy_installed()
  15. class DatabaseBackend(BaseDictBackend):
  16. """The database result backend."""
  17. def __init__(self, dburi=None, result_expires=None,
  18. engine_options=None, **kwargs):
  19. super(DatabaseBackend, self).__init__(**kwargs)
  20. self.result_expires = result_expires or \
  21. self.app.conf.CELERY_TASK_RESULT_EXPIRES
  22. self.dburi = dburi or self.app.conf.CELERY_RESULT_DBURI
  23. self.engine_options = dict(engine_options or {},
  24. **self.app.conf.CELERY_RESULT_ENGINE_OPTIONS or {})
  25. if not self.dburi:
  26. raise ImproperlyConfigured(
  27. "Missing connection string! Do you have "
  28. "CELERY_RESULT_DBURI set to a real value?")
  29. def ResultSession(self):
  30. return ResultSession(dburi=self.dburi, **self.engine_options)
  31. def _store_result(self, task_id, result, status, traceback=None):
  32. """Store return value and status of an executed task."""
  33. session = self.ResultSession()
  34. try:
  35. task = session.query(Task).filter(Task.task_id == task_id).first()
  36. if not task:
  37. task = Task(task_id)
  38. session.add(task)
  39. session.flush()
  40. task.result = result
  41. task.status = status
  42. task.traceback = traceback
  43. session.commit()
  44. finally:
  45. session.close()
  46. return result
  47. def _get_task_meta_for(self, task_id):
  48. """Get task metadata for a task by id."""
  49. session = self.ResultSession()
  50. try:
  51. task = session.query(Task).filter(Task.task_id == task_id).first()
  52. if not task:
  53. task = Task(task_id)
  54. session.add(task)
  55. session.flush()
  56. session.commit()
  57. return task.to_dict()
  58. finally:
  59. session.close()
  60. def _save_taskset(self, taskset_id, result):
  61. """Store the result of an executed taskset."""
  62. session = self.ResultSession()
  63. try:
  64. taskset = TaskSet(taskset_id, result)
  65. session.add(taskset)
  66. session.flush()
  67. session.commit()
  68. return result
  69. finally:
  70. session.close()
  71. def _restore_taskset(self, taskset_id):
  72. """Get taskset metadata for a taskset by id."""
  73. session = self.ResultSession()
  74. try:
  75. taskset = session.query(TaskSet).filter(
  76. TaskSet.taskset_id == taskset_id).first()
  77. if taskset:
  78. return taskset.to_dict()
  79. finally:
  80. session.close()
  81. def _forget(self, task_id):
  82. """Forget about result."""
  83. session = self.ResultSession()
  84. try:
  85. session.query(Task).filter(Task.task_id == task_id).delete()
  86. session.commit()
  87. finally:
  88. session.close()
  89. def cleanup(self):
  90. """Delete expired metadata."""
  91. session = self.ResultSession()
  92. expires = self.result_expires
  93. try:
  94. session.query(Task).filter(
  95. Task.date_done < (datetime.now() - expires)).delete()
  96. session.query(TaskSet).filter(
  97. TaskSet.date_done < (datetime.now() - expires)).delete()
  98. session.commit()
  99. finally:
  100. session.close()