Browse Source

Less code in the mainloop (refactored)

Ask Solem 16 years ago
parent
commit
e087c6524e
1 changed files with 24 additions and 9 deletions
  1. 24 9
      celery/worker.py

+ 24 - 9
celery/worker.py

@@ -58,6 +58,23 @@ class TaskWrapper(object):
         return pool.apply_async(self.task_func, self.args, task_func_kwargs)
 
 
+class EventTimer(object):
+    """Do something at an interval."""
+
+    def __init__(self, event, interval=None):
+        self.event = event
+        self.interval = interval
+        self.last_triggered = None
+
+    def tick(self):
+        if not self.interval: # never trigger if no interval.
+            return
+        if not self.last_triggered or \
+                time.time() > self.last_triggered + self.interval:
+            self.event()
+            self.last_triggered = time.time()
+
+
 class TaskDaemon(object):
     """Executes tasks waiting in the task queue.
 
@@ -116,13 +133,16 @@ class TaskDaemon(object):
         return waiting_tasks
 
     def run(self):
-        """Run the worker server."""
+        """The worker server's main loop."""
         results = ProcessQueue(self.concurrency, logger=self.logger,
                 done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
-        last_empty_emit = None
+        ev_msg_waiting = EventTimer(lambda: self.logger.info(
+                                            "Waiting for queue..."),
+                                self.empty_msg_emit_every)
+        ev_run_periodic_tasks = EventTimer(self.run_periodic_tasks, 1)
 
         while True:
-            self.run_periodic_tasks()
+            ev_run_periodic_tasks.tick()
             try:
                 result, task_name, task_id = self.execute_next_task()
             except ValueError:
@@ -130,12 +150,7 @@ class TaskDaemon(object):
                 # probably because it got an exception.
                 continue
             except EmptyQueue:
-                emit_every = self.empty_msg_emit_every
-                if emit_every:
-                    if not last_empty_emit or \
-                            time.time() > last_empty_emit + emit_every:
-                        self.logger.info("Waiting for queue.")
-                        last_empty_emit = time.time()
+                ev_msg_waiting.tick()
                 time.sleep(self.queue_wakeup_after)
                 continue
             except UnknownTask, e: