Browse Source

Threads must call os._exit to terminate the server on crash

Ask Solem 14 years ago
parent
commit
ac0baa5542

+ 19 - 4
celery/concurrency/processes/pool.py

@@ -13,6 +13,7 @@ __all__ = ['Pool']
 #
 #
 
 
 import os
 import os
+import sys
 import errno
 import errno
 import threading
 import threading
 import Queue
 import Queue
@@ -22,6 +23,7 @@ import time
 import signal
 import signal
 
 
 from multiprocessing import Process, cpu_count, TimeoutError
 from multiprocessing import Process, cpu_count, TimeoutError
+from multiprocessing import util
 from multiprocessing.util import Finalize, debug
 from multiprocessing.util import Finalize, debug
 
 
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
@@ -58,6 +60,11 @@ def mapstar(args):
     return map(*args)
     return map(*args)
 
 
 
 
+def error(msg, *args, **kwargs):
+    if util._logger:
+        util._logger.error(msg, *args, **kwargs)
+
+
 class LaxBoundedSemaphore(threading._Semaphore):
 class LaxBoundedSemaphore(threading._Semaphore):
     """Semaphore that checks that # release is <= # acquires,
     """Semaphore that checks that # release is <= # acquires,
     but ignores if # releases >= value."""
     but ignores if # releases >= value."""
@@ -168,6 +175,14 @@ class PoolThread(threading.Thread):
         self._state = RUN
         self._state = RUN
         self.daemon = True
         self.daemon = True
 
 
+    def run(self):
+        try:
+            return self.body()
+        except Exception, exc:
+            error("Thread %r crashed: %r" % (self.__class__.__name__, exc, ),
+                  exc_info=sys.exc_info())
+            os._exit(1)
+
     def terminate(self):
     def terminate(self):
         self._state = TERMINATE
         self._state = TERMINATE
 
 
@@ -181,7 +196,7 @@ class Supervisor(PoolThread):
         self.pool = pool
         self.pool = pool
         super(Supervisor, self).__init__()
         super(Supervisor, self).__init__()
 
 
-    def run(self):
+    def body(self):
         debug('worker handler starting')
         debug('worker handler starting')
         while self._state == RUN and self.pool._state == RUN:
         while self._state == RUN and self.pool._state == RUN:
             self.pool._maintain_pool()
             self.pool._maintain_pool()
@@ -198,7 +213,7 @@ class TaskHandler(PoolThread):
         self.pool = pool
         self.pool = pool
         super(TaskHandler, self).__init__()
         super(TaskHandler, self).__init__()
 
 
-    def run(self):
+    def body(self):
         taskqueue = self.taskqueue
         taskqueue = self.taskqueue
         outqueue = self.outqueue
         outqueue = self.outqueue
         put = self.put
         put = self.put
@@ -249,7 +264,7 @@ class TimeoutHandler(PoolThread):
         self.putlock = putlock
         self.putlock = putlock
         super(TimeoutHandler, self).__init__()
         super(TimeoutHandler, self).__init__()
 
 
-    def run(self):
+    def body(self):
         processes = self.processes
         processes = self.processes
         cache = self.cache
         cache = self.cache
         putlock = self.putlock
         putlock = self.putlock
@@ -338,7 +353,7 @@ class ResultHandler(PoolThread):
         self.putlock = putlock
         self.putlock = putlock
         super(ResultHandler, self).__init__()
         super(ResultHandler, self).__init__()
 
 
-    def run(self):
+    def body(self):
         get = self.get
         get = self.get
         outqueue = self.outqueue
         outqueue = self.outqueue
         cache = self.cache
         cache = self.cache

+ 24 - 16
celery/utils/timer2.py

@@ -4,6 +4,8 @@ from __future__ import generators
 
 
 import atexit
 import atexit
 import heapq
 import heapq
+import logging
+import os
 import sys
 import sys
 import traceback
 import traceback
 import warnings
 import warnings
@@ -146,6 +148,7 @@ class Timer(Thread):
         self._shutdown = Event()
         self._shutdown = Event()
         self._stopped = Event()
         self._stopped = Event()
         self.mutex = Lock()
         self.mutex = Lock()
+        self.logger = logging.getLogger("timer2.Timer")
         self.not_empty = Condition(self.mutex)
         self.not_empty = Condition(self.mutex)
         self.setDaemon(True)
         self.setDaemon(True)
         self.setName("Timer-%s" % (self._timer_count(), ))
         self.setName("Timer-%s" % (self._timer_count(), ))
@@ -172,23 +175,28 @@ class Timer(Thread):
         return self.apply_entry(entry)
         return self.apply_entry(entry)
 
 
     def run(self):
     def run(self):
-        self.running = True
-        self.scheduler = iter(self.schedule)
-
-        while not self._shutdown.isSet():
-            delay = self.next()
-            if delay:
-                if self.on_tick:
-                    self.on_tick(delay)
-                if sleep is None:
-                    break
-                sleep(delay)
         try:
         try:
-            self._stopped.set()
-        except TypeError:           # pragma: no cover
-            # we lost the race at interpreter shutdown,
-            # so gc collected built-in modules.
-            pass
+            self.running = True
+            self.scheduler = iter(self.schedule)
+
+            while not self._shutdown.isSet():
+                delay = self.next()
+                if delay:
+                    if self.on_tick:
+                        self.on_tick(delay)
+                    if sleep is None:
+                        break
+                    sleep(delay)
+            try:
+                self._stopped.set()
+            except TypeError:           # pragma: no cover
+                # we lost the race at interpreter shutdown,
+                # so gc collected built-in modules.
+                pass
+        except Exception, exc:
+            self.logger.error("Thread Timer crashed: %r" % (exc, ),
+                  exc_info=sys.exc_info())
+            os._exit(1)
 
 
     def stop(self):
     def stop(self):
         if self.running:
         if self.running:

+ 7 - 1
celery/worker/autoscale.py

@@ -1,3 +1,4 @@
+import os
 import sys
 import sys
 import threading
 import threading
 import traceback
 import traceback
@@ -56,7 +57,12 @@ class Autoscaler(threading.Thread):
 
 
     def run(self):
     def run(self):
         while not self._shutdown.isSet():
         while not self._shutdown.isSet():
-            self.scale()
+            try:
+                self.scale()
+            except Exception, exc:
+                self.logger.error("Thread Autoscaler crashed: %r" % (exc, ),
+                                  exc_info=sys.exc_info())
+                os._exit(1)
         self._stopped.set()
         self._stopped.set()
 
 
     def stop(self):
     def stop(self):

+ 9 - 1
celery/worker/controllers.py

@@ -4,6 +4,7 @@ Worker Controller Threads
 
 
 """
 """
 import logging
 import logging
+import os
 import sys
 import sys
 import threading
 import threading
 import traceback
 import traceback
@@ -47,6 +48,7 @@ class Mediator(threading.Thread):
             "Mediator: Running callback for task: %s[%s]" % (
             "Mediator: Running callback for task: %s[%s]" % (
                 task.task_name, task.task_id))
                 task.task_name, task.task_id))
 
 
+
         try:
         try:
             self.callback(task)
             self.callback(task)
         except Exception, exc:
         except Exception, exc:
@@ -61,7 +63,13 @@ class Mediator(threading.Thread):
     def run(self):
     def run(self):
         """Move tasks forever or until :meth:`stop` is called."""
         """Move tasks forever or until :meth:`stop` is called."""
         while not self._shutdown.isSet():
         while not self._shutdown.isSet():
-            self.move()
+            try:
+                self.move()
+            except Exception, exc:
+                self.logger.error("Mediator crash: %r" % (exc, ),
+                    exc_info=sys.exc_info())
+                # exiting by normal means does not work here, so force exit.
+                os._exit(1)
         self._stopped.set()
         self._stopped.set()
 
 
     def stop(self):
     def stop(self):