Ver código fonte

Beat: Use task.apply_async instead of send_task if task registered, also reuse publisher

Ask Solem 14 anos atrás
pai
commit
135c981760
1 arquivos alterados com 21 adições e 7 exclusões
  1. 21 7
      celery/beat.py

+ 21 - 7
celery/beat.py

@@ -15,7 +15,7 @@ from celery import conf
 from celery import platforms
 from celery.execute import send_task
 from celery.schedules import maybe_schedule
-from celery.messaging import establish_connection
+from celery.messaging import establish_connection, TaskPublisher
 from celery.utils import instantiate
 from celery.utils.info import humanize_seconds
 
@@ -130,6 +130,7 @@ class Scheduler(UserDict):
 
     """
     Entry = ScheduleEntry
+    Publisher = TaskPublisher
 
     def __init__(self, schedule=None, logger=None, max_interval=None,
             **kwargs):
@@ -141,13 +142,13 @@ class Scheduler(UserDict):
         self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
         self.setup_schedule()
 
-    def maybe_due(self, entry, connection=None):
+    def maybe_due(self, entry, publisher=None):
         is_due, next_time_to_run = entry.is_due()
 
         if is_due:
             self.logger.debug("Scheduler: Sending due task %s" % entry.task)
             try:
-                result = self.apply_async(entry, connection=connection)
+                result = self.apply_async(entry, publisher=publisher)
             except SchedulingError, exc:
                 self.logger.error("Scheduler: %s" % exc)
             else:
@@ -163,15 +164,17 @@ class Scheduler(UserDict):
         """
         remaining_times = []
         connection = establish_connection()
+        publisher = self.Publisher(connection=connection)
         try:
             try:
                 for entry in self.schedule.itervalues():
-                    next_time_to_run = self.maybe_due(entry, connection)
+                    next_time_to_run = self.maybe_due(entry, publisher)
                     if next_time_to_run:
                         remaining_times.append(next_time_to_run)
             except RuntimeError:
                 pass
         finally:
+            publisher.close()
             connection.close()
 
         return min(remaining_times + [self.max_interval])
@@ -180,15 +183,26 @@ class Scheduler(UserDict):
         new_entry = self.schedule[entry.name] = entry.next()
         return new_entry
 
-    def apply_async(self, entry, connection=None, **kwargs):
+    def apply_async(self, entry, publisher=None, **kwargs):
         # Update timestamps and run counts before we actually execute,
         # so we have that done if an exception is raised (doesn't schedule
         # forever.)
         entry = self.reserve(entry)
 
         try:
-            result = self.send_task(entry.task, entry.args, entry.kwargs,
-                                    connection=connection, **entry.options)
+            task = registry.tasks[entry.task]
+        except KeyError:
+            task = None
+
+        try:
+            if task:
+                result = task.apply_async(entry.args, entry.kwargs,
+                                          publisher=publisher,
+                                          **entry.options)
+            else:
+                result = self.send_task(entry.task, entry.args, entry.kwargs,
+                                        publisher=publisher,
+                                        **entry.options)
         except Exception, exc:
             raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
                     entry.name, exc))