|  | @@ -1,5 +1,6 @@
 | 
	
		
			
				|  |  |  """celery.managers"""
 | 
	
		
			
				|  |  |  from django.db import models
 | 
	
		
			
				|  |  | +from django.db import connection
 | 
	
		
			
				|  |  |  from celery.registry import tasks
 | 
	
		
			
				|  |  |  from datetime import datetime, timedelta
 | 
	
		
			
				|  |  |  import random
 | 
	
	
		
			
				|  | @@ -55,6 +56,22 @@ class TaskManager(models.Manager):
 | 
	
		
			
				|  |  |  class PeriodicTaskManager(models.Manager):
 | 
	
		
			
				|  |  |      """Manager for :class:`celery.models.PeriodicTask` models."""
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    def lock(self):
 | 
	
		
			
				|  |  | +        """Lock the periodic task table for reading."""
 | 
	
		
			
				|  |  | +        cursor = connection.cursor()
 | 
	
		
			
				|  |  | +        table = self.model._meta.db_table
 | 
	
		
			
				|  |  | +        cursor.execute("LOCK TABLES %s READ" % table)
 | 
	
		
			
				|  |  | +        row = cursor.fetchone()
 | 
	
		
			
				|  |  | +        return row
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def unlock(self):
 | 
	
		
			
				|  |  | +        """Unlock the periodic task table."""
 | 
	
		
			
				|  |  | +        cursor = connection.cursor()
 | 
	
		
			
				|  |  | +        table = self.model._meta.db_table
 | 
	
		
			
				|  |  | +        cursor.execute("UNLOCK TABLES")
 | 
	
		
			
				|  |  | +        row = cursor.fetchone()
 | 
	
		
			
				|  |  | +        return row
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      def get_waiting_tasks(self):
 | 
	
		
			
				|  |  |          """Get all waiting periodic tasks.
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -65,10 +82,12 @@ class PeriodicTaskManager(models.Manager):
 | 
	
		
			
				|  |  |          # XXX This will become a lot of queries. Maybe just only create
 | 
	
		
			
				|  |  |          # the rows at init, and then select all later.
 | 
	
		
			
				|  |  |          for task_name, task in periodic_tasks.items():
 | 
	
		
			
				|  |  | +            self.lock()
 | 
	
		
			
				|  |  |              task_meta, created = self.get_or_create(name=task_name)
 | 
	
		
			
				|  |  |              # task_run.every must be a timedelta object.
 | 
	
		
			
				|  |  |              run_every_drifted = task.run_every + SERVER_DRIFT
 | 
	
		
			
				|  |  |              run_at = task_meta.last_run_at + run_every_drifted
 | 
	
		
			
				|  |  |              if datetime.now() > run_at:
 | 
	
		
			
				|  |  |                  waiting.append(task_meta)
 | 
	
		
			
				|  |  | +            self.unlock()
 | 
	
		
			
				|  |  |          return waiting
 |