Просмотр исходного кода

Ability to retry task if it fails.

Ask Solem 16 лет назад
Родитель
Сommit
997dcf07dd
4 измененных файлов с 79 добавлено и 2 удалено
  1. 19 0
      celery/managers.py
  2. 10 2
      celery/messaging.py
  3. 29 0
      celery/models.py
  4. 21 0
      celery/task.py

+ 19 - 0
celery/managers.py

@@ -39,3 +39,22 @@ class PeriodicTaskManager(models.Manager):
             if datetime.now() > run_at:
                 waiting.append(task_meta)
         return waiting
+
+class RetryQueueManager(models.Manager):
+
+    def add(self, task_name, task_id, args, kwargs):
+        new_item, c = self.get_or_create(task_id=task_id, defaults={
+                                            "task_name": task_name})
+        new_item.stack = {"args": args, "kwargs": kwargs}
+        new_item.save()
+        return new_item
+
+    def get_waiting_tasks(self):
+        waiting_tasks = self.all().order_by('-last_retry', 'retry_count')
+        to_retry = []
+        for waiting_task in waiting_tasks:
+            task = tasks[waiting_task.task_name]
+            retry_at = waiting_task.last_retry_at + task.retry_interval
+            if datetime.now() > retry_at:
+                to_retry.append(waiting_task)
+        return to_retry

+ 10 - 2
celery/messaging.py

@@ -18,15 +18,23 @@ class TaskPublisher(Publisher):
         return self._delay_task(task_name=task_name, args=task_args,
                                 kwargs=task_kwargs)
 
+
     def delay_task_in_set(self, task_name, taskset_id, task_args,
             task_kwargs):
         return self._delay_task(task_name=task_name, part_of_set=taskset_id,
                                 args=task_args, kwargs=task_kwargs)
+    
+    def requeue_task(self, task_name, task_id, task_args, task_kwargs,
+            part_of_set=None):
+        return self._delay_task(task_name=task_name, part_of_set=part_of_set,
+                                task_id=task_id, args=task_args,
+                                kwargs=task_kwargs)
 
-    def _delay_task(self, task_name, part_of_set=None, args=None, kwargs=None):
+    def _delay_task(self, task_name, task_id=None, part_of_set=None,
+            args=None, kwargs=None):
         args = args or []
         kwargs = kwargs or {}
-        task_id = str(uuid.uuid4())
+        task_id = task_id or str(uuid.uuid4())
         message_data = {
             "id": task_id,
             "task": task_name,

+ 29 - 0
celery/models.py

@@ -1,6 +1,7 @@
 from django.db import models
 from celery.registry import tasks
 from celery.managers import TaskManager, PeriodicTaskManager
+from yadayada.models import PickledObjectField
 from django.utils.translation import ugettext_lazy as _
 
 
@@ -19,6 +20,34 @@ class TaskMeta(models.Model):
         return u"<Task: %s done:%s>" % (self.task_id, self.is_done)
 
 
+class RetryTask(models.Model):
+    task_id = models.CharField(_(u"task id"), max_length=255, unique=True)
+    task_name = models.CharField_(u"task name"), max_length=255)
+    stack = PickledObjectField()
+    retry_count = models.PositiveIntegerField(_(u"retry count"), default=0)
+    last_retry_at = models.DateTimeField(_(u"last retry at"), auto_now=True,
+                                         blank=True)
+
+    objects = RetryQueueManager()
+
+    class Meta:
+        verbose_name = _(u"retry task")
+        verbose_name_plural = _(u"retry tasks")
+
+    def __unicode__(self):
+        return u"<RetryTask: %s (retries: %d, last at: %s)" % (
+                self.task_id, self.retry_count, self.last_retry)
+
+    def retry(self):
+        task = tasks[self.task_name]
+        args = self.stack.get("args")
+        kwargs = self.stack.get("kwargs")
+        task.retry(self.task_id, args, kwargs)
+        self.retry_count += 1
+        self.save()
+        return self.retry_count
+
+
 class PeriodicTaskMeta(models.Model):
     name = models.CharField(_(u"name"), max_length=255, unique=True)
     last_run_at = models.DateTimeField(_(u"last time run"),

+ 21 - 0
celery/task.py

@@ -6,10 +6,15 @@ from celery.messaging import TaskPublisher, TaskConsumer
 from celery.models import TaskMeta
 from django.core.cache import cache
 from datetime import timedelta
+from celery.models import RetryTask
 import uuid
 import traceback
 
 
+class RetryTask(Exception):
+    """The task has failed and is to be appended to the retry queue."""
+
+
 def delay_task(task_name, *args, **kwargs):
     """Delay a task for execution by the ``celery`` daemon.
 
@@ -111,6 +116,10 @@ class Task(object):
     """
     name = None
     type = "regular"
+    max_retries = 0 # unlimited
+    retry_interval = timedelta(seconds=2)
+
+    RetryTask = RetryTask
 
     def __init__(self):
         if not self.name:
@@ -121,10 +130,13 @@ class Task(object):
         the ``run`` method. It also catches any exceptions and logs them."""
         try:
             retval = self.run(*args, **kwargs)
+        except RetryTask, e:
+            self.retry(kwargs["task_id"], args, kwargs)
         except Exception, e:
             logger = self.get_logger(**kwargs)
             logger.critical("Task got exception %s: %s\n%s" % (
                                 e.__class__, e, traceback.format_exc()))
+            self.handle_exception(e, args, kwargs)
             return
         else:
             return retval
@@ -147,6 +159,15 @@ class Task(object):
         """Get a celery task message consumer."""
         return TaskConsumer(connection=DjangoAMQPConnection())
 
+    def requeue(self, task_id, args, kwargs):
+        self.get_publisher().requeue_task(self.name, task_id, args, kwargs)
+
+    def retry(self, task_id, args, kwargs):
+        RetryTask.objects.add(self.name, task_id, args, kwargs)
+
+    def handle_exception(self, exception, retry_args, retry_kwargs):
+        pass
+
     @classmethod
     def delay(cls, *args, **kwargs):
         """Delay this task for execution by the ``celery`` daemon(s)."""