Przeglądaj źródła

Beat: Keep the same connection/producer

Ask Solem 14 lat temu
rodzic
commit
1db111890e
1 zmienionych plików z 21 dodań i 12 usunięć
  1. 21 12
      celery/beat.py

+ 21 - 12
celery/beat.py

@@ -132,6 +132,9 @@ class Scheduler(UserDict):
     """
     Entry = ScheduleEntry
 
+    _connection = None
+    _publisher = None
+
     def __init__(self, schedule=None, logger=None, max_interval=None,
             app=None, Publisher=None, lazy=False, **kwargs):
         UserDict.__init__(self)
@@ -162,6 +165,18 @@ class Scheduler(UserDict):
                                                        result.task_id))
         return next_time_to_run
 
+    @property
+    def connection(self):
+        if self._connection is None:
+            self._connection = self.app.broker_connection()
+        return self._connection
+
+    @property
+    def publisher(self):
+        if self._publisher is None:
+            self._publisher = self.Publisher(connection=self.connection)
+        return self._publisher
+
     def tick(self):
         """Run a tick, that is one iteration of the scheduler.
 
@@ -169,19 +184,13 @@ class Scheduler(UserDict):
 
         """
         remaining_times = []
-        connection = self.app.broker_connection()
-        publisher = self.Publisher(connection=connection)
         try:
-            try:
-                for entry in self.schedule.itervalues():
-                    next_time_to_run = self.maybe_due(entry, publisher)
-                    if next_time_to_run:
-                        remaining_times.append(next_time_to_run)
-            except RuntimeError:
-                pass
-        finally:
-            publisher.close()
-            connection.close()
+            for entry in self.schedule.itervalues():
+                next_time_to_run = self.maybe_due(entry, self.publisher)
+                if next_time_to_run:
+                    remaining_times.append(next_time_to_run)
+        except RuntimeError:
+            pass
 
         return min(remaining_times + [self.max_interval])