Browse Source

Merge branch 'block-detect'

Conflicts:
	celery/utils/debug.py
	celery/worker/__init__.py
Ask Solem 12 years ago
parent
commit
0dfc2f5096
3 changed files with 49 additions and 0 deletions
  1. 20 0
      celery/platforms.py
  2. 28 0
      celery/utils/debug.py
  3. 1 0
      celery/worker/__init__.py

+ 20 - 0
celery/platforms.py

@@ -11,6 +11,7 @@ from __future__ import absolute_import, print_function
 
 import atexit
 import errno
+import math
 import os
 import platform as _platform
 import signal as _signal
@@ -552,6 +553,25 @@ class Signals(object):
     ignored = _signal.SIG_IGN
     default = _signal.SIG_DFL
 
+    if hasattr(_signal, 'setitimer'):
+
+        def arm_alarm(self, seconds):
+            _signal.setitimer(_signal.ITIMER_REAL, seconds)
+    else:
+        try:
+            from itimer import alarm as _itimer_alarm  # noqa
+        except ImportError:
+
+            def arm_alarm(self, seconds):  # noqa
+                _signal.alarm(math.ceil(seconds))
+        else:
+
+            def arm_alarm(self, seconds):      # noqa
+                return _itimer_alarm(seconds)  # noqa
+
+    def reset_alarm(self):
+        return _signal.alarm(0)
+
     def supported(self, signal_name):
         """Returns true value if ``signal_name`` exists on this platform."""
         try:

+ 28 - 0
celery/utils/debug.py

@@ -10,7 +10,10 @@ from __future__ import absolute_import
 
 import os
 
+from contextlib import contextmanager
+
 from celery.five import format_d, range
+from celery.platforms import signals
 
 try:
     from psutil import Process
@@ -21,6 +24,31 @@ _process = None
 _mem_sample = []
 
 
+def _on_blocking(signum, frame):
+    import inspect
+    raise RuntimeError(
+        'Blocking detection timed-out at: %s' % (
+            inspect.getframeinfo(frame), ))
+
+
+@contextmanager
+def blockdetection(timeout):
+    if not timeout:
+        yield
+    else:
+        old_handler = signals['ALRM']
+        old_handler = None if old_handler == _on_blocking else old_handler
+
+        signals['ALRM'] = _on_blocking
+
+        try:
+            yield signals.arm_alarm(timeout)
+        finally:
+            if old_handler:
+                signals['ALRM'] = old_handler
+            signals.reset_alarm()
+
+
 def sample_mem():
     """Sample RSS memory usage.
 

+ 1 - 0
celery/worker/__init__.py

@@ -78,6 +78,7 @@ class WorkController(configurated):
 
     pidlock = None
     namespace = None
+    pool = None
 
     class Namespace(bootsteps.Namespace):
         """Worker bootstep namespace."""