Browse Source

Handle shut down a bit better.

When shutting down make sure we kill the PeriodicWorkController thread
otherwise queries will still be sent to the DB. More work needs to be done
handling Crtl-C, but it is acceptable for now.
Brian Rosner 16 years ago
parent
commit
dff651b635
1 changed files with 45 additions and 26 deletions
  1. 45 26
      celery/worker.py

+ 45 - 26
celery/worker.py

@@ -274,12 +274,24 @@ class PeriodicWorkController(threading.Thread):
         >>> PeriodicWorkController().start()
     
     """
-
+    
+    def __init__(self):
+        super(PeriodicWorkController, self).__init__()
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+    
     def run(self):
         """Don't use :meth:`run`. use :meth:`start`."""
         while True:
+            if self._shutdown.isSet():
+                break
             default_periodic_status_backend.run_periodic_tasks()
             time.sleep(1)
+        self._stopped.set() # indicate that we are stopped
+    
+    def stop(self):
+        self._shutdown.set()
+        self._stopped.wait() # block until this thread is done
 
 
 class WorkController(object):
@@ -348,6 +360,7 @@ class WorkController(object):
                                     self.queue_wakeup_after
         self.logger = setup_logger(loglevel, logfile)
         self.pool = TaskPool(self.concurrency, logger=self.logger)
+        self.periodicworkcontroller = PeriodicWorkController()
         self.task_consumer = None
         self.task_consumer_it = None
         self.is_detached = is_detached
@@ -419,7 +432,11 @@ class WorkController(object):
     def schedule_retry_tasks(self):
         """Reschedule all requeued tasks waiting for retry."""
         pass
-
+    
+    def shutdown(self):
+        # shut down the periodic work controller thread
+        self.periodicworkcontroller.stop()
+        self.pool.terminate()
 
     def run(self):
         """Starts the workers main loop."""
@@ -427,7 +444,7 @@ class WorkController(object):
         ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
 
         self.pool.run()
-        PeriodicWorkController().start()
+        self.periodicworkcontroller.start()
 
         # If not running as daemon, and DEBUG logging level is enabled,
         # print pool PIDs and sleep for a second before we start.
@@ -436,26 +453,28 @@ class WorkController(object):
                 "|".join(map(str, self.pool.get_worker_pids()))))
             if not self.is_detached:
                 time.sleep(1)
-
-        while True:
-            try:
-                self.execute_next_task()
-            except ValueError:
-                # execute_next_task didn't return a r/name/id tuple,
-                # probably because it got an exception.
-                continue
-            except EmptyQueue:
-                ev_msg_waiting.tick()
-                time.sleep(self.queue_wakeup_after)
-                continue
-            except UnknownTask, exc:
-                self.logger.info("Unknown task ignored: %s" % (exc))
-                continue
-            except Exception, exc:
-                self.logger.critical("Message queue raised %s: %s\n%s" % (
-                             exc.__class__, exc, traceback.format_exc()))
-                continue
-            except:
-                self.pool.terminate()
-                raise
-
+        
+        try:
+            while True:
+                try:
+                    self.execute_next_task()
+                except ValueError:
+                    # execute_next_task didn't return a r/name/id tuple,
+                    # probably because it got an exception.
+                    continue
+                except EmptyQueue:
+                    ev_msg_waiting.tick()
+                    time.sleep(self.queue_wakeup_after)
+                    continue
+                except UnknownTask, exc:
+                    self.logger.info("Unknown task ignored: %s" % (exc))
+                    continue
+                except Exception, exc:
+                    self.logger.critical("Message queue raised %s: %s\n%s" % (
+                                 exc.__class__, exc, traceback.format_exc()))
+                    continue
+                except:
+                    self.shutdown()
+                    raise
+        except (SystemExit, KeyboardInterrupt):
+            self.shutdown()