database.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from datetime import datetime
  4. from .. import states
  5. from ..db.models import Task, TaskSet
  6. from ..db.session import ResultSession
  7. from ..exceptions import ImproperlyConfigured
  8. from ..utils.timeutils import maybe_timedelta
  9. from .base import BaseDictBackend
  10. def _sqlalchemy_installed():
  11. try:
  12. import sqlalchemy
  13. except ImportError:
  14. raise ImproperlyConfigured(
  15. "The database result backend requires SQLAlchemy to be installed."
  16. "See http://pypi.python.org/pypi/SQLAlchemy")
  17. return sqlalchemy
# Run the SQLAlchemy import check as soon as this module is imported, so a
# missing dependency surfaces as ImproperlyConfigured right away.
_sqlalchemy_installed()
  19. class DatabaseBackend(BaseDictBackend):
  20. """The database result backend."""
  21. # ResultSet.iterate should sleep this much between each pool,
  22. # to not bombard the database with queries.
  23. subpolling_interval = 0.5
  24. def __init__(self, dburi=None, expires=None,
  25. engine_options=None, **kwargs):
  26. super(DatabaseBackend, self).__init__(**kwargs)
  27. conf = self.app.conf
  28. self.expires = maybe_timedelta(self.prepare_expires(expires))
  29. self.dburi = dburi or conf.CELERY_RESULT_DBURI
  30. self.engine_options = dict(engine_options or {},
  31. **conf.CELERY_RESULT_ENGINE_OPTIONS or {})
  32. self.short_lived_sessions = kwargs.get("short_lived_sessions",
  33. conf.CELERY_RESULT_DB_SHORT_LIVED_SESSIONS)
  34. if not self.dburi:
  35. raise ImproperlyConfigured(
  36. "Missing connection string! Do you have "
  37. "CELERY_RESULT_DBURI set to a real value?")
  38. def ResultSession(self):
  39. return ResultSession(
  40. dburi=self.dburi,
  41. short_lived_sessions=self.short_lived_sessions,
  42. **self.engine_options)
  43. def _store_result(self, task_id, result, status, traceback=None):
  44. """Store return value and status of an executed task."""
  45. session = self.ResultSession()
  46. try:
  47. task = session.query(Task).filter(Task.task_id == task_id).first()
  48. if not task:
  49. task = Task(task_id)
  50. session.add(task)
  51. session.flush()
  52. task.result = result
  53. task.status = status
  54. task.traceback = traceback
  55. session.commit()
  56. finally:
  57. session.close()
  58. return result
  59. def _get_task_meta_for(self, task_id):
  60. """Get task metadata for a task by id."""
  61. session = self.ResultSession()
  62. try:
  63. task = session.query(Task).filter(Task.task_id == task_id).first()
  64. if task is None:
  65. task = Task(task_id)
  66. task.status = states.PENDING
  67. task.result = None
  68. return task.to_dict()
  69. finally:
  70. session.close()
  71. def _save_taskset(self, taskset_id, result):
  72. """Store the result of an executed taskset."""
  73. session = self.ResultSession()
  74. try:
  75. taskset = TaskSet(taskset_id, result)
  76. session.add(taskset)
  77. session.flush()
  78. session.commit()
  79. return result
  80. finally:
  81. session.close()
  82. def _restore_taskset(self, taskset_id):
  83. """Get metadata for taskset by id."""
  84. session = self.ResultSession()
  85. try:
  86. taskset = session.query(TaskSet).filter(
  87. TaskSet.taskset_id == taskset_id).first()
  88. if taskset:
  89. return taskset.to_dict()
  90. finally:
  91. session.close()
  92. def _delete_taskset(self, taskset_id):
  93. """Delete metadata for taskset by id."""
  94. session = self.ResultSession()
  95. try:
  96. session.query(TaskSet).filter(
  97. TaskSet.taskset_id == taskset_id).delete()
  98. session.flush()
  99. session.commit()
  100. finally:
  101. session.close()
  102. def _forget(self, task_id):
  103. """Forget about result."""
  104. session = self.ResultSession()
  105. try:
  106. session.query(Task).filter(Task.task_id == task_id).delete()
  107. session.commit()
  108. finally:
  109. session.close()
  110. def cleanup(self):
  111. """Delete expired metadata."""
  112. session = self.ResultSession()
  113. expires = self.expires
  114. try:
  115. session.query(Task).filter(
  116. Task.date_done < (datetime.utcnow() - expires)).delete()
  117. session.query(TaskSet).filter(
  118. TaskSet.date_done < (datetime.utcnow() - expires)).delete()
  119. session.commit()
  120. finally:
  121. session.close()
  122. def __reduce__(self, args=(), kwargs={}):
  123. kwargs.update(
  124. dict(dburi=self.dburi,
  125. expires=self.expires,
  126. engine_options=self.engine_options))
  127. return super(DatabaseBackend, self).__reduce__(args, kwargs)