models.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. from django.db import models
  2. from celery.registry import tasks
  3. from celery.managers import TaskManager, PeriodicTaskManager
  4. from yadayada.models import PickledObjectField
  5. from django.utils.translation import ugettext_lazy as _
  6. from Queue import Queue
  7. class RetryQueue(object):
  8. queue = Queue()
  9. class Item(object):
  10. def __init__(self, task_name, task_id, args, kwargs):
  11. self.task_name = task_name
  12. self.task_id = task_id
  13. self.args = args
  14. self.kwargs = kwargs
  15. self.retry_count = 0
  16. def retry(self):
  17. self.task.requeue(self.task_id, self.args, self.kwargs)
  18. self.retry_count += 1
  19. self.last_retry_at = time.time()
  20. return self.retry_count
  21. @property
  22. def task(self):
  23. return tasks[self.task_name]
  24. def put(self, task_name, task_id, args, kwargs):
  25. self.queue.put(self.Item(task_name, task_id, args, kwargs))
  26. def get(self):
  27. if not self.queue.qsize():
  28. return None
  29. return self.queue.get()
  30. retry_queue = RetryQueue()
  31. TASK_STATUS_PENDING = "PENDING"
  32. TASK_STATUS_RETRY = "RETRY"
  33. TASK_STATUS_FAILURE = "FAILURE"
  34. TASK_STATUS_DONE = "DONE"
  35. TASK_STATUSES = (TASK_STATUS_PENDING, TASK_STATUS_RETRY,
  36. TASK_STATUS_FAILURE, TASK_STATUS_DONE)
  37. TASK_STATUSES_CHOICES = zip(TASK_STATUSES, TASK_STATUSES)
  38. class TaskMeta(models.Model):
  39. task_id = models.CharField(_(u"task id"), max_length=255, unique=True)
  40. status = models.CharField(_(u"task status"), max_length=50,
  41. default=TASK_STATUS_PENDING, choices=TASK_STATUSES_CHOICES)
  42. result = PickledObjectField()
  43. date_done = models.DateTimeField(_(u"done at"), auto_now=True)
  44. objects = TaskManager()
  45. class Meta:
  46. verbose_name = _(u"task meta")
  47. verbose_name_plural = _(u"task meta")
  48. def __unicode__(self):
  49. return u"<Task: %s done:%s>" % (self.task_id, self.status)
  50. class PeriodicTaskMeta(models.Model):
  51. name = models.CharField(_(u"name"), max_length=255, unique=True)
  52. last_run_at = models.DateTimeField(_(u"last time run"),
  53. auto_now=True, blank=True)
  54. total_run_count = models.PositiveIntegerField(_(u"total run count"),
  55. default=0)
  56. objects = PeriodicTaskManager()
  57. class Meta:
  58. verbose_name = _(u"periodic task")
  59. verbose_name_plural = _(u"periodic tasks")
  60. def __unicode__(self):
  61. return u"<PeriodicTask: %s [last-run:%s, total-run:%d]>" % (
  62. self.name, self.last_run_at, self.total_run_count)
  63. def delay(self, *args, **kwargs):
  64. self.task.delay()
  65. self.total_run_count = self.total_run_count + 1
  66. self.save()
  67. @property
  68. def task(self):
  69. return tasks[self.name]