database.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from datetime import datetime
  2. from celery import conf
  3. from celery.backends.base import BaseDictBackend
  4. from celery.db.models import Task, TaskSet
  5. from celery.db.session import ResultSession
  6. from celery.exceptions import ImproperlyConfigured
  class DatabaseBackend(BaseDictBackend):
      """The database result backend.

      Persists task and taskset results through the ``Task`` and ``TaskSet``
      models, opening a fresh session (see :meth:`ResultSession`) for every
      operation and closing it when the operation finishes.
      """

      def __init__(self, dburi=None, result_expires=None,
                   engine_options=None, **kwargs):
          """Configure the backend.

          :param dburi: Database connection string; falls back to
              ``conf.RESULT_DBURI`` when not given.
          :param result_expires: Age after which :meth:`cleanup` deletes
              stored results; falls back to ``conf.TASK_RESULT_EXPIRES``.
          :param engine_options: Mapping of extra keyword arguments passed
              on to ``ResultSession``.
          :param kwargs: Forwarded unchanged to the base backend.

          :raises ImproperlyConfigured: If no connection string is available
              from either the argument or the configuration.
          """
          self.result_expires = result_expires or conf.TASK_RESULT_EXPIRES
          self.dburi = dburi or conf.RESULT_DBURI
          # dict(mapping, **extra): keys present in the configuration take
          # precedence over the same keys in the ``engine_options`` argument.
          self.engine_options = dict(engine_options or {},
                                     **conf.RESULT_ENGINE_OPTIONS or {})
          if not self.dburi:
              raise ImproperlyConfigured(
                  "Missing connection string! Do you have "
                  "CELERY_RESULT_DBURI set to a real value?")
          super(DatabaseBackend, self).__init__(**kwargs)

      def ResultSession(self):
          """Return a new session bound to this backend's database."""
          # Inside a method body the class namespace is not searched, so
          # ``ResultSession`` here is the module-level import, not this method.
          return ResultSession(dburi=self.dburi, **self.engine_options)

      def _store_result(self, task_id, result, status, traceback=None):
          """Store return value and status of an executed task.

          Updates the existing row for ``task_id`` or creates one if there
          is none yet.  Returns ``result`` unchanged.
          """
          session = self.ResultSession()
          try:
              task = session.query(Task).filter(Task.task_id == task_id).first()
              if task is None:
                  task = Task(task_id)
                  session.add(task)
                  # NOTE(review): the new row is flushed before its fields
                  # are assigned; kept from the original -- presumably so the
                  # row exists before it is updated.  Confirm before removing.
                  session.flush()
              task.result = result
              task.status = status
              task.traceback = traceback
              session.commit()
          finally:
              session.close()
          return result

      def _get_task_meta_for(self, task_id):
          """Get task metadata for a task by id.

          If no row exists for ``task_id`` a new ``Task`` row is created and
          committed first.  Returns whatever ``Task.to_dict()`` produces.
          """
          session = self.ResultSession()
          try:
              task = session.query(Task).filter(Task.task_id == task_id).first()
              if task is None:
                  task = Task(task_id)
                  session.add(task)
                  # commit() flushes pending changes itself, so no separate
                  # flush() is needed here.
                  session.commit()
              return task.to_dict()
          finally:
              session.close()

      def _save_taskset(self, taskset_id, result):
          """Store the result of an executed taskset.

          Always adds a new ``TaskSet`` row.  Returns ``result`` unchanged.
          """
          session = self.ResultSession()
          try:
              session.add(TaskSet(taskset_id, result))
              # commit() flushes pending changes itself.
              session.commit()
              return result
          finally:
              session.close()

      def _restore_taskset(self, taskset_id):
          """Get taskset metadata for a taskset by id.

          Returns ``TaskSet.to_dict()`` for the first matching row, or
          ``None`` when no taskset with that id has been saved.
          """
          session = self.ResultSession()
          try:
              taskset = session.query(TaskSet).filter(
                  TaskSet.taskset_id == taskset_id).first()
              if taskset is None:
                  return None
              return taskset.to_dict()
          finally:
              session.close()

      def cleanup(self):
          """Delete expired metadata.

          Removes every ``Task`` and ``TaskSet`` row whose ``date_done`` is
          older than ``result_expires``.  ``result_expires`` may be a
          ``timedelta`` or a plain number, which is taken as seconds.
          """
          from datetime import timedelta

          expires = self.result_expires
          if isinstance(expires, (int, float)):
              # ``datetime - number`` raises TypeError; treat bare numbers
              # as a duration in seconds.
              expires = timedelta(seconds=expires)
          # Compute the cutoff once so both tables are purged against the
          # same instant.
          cutoff = datetime.now() - expires

          session = self.ResultSession()
          try:
              session.query(Task).filter(Task.date_done < cutoff).delete()
              session.query(TaskSet).filter(TaskSet.date_done < cutoff).delete()
              session.commit()
          finally:
              session.close()