managers.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. """celery.managers"""
  2. from django.db import models
  3. from django.db import connection, transaction
  4. from celery.registry import tasks
  5. from celery.conf import TASK_RESULT_EXPIRES
  6. from datetime import datetime, timedelta
  7. from django.conf import settings
  8. import random
# SERVER_DRIFT is a random offset drawn once, at import time, and added to
# every periodic task interval in PeriodicTaskManager.is_time().
# The original author notes that the drift can be negative; timedelta supports
# addition with negative seconds, so a negative draw is harmless.
# NOTE(review): current CPython documents random.vonmisesvariate() as
# returning an angle between 0 and 2*pi -- confirm whether a negative drift
# is still possible on the Python version this runs on.
SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
  12. class TableLock(object):
  13. """Base class for database table locks. Also works as a NOOP lock."""
  14. def __init__(self, table, type="read"):
  15. self.table = table
  16. self.type = type
  17. self.cursor = None
  18. def lock_table(self):
  19. """Lock the table."""
  20. pass
  21. def unlock_table(self):
  22. """Release previously locked tables."""
  23. pass
  24. @classmethod
  25. def acquire(cls, table, type=None):
  26. """Acquire table lock."""
  27. lock = cls(table, type)
  28. lock.lock_table()
  29. return lock
  30. def release(self):
  31. """Release the lock."""
  32. self.unlock_table()
  33. if self.cursor:
  34. self.cursor.close()
  35. self.cursor = None
  36. class MySQLTableLock(TableLock):
  37. """Table lock support for MySQL."""
  38. def lock_table(self):
  39. """Lock MySQL table."""
  40. self.cursor = connection.cursor()
  41. self.cursor.execute("LOCK TABLES %s %s" % (
  42. self.table, self.type.upper()))
  43. def unlock_table(self):
  44. """Unlock MySQL table."""
  45. self.cursor.execute("UNLOCK TABLES")
# Maps a database engine name to the table lock class used for that engine.
TABLE_LOCK_FOR_ENGINE = {"mysql": MySQLTableLock}
# Lock class for the engine named by settings.DATABASE_ENGINE. Engines
# without an entry above fall back to the NOOP ``TableLock`` base class.
table_lock = TABLE_LOCK_FOR_ENGINE.get(settings.DATABASE_ENGINE, TableLock)
  48. class TaskManager(models.Manager):
  49. """Manager for :class:`celery.models.Task` models."""
  50. def get_task(self, task_id):
  51. """Get task meta for task by ``task_id``."""
  52. task, created = self.get_or_create(task_id=task_id)
  53. return task
  54. def is_done(self, task_id):
  55. """Returns ``True`` if the task was executed successfully."""
  56. return self.get_task(task_id).status == "DONE"
  57. def get_all_expired(self):
  58. """Get all expired task results."""
  59. return self.filter(date_done__lt=datetime.now() - TASK_RESULT_EXPIRES)
  60. def delete_expired(self):
  61. """Delete all expired task results."""
  62. self.get_all_expired().delete()
  63. def store_result(self, task_id, result, status, traceback=None, exception_retry=True):
  64. """Store the result and status of a task.
  65. :param task_id: task id
  66. :param result: The return value of the task, or an exception
  67. instance raised by the task.
  68. :param status: Task status. See
  69. :meth:`celery.result.AsyncResult.get_status` for a list of
  70. possible status values.
  71. :keyword traceback: The traceback at the point of exception (if the
  72. task failed).
  73. :keyword exception_retry: If we should retry storing by rollbacking
  74. transaction on exception
  75. """
  76. try:
  77. task, created = self.get_or_create(task_id=task_id, defaults={
  78. "status": status,
  79. "result": result,
  80. "traceback": traceback})
  81. if not created:
  82. task.status = status
  83. task.result = result
  84. task.traceback = traceback
  85. task.save()
  86. except:
  87. # depending on the database backend we can get various exceptions.
  88. # for excample, psycopg2 raises an exception if some operation
  89. # breaks transaction, and saving task result won't be possible
  90. # until we rollback transaction
  91. if exception_retry:
  92. transaction.rollback_unless_managed()
  93. self.store_result(task_id, result, status, traceback, False)
  94. else:
  95. raise
  96. class PeriodicTaskManager(models.Manager):
  97. """Manager for :class:`celery.models.PeriodicTask` models."""
  98. def init_entries(self):
  99. """Add entries for all registered periodic tasks.
  100. Should be run at worker start.
  101. """
  102. periodic_tasks = tasks.get_all_periodic()
  103. for task_name in periodic_tasks.keys():
  104. task_meta, created = self.get_or_create(name=task_name)
  105. def is_time(self, last_run_at, run_every):
  106. """Check if if it is time to run the periodic task.
  107. :param last_run_at: Last time the periodic task was run.
  108. :param run_every: How often to run the periodic task.
  109. :rtype bool:
  110. """
  111. run_every_drifted = run_every + SERVER_DRIFT
  112. run_at = last_run_at + run_every_drifted
  113. if datetime.now() > run_at:
  114. return True
  115. return False
  116. def get_waiting_tasks(self):
  117. """Get all waiting periodic tasks.
  118. :returns: list of :class:`celery.models.PeriodicTaskMeta` objects.
  119. """
  120. periodic_tasks = tasks.get_all_periodic()
  121. db_table = self.model._meta.db_table
  122. # Find all periodic tasks to be run.
  123. waiting = []
  124. for task_meta in self.all():
  125. if task_meta.name in periodic_tasks:
  126. task = periodic_tasks[task_meta.name]
  127. run_every = task.run_every
  128. if self.is_time(task_meta.last_run_at, run_every):
  129. # Get the object again to be sure noone else
  130. # has already taken care of it.
  131. lock = table_lock.acquire(db_table, "write")
  132. try:
  133. secure = self.get(pk=task_meta.pk)
  134. if self.is_time(secure.last_run_at, run_every):
  135. secure.last_run_at = datetime.now()
  136. secure.save()
  137. waiting.append(secure)
  138. finally:
  139. lock.release()
  140. return waiting