database.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. from datetime import datetime
  2. from celery.backends.base import BaseDictBackend
  3. from celery.db.models import Task, TaskSet
  4. from celery.db.session import ResultSession
  5. from celery.exceptions import ImproperlyConfigured
  6. try:
  7. import sqlalchemy as _
  8. except ImportError:
  9. raise ImproperlyConfigured(
  10. "The database result backend requires SQLAlchemy to be installed."
  11. "See http://pypi.python.org/pypi/SQLAlchemy")
  12. class DatabaseBackend(BaseDictBackend):
  13. """The database result backend."""
  14. def __init__(self, dburi=None, result_expires=None,
  15. engine_options=None, **kwargs):
  16. super(DatabaseBackend, self).__init__(**kwargs)
  17. self.result_expires = result_expires or \
  18. self.app.conf.CELERY_TASK_RESULT_EXPIRES
  19. self.dburi = dburi or self.app.conf.CELERY_RESULT_DBURI
  20. self.engine_options = dict(engine_options or {},
  21. **self.app.conf.CELERY_RESULT_ENGINE_OPTIONS or {})
  22. if not self.dburi:
  23. raise ImproperlyConfigured(
  24. "Missing connection string! Do you have "
  25. "CELERY_RESULT_DBURI set to a real value?")
  26. def ResultSession(self):
  27. return ResultSession(dburi=self.dburi, **self.engine_options)
  28. def _store_result(self, task_id, result, status, traceback=None):
  29. """Store return value and status of an executed task."""
  30. session = self.ResultSession()
  31. try:
  32. task = session.query(Task).filter(Task.task_id == task_id).first()
  33. if not task:
  34. task = Task(task_id)
  35. session.add(task)
  36. session.flush()
  37. task.result = result
  38. task.status = status
  39. task.traceback = traceback
  40. session.commit()
  41. finally:
  42. session.close()
  43. return result
  44. def _get_task_meta_for(self, task_id):
  45. """Get task metadata for a task by id."""
  46. session = self.ResultSession()
  47. try:
  48. task = session.query(Task).filter(Task.task_id == task_id).first()
  49. if not task:
  50. task = Task(task_id)
  51. session.add(task)
  52. session.flush()
  53. session.commit()
  54. return task.to_dict()
  55. finally:
  56. session.close()
  57. def _save_taskset(self, taskset_id, result):
  58. """Store the result of an executed taskset."""
  59. session = self.ResultSession()
  60. try:
  61. taskset = TaskSet(taskset_id, result)
  62. session.add(taskset)
  63. session.flush()
  64. session.commit()
  65. return result
  66. finally:
  67. session.close()
  68. def _restore_taskset(self, taskset_id):
  69. """Get taskset metadata for a taskset by id."""
  70. session = self.ResultSession()
  71. try:
  72. taskset = session.query(TaskSet).filter(
  73. TaskSet.taskset_id == taskset_id).first()
  74. if taskset:
  75. return taskset.to_dict()
  76. finally:
  77. session.close()
  78. def cleanup(self):
  79. """Delete expired metadata."""
  80. session = self.ResultSession()
  81. expires = self.result_expires
  82. try:
  83. session.query(Task).filter(
  84. Task.date_done < (datetime.now() - expires)).delete()
  85. session.query(TaskSet).filter(
  86. TaskSet.date_done < (datetime.now() - expires)).delete()
  87. session.commit()
  88. finally:
  89. session.close()