Prechádzať zdrojové kódy

celery.beat: Reuse the same connection when publishing a large set of tasks.

Mark Stover 15 rokov pred
rodič
commit
c40fbe981b
1 zmenil súbory, kde vykonal 18 pridanie a 14 odobranie
  1. 18 14
      celery/beat.py

+ 18 - 14
celery/beat.py

@@ -104,18 +104,22 @@ class Scheduler(UserDict):
         error = self.logger.error
 
         remaining_times = []
-        for entry in self.schedule.values():
-            is_due, next_time_to_run = self.is_due(entry)
-            if is_due:
-                debug("Scheduler: Sending due task %s" % entry.name)
-                try:
-                    result = self.apply_async(entry)
-                except SchedulingError, exc:
-                    error("Scheduler: %s" % exc)
-                else:
-                    debug("%s sent. id->%s" % (entry.name, result.task_id))
-            if next_time_to_run:
-                remaining_times.append(next_time_to_run)
+        connection = establish_connection()
+        try:
+            for entry in self.schedule.values():
+                is_due, next_time_to_run = self.is_due(entry)
+                if is_due:
+                    debug("Scheduler: Sending due task %s" % entry.name)
+                    try:
+                        result = self.apply_async(entry, connection=connection)
+                    except SchedulingError, exc:
+                        error("Scheduler: %s" % exc)
+                    else:
+                        debug("%s sent. id->%s" % (entry.name, result.task_id))
+                if next_time_to_run:
+                    remaining_times.append(next_time_to_run)
+        finally:
+            connection.close()
 
         return min(remaining_times + [self.max_interval])
 
@@ -125,7 +129,7 @@ class Scheduler(UserDict):
     def is_due(self, entry):
         return entry.is_due(self.get_task(entry.name))
 
-    def apply_async(self, entry):
+    def apply_async(self, entry, **kwargs):
 
         # Update timestamps and run counts before we actually execute,
         # so we have that done if an exception is raised (doesn't schedule
@@ -134,7 +138,7 @@ class Scheduler(UserDict):
         task = self.get_task(entry.name)
 
         try:
-            result = task.apply_async()
+            result = task.apply_async(**kwargs)
         except Exception, exc:
             raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
                     task.name, exc))