managers.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. """celery.managers"""
  2. from django.db import models
  3. from django.db import connection
  4. from celery.registry import tasks
  5. from datetime import datetime, timedelta
  6. from django.conf import settings
  7. import random
# Random offset, drawn once at import time, that
# PeriodicTaskManager.get_waiting_tasks adds to every periodic task's
# ``run_every`` interval.
# NOTE(review): presumably meant to keep several servers from firing the
# same periodic task in lockstep -- confirm.
# The original note says the drift can be negative; that is fine, because
# timedelta supports addition with negative seconds.
SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
  11. class TaskManager(models.Manager):
  12. """Manager for :class:`celery.models.Task` models."""
  13. def get_task(self, task_id):
  14. """Get task meta for task by ``task_id``."""
  15. task, created = self.get_or_create(task_id=task_id)
  16. return task
  17. def is_done(self, task_id):
  18. """Returns ``True`` if the task was executed successfully."""
  19. return self.get_task(task_id).status == "DONE"
  20. def get_all_expired(self):
  21. """Get all expired task results."""
  22. # TODO Make the timedelta configurable
  23. return self.filter(date_done__lt=datetime.now() - timedelta(days=5))
  24. def delete_expired(self):
  25. """Delete all expired task results."""
  26. self.get_all_expired().delete()
  27. def store_result(self, task_id, result, status):
  28. """Store the result and status of a task.
  29. :param task_id: task id
  30. :param result: The return value of the task, or an exception
  31. instance raised by the task.
  32. :param status: Task status. See
  33. :meth:`celery.result.AsyncResult.get_status` for a list of
  34. possible status values.
  35. """
  36. task, created = self.get_or_create(task_id=task_id, defaults={
  37. "status": status,
  38. "result": result})
  39. if not created:
  40. task.status = status
  41. task.result = result
  42. task.save()
  43. class PeriodicTaskManager(models.Manager):
  44. """Manager for :class:`celery.models.PeriodicTask` models."""
  45. def lock(self):
  46. """Lock the periodic task table for reading."""
  47. if settings.DATABASE_ENGINE != "mysql":
  48. return
  49. cursor = connection.cursor()
  50. table = self.model._meta.db_table
  51. cursor.execute("LOCK TABLES %s READ" % table)
  52. row = cursor.fetchone()
  53. return row
  54. def unlock(self):
  55. """Unlock the periodic task table."""
  56. if settings.DATABASE_ENGINE != "mysql":
  57. return
  58. cursor = connection.cursor()
  59. table = self.model._meta.db_table
  60. cursor.execute("UNLOCK TABLES")
  61. row = cursor.fetchone()
  62. return row
  63. def get_waiting_tasks(self):
  64. """Get all waiting periodic tasks.
  65. :returns: list of :class:`celery.models.PeriodicTaskMeta` objects.
  66. """
  67. periodic_tasks = tasks.get_all_periodic()
  68. waiting = []
  69. # XXX This will become a lot of queries. Maybe just only create
  70. # the rows at init, and then select all later.
  71. for task_name, task in periodic_tasks.items():
  72. self.lock()
  73. task_meta, created = self.get_or_create(name=task_name)
  74. # task_run.every must be a timedelta object.
  75. run_every_drifted = task.run_every + SERVER_DRIFT
  76. run_at = task_meta.last_run_at + run_every_drifted
  77. if datetime.now() > run_at:
  78. waiting.append(task_meta)
  79. self.unlock()
  80. return waiting