فهرست منبع

The PeriodicTask worker is now its own thread instead of blocking the TaskController loop.

Ask Solem 17 سال پیش
والد
کامیت
11319d2dfe
1فایلهای تغییر یافته به همراه10 افزوده شده و 12 حذف شده
  1. 10 12
      celery/worker.py

+ 10 - 12
celery/worker.py

@@ -13,6 +13,7 @@ from celery.timer import EventTimer
 from django.core.mail import mail_admins
 from django.core.mail import mail_admins
 import multiprocessing
 import multiprocessing
 import traceback
 import traceback
+import threading
 import logging
 import logging
 import socket
 import socket
 import time
 import time
@@ -242,6 +243,13 @@ class TaskWrapper(object):
                 meta={"task_id": self.task_id, "task_name": self.task_name})
                 meta={"task_id": self.task_id, "task_name": self.task_name})
 
 
 
 
+class PeriodicWorkController(threading.Thread):
+
+    def run(self):
+        while True:
+            default_periodic_status_backend.run_periodic_tasks()
+
+
 class WorkController(object):
 class WorkController(object):
     """Executes tasks waiting in the task queue.
     """Executes tasks waiting in the task queue.
 
 
@@ -376,27 +384,18 @@ class WorkController(object):
         """
         """
         self.process_task(self.receive_message())
         self.process_task(self.receive_message())
 
 
-    def run_periodic_tasks(self):
-        """Schedule all waiting periodic tasks for execution.
-
-        """
-        self.logger.debug("Looking for periodic tasks ready for execution...")
-        default_periodic_status_backend.run_periodic_tasks()
-
     def schedule_retry_tasks(self):
     def schedule_retry_tasks(self):
         """Reschedule all requeued tasks waiting for retry."""
         """Reschedule all requeued tasks waiting for retry."""
         pass
         pass
 
 
+
     def run(self):
     def run(self):
         """Starts the workers main loop."""
         """Starts the workers main loop."""
         log_wait = lambda: self.logger.info("Waiting for queue...")
         log_wait = lambda: self.logger.info("Waiting for queue...")
         ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
         ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
-        events = [
-            EventTimer(self.run_periodic_tasks, 1),
-            EventTimer(self.schedule_retry_tasks, 2),
-        ]
 
 
         self.pool.run()
         self.pool.run()
+        PeriodicWorkController().start()
 
 
         # If not running as daemon, and DEBUG logging level is enabled,
         # If not running as daemon, and DEBUG logging level is enabled,
         # print pool PIDs and sleep for a second before we start.
         # print pool PIDs and sleep for a second before we start.
@@ -407,7 +406,6 @@ class WorkController(object):
                 time.sleep(1)
                 time.sleep(1)
 
 
         while True:
         while True:
-            [event.tick() for event in events]
             try:
             try:
                 self.execute_next_task()
                 self.execute_next_task()
             except ValueError:
             except ValueError: