|
@@ -1,13 +1,56 @@
|
|
|
from django.db import models
|
|
|
from celery.registry import tasks
|
|
|
from celery.managers import TaskManager, PeriodicTaskManager
|
|
|
+from celery.managers import RetryQueueManager
|
|
|
from yadayada.models import PickledObjectField
|
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
-
|
|
|
-
|
|
|
+from Queue import Queue
|
|
|
+
|
|
|
+
|
|
|
+class RetryQueue(object):
|
|
|
+ queue = Queue()
|
|
|
+
|
|
|
+ class Item(object):
|
|
|
+ def __init__(self, task_name, task_id, args, kwargs):
|
|
|
+ self.task_name = task_name
|
|
|
+ self.task_id = task_id
|
|
|
+ self.args = args
|
|
|
+ self.kwargs = kwargs
|
|
|
+ self.retry_count = 0
|
|
|
+
|
|
|
+ def retry(self):
|
|
|
+ self.task.requeue(self.task_id, self.args, self.kwargs)
|
|
|
+ self.retry_count += 1
|
|
|
+ self.last_retry_at = time.time()
|
|
|
+ return self.retry_count
|
|
|
+
|
|
|
+ @property
|
|
|
+ def task(self):
|
|
|
+ return tasks[self.task_name]
|
|
|
+
|
|
|
+ def put(self, task_name, task_id, args, kwargs):
|
|
|
+ self.queue.put(self.Item(task_name, task_id, args, kwargs))
|
|
|
+
|
|
|
+ def get(self):
|
|
|
+ if not self.queue.qsize():
|
|
|
+ return None
|
|
|
+ return self.queue.get()
|
|
|
+retry_queue = RetryQueue()
|
|
|
+
|
|
|
+
|
|
|
+TASK_STATUS_PENDING = "PENDING"
|
|
|
+TASK_STATUS_RETRY = "RETRY"
|
|
|
+TASK_STATUS_FAILURE = "FAILURE"
|
|
|
+TASK_STATUS_DONE = "DONE"
|
|
|
+TASK_STATUSES = (TASK_STATUS_PENDING, TASK_STATUS_RETRY,
|
|
|
+ TASK_STATUS_FAILURE, TASK_STATUS_DONE)
|
|
|
+TASK_STATUSES_CHOICES = zip(TASK_STATUSES, TASK_STATUSES)
|
|
|
+
|
|
|
class TaskMeta(models.Model):
|
|
|
task_id = models.CharField(_(u"task id"), max_length=255, unique=True)
|
|
|
- is_done = models.BooleanField(_(u"is done"), default=False)
|
|
|
+ status = models.CharField(_(u"task status"), max_length=50,
|
|
|
+ default=TASK_STATUS_PENDING, choices=TASK_STATUSES_CHOICES)
|
|
|
+ result = PickledObjectField()
|
|
|
date_done = models.DateTimeField(_(u"done at"), auto_now=True)
|
|
|
|
|
|
objects = TaskManager()
|
|
@@ -20,34 +63,6 @@ class TaskMeta(models.Model):
|
|
|
return u"<Task: %s done:%s>" % (self.task_id, self.is_done)
|
|
|
|
|
|
|
|
|
-class RetryTask(models.Model):
|
|
|
- task_id = models.CharField(_(u"task id"), max_length=255, unique=True)
|
|
|
- task_name = models.CharField_(u"task name"), max_length=255)
|
|
|
- stack = PickledObjectField()
|
|
|
- retry_count = models.PositiveIntegerField(_(u"retry count"), default=0)
|
|
|
- last_retry_at = models.DateTimeField(_(u"last retry at"), auto_now=True,
|
|
|
- blank=True)
|
|
|
-
|
|
|
- objects = RetryQueueManager()
|
|
|
-
|
|
|
- class Meta:
|
|
|
- verbose_name = _(u"retry task")
|
|
|
- verbose_name_plural = _(u"retry tasks")
|
|
|
-
|
|
|
- def __unicode__(self):
|
|
|
- return u"<RetryTask: %s (retries: %d, last at: %s)" % (
|
|
|
- self.task_id, self.retry_count, self.last_retry)
|
|
|
-
|
|
|
- def retry(self):
|
|
|
- task = tasks[self.task_name]
|
|
|
- args = self.stack.get("args")
|
|
|
- kwargs = self.stack.get("kwargs")
|
|
|
- task.retry(self.task_id, args, kwargs)
|
|
|
- self.retry_count += 1
|
|
|
- self.save()
|
|
|
- return self.retry_count
|
|
|
-
|
|
|
-
|
|
|
class PeriodicTaskMeta(models.Model):
|
|
|
name = models.CharField(_(u"name"), max_length=255, unique=True)
|
|
|
last_run_at = models.DateTimeField(_(u"last time run"),
|