models.py 1.1 KB

123456789101112131415161718192021222324252627282930313233
  1. from django.db import models
  2. from celery.registry import tasks
  3. from celery.managers import PeriodicTaskManager
  4. from django.utils.translation import ugettext_lazy as _
  5. __all__ = ["PeriodicTaskMeta"]
  6. class PeriodicTaskMeta(models.Model):
  7. name = models.CharField(_(u"name"), max_length=255, unique=True)
  8. last_run_at = models.DateTimeField(_(u"last time run"),
  9. auto_now=True, blank=True)
  10. total_run_count = models.PositiveIntegerField(_(u"total run count"),
  11. default=0)
  12. objects = PeriodicTaskManager()
  13. class Meta:
  14. verbose_name = _(u"periodic task")
  15. verbose_name_plural = _(u"periodic tasks")
  16. def __unicode__(self):
  17. return u"<PeriodicTask: %s [last-run:%s, total-run:%d]>" % (
  18. self.name, self.last_run_at, self.total_run_count)
  19. def delay(self, **kwargs):
  20. self.task.delay()
  21. self.total_run_count = self.total_run_count + 1
  22. self.save()
  23. @property
  24. def task(self):
  25. return tasks[self.name]