database.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from datetime import datetime
  2. from celery.backends.base import BaseDictBackend
  3. from celery.db.models import Task, TaskSet
  4. from celery.db.session import ResultSession
  5. from celery.exceptions import ImproperlyConfigured
  def _sqlalchemy_installed():
      """Import SQLAlchemy and return the module.

      :returns: the imported :mod:`sqlalchemy` module.
      :raises ImproperlyConfigured: if :mod:`sqlalchemy` cannot be imported.

      """
      try:
          import sqlalchemy
      except ImportError:
          # The message is built by implicit concatenation of adjacent
          # literals, so the first literal has to end with a space to keep
          # the two sentences apart.
          raise ImproperlyConfigured(
              "The database result backend requires SQLAlchemy to be installed. "
              "See http://pypi.python.org/pypi/SQLAlchemy")
      return sqlalchemy
  # Run the SQLAlchemy availability check when this module is imported:
  # _sqlalchemy_installed() raises ImproperlyConfigured if the import fails.
  14. _sqlalchemy_installed()
  class DatabaseBackend(BaseDictBackend):
      """The database result backend.

      Task and taskset results are stored through the :class:`Task` and
      :class:`TaskSet` models, using sessions created by
      :meth:`ResultSession`.

      """

      def __init__(self, dburi=None, result_expires=None,
              engine_options=None, **kwargs):
          """Configure the backend.

          :keyword dburi: connection string; falls back to the
              ``CELERY_RESULT_DBURI`` setting when not given.
          :keyword result_expires: age after which :meth:`cleanup` deletes
              rows; falls back to ``CELERY_TASK_RESULT_EXPIRES``.
          :keyword engine_options: extra keyword arguments passed on when
              creating a session.  Keys present in
              ``CELERY_RESULT_ENGINE_OPTIONS`` override the ones given here.
          :raises ImproperlyConfigured: if no connection string is available.

          """
          super(DatabaseBackend, self).__init__(**kwargs)
          self.result_expires = result_expires or \
                                  self.app.conf.CELERY_TASK_RESULT_EXPIRES
          self.dburi = dburi or self.app.conf.CELERY_RESULT_DBURI
          # dict(a, **b): entries from the configuration win on key clashes.
          self.engine_options = dict(engine_options or {},
                  **self.app.conf.CELERY_RESULT_ENGINE_OPTIONS or {})
          if not self.dburi:
              raise ImproperlyConfigured(
                      "Missing connection string! Do you have "
                      "CELERY_RESULT_DBURI set to a real value?")

      def ResultSession(self):
          """Return a new session for ``self.dburi`` and the engine options."""
          # Inside the method body the bare name resolves to the module-level
          # ``ResultSession`` factory, not to this method.
          return ResultSession(dburi=self.dburi, **self.engine_options)

      def _store_result(self, task_id, result, status, traceback=None):
          """Store return value and status of an executed task.

          The row for ``task_id`` is created if it does not exist yet.

          :returns: ``result``, unchanged.

          """
          session = self.ResultSession()
          try:
              task = session.query(Task).filter(Task.task_id == task_id).first()
              if not task:
                  task = Task(task_id)
                  session.add(task)
                  session.flush()
              task.result = result
              task.status = status
              task.traceback = traceback
              session.commit()
          finally:
              session.close()
          return result

      def _get_task_meta_for(self, task_id):
          """Get task metadata for a task by id.

          When no row exists, the metadata of a fresh, unsaved :class:`Task`
          with status ``PENDING`` is returned instead.

          :returns: the value of the task's ``to_dict()``.

          """
          session = self.ResultSession()
          try:
              task = session.query(Task).filter(Task.task_id == task_id).first()
              if task is None:
                  from celery import states
                  # Placeholder only: it is never added to the session.
                  task = Task(task_id)
                  task.status = states.PENDING
              return task.to_dict()
          finally:
              session.close()

      def _save_taskset(self, taskset_id, result):
          """Store the result of an executed taskset.

          :returns: ``result``, unchanged.

          """
          session = self.ResultSession()
          try:
              taskset = TaskSet(taskset_id, result)
              session.add(taskset)
              session.flush()
              session.commit()
              return result
          finally:
              session.close()

      def _restore_taskset(self, taskset_id):
          """Get taskset metadata for a taskset by id.

          :returns: the value of the taskset's ``to_dict()``, or ``None``
              when no matching taskset is found.

          """
          session = self.ResultSession()
          try:
              taskset = session.query(TaskSet).filter(
                      TaskSet.taskset_id == taskset_id).first()
              if taskset:
                  return taskset.to_dict()
              return None
          finally:
              session.close()

      def _forget(self, task_id):
          """Forget about result: delete the stored row(s) for ``task_id``."""
          session = self.ResultSession()
          try:
              session.query(Task).filter(Task.task_id == task_id).delete()
              session.commit()
          finally:
              session.close()

      def cleanup(self):
          """Delete expired metadata.

          Removes every :class:`Task` and :class:`TaskSet` row whose
          ``date_done`` is older than ``self.result_expires``.

          ``result_expires`` may be a :class:`~datetime.timedelta` or a plain
          number of seconds.  ``None`` and ``0`` are treated as "never
          expire", in which case nothing is deleted and no session is opened.

          """
          # Function-scope import: the module only imports ``datetime``.
          from datetime import timedelta

          expires = self.result_expires
          if not isinstance(expires, timedelta):
              if not expires:
                  # None/0 cannot be subtracted from a datetime; there is
                  # no cutoff to apply, so leave the tables alone.
                  return
              expires = timedelta(seconds=expires)

          # One cutoff for both queries so the two tables are pruned against
          # the same instant.
          cutoff = datetime.now() - expires
          session = self.ResultSession()
          try:
              session.query(Task).filter(Task.date_done < cutoff).delete()
              session.query(TaskSet).filter(
                      TaskSet.date_done < cutoff).delete()
              session.commit()
          finally:
              session.close()