# test_models.py
import unittest
from datetime import datetime, timedelta

from celery.models import TaskMeta, TaskSetMeta, PeriodicTaskMeta
from celery.task import PeriodicTask
from celery.registry import tasks
from celery.utils import gen_unique_id


  7. class TestPeriodicTask(PeriodicTask):
  8. name = "celery.unittest.test_models.test_periodic_task"
  9. run_every = timedelta(minutes=30)
  10. class TestModels(unittest.TestCase):
  11. def createTaskMeta(self):
  12. id = gen_unique_id()
  13. taskmeta, created = TaskMeta.objects.get_or_create(task_id=id)
  14. return taskmeta
  15. def createTaskSetMeta(self):
  16. id = gen_unique_id()
  17. tasksetmeta, created = TaskSetMeta.objects.get_or_create(taskset_id=id)
  18. return tasksetmeta
  19. def createPeriodicTaskMeta(self, name):
  20. ptaskmeta, created = PeriodicTaskMeta.objects.get_or_create(name=name,
  21. defaults={"last_run_at": datetime.now()})
  22. return ptaskmeta
  23. def test_taskmeta(self):
  24. m1 = self.createTaskMeta()
  25. m2 = self.createTaskMeta()
  26. m3 = self.createTaskMeta()
  27. self.assertTrue(unicode(m1).startswith("<Task:"))
  28. self.assertTrue(m1.task_id)
  29. self.assertTrue(isinstance(m1.date_done, datetime))
  30. self.assertEquals(TaskMeta.objects.get_task(m1.task_id).task_id,
  31. m1.task_id)
  32. self.assertFalse(TaskMeta.objects.is_done(m1.task_id))
  33. TaskMeta.objects.store_result(m1.task_id, True, status="DONE")
  34. TaskMeta.objects.store_result(m2.task_id, True, status="DONE")
  35. self.assertTrue(TaskMeta.objects.is_done(m1.task_id))
  36. self.assertTrue(TaskMeta.objects.is_done(m2.task_id))
  37. # Have to avoid save() because it applies the auto_now=True.
  38. TaskMeta.objects.filter(task_id=m1.task_id).update(
  39. date_done=datetime.now() - timedelta(days=10))
  40. expired = TaskMeta.objects.get_all_expired()
  41. self.assertTrue(m1 in expired)
  42. self.assertFalse(m2 in expired)
  43. self.assertFalse(m3 in expired)
  44. TaskMeta.objects.delete_expired()
  45. self.assertFalse(m1 in TaskMeta.objects.all())
  46. def test_tasksetmeta(self):
  47. m1 = self.createTaskSetMeta()
  48. m2 = self.createTaskSetMeta()
  49. m3 = self.createTaskSetMeta()
  50. self.assertTrue(unicode(m1).startswith("<TaskSet:"))
  51. self.assertTrue(m1.taskset_id)
  52. self.assertTrue(isinstance(m1.date_done, datetime))
  53. self.assertEquals(TaskSetMeta.objects.get_taskset(m1.taskset_id).taskset_id,
  54. m1.taskset_id)
  55. # Have to avoid save() because it applies the auto_now=True.
  56. TaskSetMeta.objects.filter(taskset_id=m1.taskset_id).update(
  57. date_done=datetime.now() - timedelta(days=10))
  58. expired = TaskSetMeta.objects.get_all_expired()
  59. self.assertTrue(m1 in expired)
  60. self.assertFalse(m2 in expired)
  61. self.assertFalse(m3 in expired)
  62. TaskSetMeta.objects.delete_expired()
  63. self.assertFalse(m1 in TaskSetMeta.objects.all())
  64. def test_periodic_taskmeta(self):
  65. tasks.register(TestPeriodicTask)
  66. p = self.createPeriodicTaskMeta(TestPeriodicTask.name)
  67. # check that repr works.
  68. self.assertTrue(unicode(p).startswith("<PeriodicTask:"))
  69. self.assertFalse(p in PeriodicTaskMeta.objects.get_waiting_tasks())
  70. p.last_run_at = datetime.now() - (TestPeriodicTask.run_every +
  71. timedelta(seconds=10))
  72. p.save()
  73. self.assertTrue(p in PeriodicTaskMeta.objects.get_waiting_tasks())
  74. self.assertTrue(isinstance(p.task, TestPeriodicTask))
  75. p.delay()