managers.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. """celery.managers"""
  2. from django.db import models
  3. from django.db import connection
  4. from celery.registry import tasks
  5. from celery.conf import TASK_RESULT_EXPIRES
  6. from datetime import datetime, timedelta
  7. from django.conf import settings
  8. import random
  9. # server_drift can be negative, but timedelta supports addition on
  10. # negative seconds.
  11. SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
  12. class TableLock(object):
  13. """Base class for database table locks. Also works as a NOOP lock."""
  14. def __init__(self, table, type="read"):
  15. self.table = table
  16. self.type = type
  17. self.cursor = None
  18. def lock_table(self):
  19. """Lock the table."""
  20. pass
  21. def unlock_table(self):
  22. """Release previously locked tables."""
  23. pass
  24. @classmethod
  25. def acquire(cls, table, type=None):
  26. """Acquire table lock."""
  27. lock = cls(table, type)
  28. lock.lock_table()
  29. return lock
  30. def release(self):
  31. """Release the lock."""
  32. self.unlock_table()
  33. if self.cursor:
  34. self.cursor.close()
  35. self.cursor = None
  36. class MySQLTableLock(TableLock):
  37. """Table lock support for MySQL."""
  38. def lock_table(self):
  39. """Lock MySQL table."""
  40. self.cursor = connection.cursor()
  41. self.cursor.execute("LOCK TABLES %s %s" % (
  42. self.table, self.type.upper()))
  43. def unlock_table(self):
  44. """Unlock MySQL table."""
  45. self.cursor.execute("UNLOCK TABLES")
  46. TABLE_LOCK_FOR_ENGINE = {"mysql": MySQLTableLock}
  47. table_lock = TABLE_LOCK_FOR_ENGINE.get(settings.DATABASE_ENGINE, TableLock)
  48. class TaskManager(models.Manager):
  49. """Manager for :class:`celery.models.Task` models."""
  50. def get_task(self, task_id):
  51. """Get task meta for task by ``task_id``."""
  52. task, created = self.get_or_create(task_id=task_id)
  53. return task
  54. def is_done(self, task_id):
  55. """Returns ``True`` if the task was executed successfully."""
  56. return self.get_task(task_id).status == "DONE"
  57. def get_all_expired(self):
  58. """Get all expired task results."""
  59. return self.filter(date_done__lt=datetime.now() - TASK_RESULT_EXPIRES)
  60. def delete_expired(self):
  61. """Delete all expired task results."""
  62. self.get_all_expired().delete()
  63. def store_result(self, task_id, result, status):
  64. """Store the result and status of a task.
  65. :param task_id: task id
  66. :param result: The return value of the task, or an exception
  67. instance raised by the task.
  68. :param status: Task status. See
  69. :meth:`celery.result.AsyncResult.get_status` for a list of
  70. possible status values.
  71. """
  72. task, created = self.get_or_create(task_id=task_id, defaults={
  73. "status": status,
  74. "result": result})
  75. if not created:
  76. task.status = status
  77. task.result = result
  78. task.save()
  79. class PeriodicTaskManager(models.Manager):
  80. """Manager for :class:`celery.models.PeriodicTask` models."""
  81. def init_entries(self):
  82. """Add entries for all registered periodic tasks.
  83. Should be run at worker start.
  84. """
  85. periodic_tasks = tasks.get_all_periodic()
  86. for task_name in periodic_tasks.keys():
  87. task_meta, created = self.get_or_create(name=task_name)
  88. def is_time(self, last_run_at, run_every):
  89. """Check if if it is time to run the periodic task.
  90. :param last_run_at: Last time the periodic task was run.
  91. :param run_every: How often to run the periodic task.
  92. :rtype bool:
  93. """
  94. run_every_drifted = run_every + SERVER_DRIFT
  95. run_at = last_run_at + run_every_drifted
  96. if datetime.now() > run_at:
  97. return True
  98. return False
  99. def get_waiting_tasks(self):
  100. """Get all waiting periodic tasks.
  101. :returns: list of :class:`celery.models.PeriodicTaskMeta` objects.
  102. """
  103. periodic_tasks = tasks.get_all_periodic()
  104. db_table = self.model._meta.db_table
  105. # Find all periodic tasks to be run.
  106. waiting = []
  107. for task_meta in self.all():
  108. if task_meta.name in periodic_tasks:
  109. task = periodic_tasks[task_meta.name]
  110. run_every = task.run_every
  111. if self.is_time(task_meta.last_run_at, run_every):
  112. # Get the object again to be sure noone else
  113. # has already taken care of it.
  114. lock = table_lock.acquire(db_table, "write")
  115. try:
  116. secure = self.get(pk=task_meta.pk)
  117. if self.is_time(secure.last_run_at, run_every):
  118. secure.last_run_at = datetime.now()
  119. secure.save()
  120. waiting.append(secure)
  121. finally:
  122. lock.release()
  123. return waiting