Kaynağa Gözat

Try to fix a race condition with PeriodicTasks.
Database entries for periodic tasks are now created at celeryd startup instead
of for each check (which has been a forgotten TODO/XXX in the code for a long
time)

Ask Solem 15 yıl önce
ebeveyn
işleme
022635ff85

+ 4 - 0
celery/backends/database.py

@@ -12,6 +12,10 @@ class Backend(BaseBackend):
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
 
+    def init_periodic_tasks(self):
+        """Create entries for all periodic tasks in the database."""
+        PeriodicTaskMeta.objects.init_entries()
+
     def run_periodic_tasks(self):
         """Run all waiting periodic tasks.
 

+ 34 - 11
celery/managers.py

@@ -77,22 +77,45 @@ class PeriodicTaskManager(models.Manager):
         row = cursor.fetchone()
         return row
 
+    def init_entries(self):
+        """Add entries for all registered periodic tasks.
+
+        Should be run at worker start.
+        """
+        periodic_tasks = tasks.get_all_periodic()
+        for task_name in periodic_tasks.keys():
+            task_meta, created = self.get_or_create(name=task_name)
+
+    def is_time(self, last_run_at, run_every):
+        run_every_drifted = run_every + SERVER_DRIFT
+        run_at = last_run_at + run_every_drifted
+        if datetime.now() > run_at:
+            return True
+        return False
+
     def get_waiting_tasks(self):
         """Get all waiting periodic tasks.
 
         :returns: list of :class:`celery.models.PeriodicTaskMeta` objects.
         """
         periodic_tasks = tasks.get_all_periodic()
+
+        # Find all periodic tasks to be run.
         waiting = []
-        # XXX This will become a lot of queries. Maybe just only create
-        # the rows at init, and then select all later.
-        for task_name, task in periodic_tasks.items():
-            self.lock()
-            task_meta, created = self.get_or_create(name=task_name)
-            # task_run.every must be a timedelta object.
-            run_every_drifted = task.run_every + SERVER_DRIFT
-            run_at = task_meta.last_run_at + run_every_drifted
-            if datetime.now() > run_at:
-                waiting.append(task_meta)
-            self.unlock()
+        for task_meta in self.all():
+            if task_meta.name in periodic_tasks:
+                task = periodic_tasks[task_meta.name]
+                run_every = task.run_every
+                if self.is_time(task_meta.last_run_at, run_every):
+                    # Get the object again to be sure noone else
+                    # has already taken care of it.
+                    self.lock()
+                    try:
+                        secure = self.get(pk=task_meta.pk)
+                        if self.is_time(secure.last_run_at, run_every):
+                            secure.last_run_at = datetime.now()
+                            secure.save()
+                            waiting.append(secure)
+                    finally:
+                        self.unlock()
         return waiting

+ 3 - 1
celery/models.py

@@ -8,6 +8,7 @@ from celery.registry import tasks
 from celery.managers import TaskManager, PeriodicTaskManager
 from celery.fields import PickledObjectField
 from django.utils.translation import ugettext_lazy as _
+from datetime import datetime
 
 TASK_STATUS_PENDING = "PENDING"
 TASK_STATUS_RETRY = "RETRY"
@@ -41,7 +42,8 @@ class PeriodicTaskMeta(models.Model):
     """Information about a Periodic Task."""
     name = models.CharField(_(u"name"), max_length=255, unique=True)
     last_run_at = models.DateTimeField(_(u"last time run"),
-                                       auto_now=True, blank=True)
+                                       blank=True, 
+                                       default=datetime.fromtimestamp(0))
     total_run_count = models.PositiveIntegerField(_(u"total run count"),
                                                   default=0)
 

+ 5 - 5
celery/tests/test_models.py

@@ -19,7 +19,8 @@ class TestModels(unittest.TestCase):
         return taskmeta
 
     def createPeriodicTaskMeta(self, name):
-        ptaskmeta, created = PeriodicTaskMeta.objects.get_or_create(name=name)
+        ptaskmeta, created = PeriodicTaskMeta.objects.get_or_create(name=name,
+                defaults={"last_run_at": datetime.now()})
         return ptaskmeta
 
     def test_taskmeta(self):
@@ -56,10 +57,9 @@ class TestModels(unittest.TestCase):
         # check that repr works.
         self.assertTrue(unicode(p).startswith("<PeriodicTask:"))
         self.assertFalse(p in PeriodicTaskMeta.objects.get_waiting_tasks())
-        # Have to avoid save() because it applies the auto_now=True.
-        PeriodicTaskMeta.objects.filter(name=p.name).update(
-                last_run_at=datetime.now() - (TestPeriodicTask.run_every +
-                timedelta(seconds=10)))
+        p.last_run_at = datetime.now() - (TestPeriodicTask.run_every +
+                timedelta(seconds=10))
+        p.save()
         self.assertTrue(p in PeriodicTaskMeta.objects.get_waiting_tasks())
         self.assertTrue(isinstance(p.task, TestPeriodicTask))
 

+ 3 - 0
celery/worker/controllers.py

@@ -101,6 +101,9 @@ class PeriodicWorkController(InfinityThread):
         self.hold_queue = hold_queue
         self.bucket_queue = bucket_queue
 
+        # Do backend-specific periodic task initialization.
+        default_periodic_status_backend.init_periodic_tasks()
+
     def on_iteration(self):
         logger = get_logger()
         logger.debug("PeriodicWorkController: Running periodic tasks...")