Przeglądaj źródła

Debugging with blocking detection

Ask Solem 12 lat temu
rodzic
commit
82350bda1c

+ 20 - 0
celery/platforms.py

@@ -12,6 +12,7 @@ from __future__ import with_statement
 
 import atexit
 import errno
+import math
 import os
 import platform as _platform
 import shlex
@@ -532,6 +533,25 @@ class Signals(object):
     ignored = _signal.SIG_IGN
     default = _signal.SIG_DFL
 
+    if hasattr(_signal, 'setitimer'):
+
+        def arm_alarm(self, seconds):
+            _signal.setitimer(_signal.ITIMER_REAL, seconds)
+    else:
+        try:
+            from itimer import alarm as _itimer_alarm  # noqa
+        except ImportError:
+
+            def arm_alarm(self, seconds):  # noqa
+                _signal.alarm(math.ceil(seconds))
+        else:
+
+            def arm_alarm(self, seconds):      # noqa
+                return _itimer_alarm(seconds)  # noqa
+
+    def reset_alarm(self):
+        return _signal.alarm(0)
+
     def supported(self, signal_name):
         """Returns true value if ``signal_name`` exists on this platform."""
         try:

+ 29 - 0
celery/utils/debug.py

@@ -10,6 +10,10 @@ from __future__ import absolute_import
 
 import os
 
+from contextlib import contextmanager
+
+from celery.platforms import signals
+
 from .compat import format_d
 
 try:
@@ -21,6 +25,31 @@ _process = None
 _mem_sample = []
 
 
+def _on_blocking(signum, frame):
+    import inspect
+    raise RuntimeError(
+        'Blocking detection timed-out at: %s' % (
+            inspect.getframeinfo(frame), ))
+
+
+@contextmanager
+def blocking_detection(timeout):
+    if not timeout:
+        yield
+    else:
+        old_handler = signals['ALRM']
+        old_handler = None if old_handler == _on_blocking else old_handler
+
+        signals['ALRM'] = _on_blocking
+
+        try:
+            yield signals.arm_alarm(timeout)
+        finally:
+            if old_handler:
+                signals['ALRM'] = old_handler
+            signals.reset_alarm()
+
+
 def sample_mem():
     """Sample RSS memory usage.
 

+ 1 - 0
celery/worker/__init__.py

@@ -310,6 +310,7 @@ class WorkController(configurated):
 
     _state = None
     _running = 0
+    pool = None
 
     def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
                  queues=None, app=None, pidfile=None, **kwargs):

+ 44 - 41
celery/worker/consumer.py

@@ -88,6 +88,7 @@ from celery.app import app_or_default
 from celery.datastructures import AttributeDict
 from celery.exceptions import InvalidTaskError, SystemTerminate
 from celery.task.trace import build_tracer
+from celery.utils.debug import blocking_detection
 from celery.utils import text
 from celery.utils import timer2
 from celery.utils.functional import noop
@@ -455,48 +456,50 @@ class Consumer(object):
                 # the number of seconds until we need to fire timers again.
                 poll_timeout = (fire_timers(propagate=errors) if scheduled
                                 else 1)
-
-                # We only update QoS when there is no more messages to read.
-                # This groups together qos calls, and makes sure that remote
-                # control commands will be prioritized over task messages.
-                if qos.prev != qos.value:
-                    update_qos()
-
-                update_readers(on_poll_start())
-                if readers or writers:
-                    connection.more_to_read = True
-                    while connection.more_to_read:
-                        try:
-                            events = poll(poll_timeout)
-                        except ValueError:  # Issue 882
-                            return
-                        if not events:
-                            on_poll_empty()
-                        for fileno, event in events or ():
+                print('POLL TIMEOUT: %r' % (poll_timeout, ))
+                with blocking_detection(10):
+
+                    # We only update QoS when there is no more messages to read.
+                    # This groups together qos calls, and makes sure that remote
+                    # control commands will be prioritized over task messages.
+                    if qos.prev != qos.value:
+                        update_qos()
+    
+                    update_readers(on_poll_start())
+                    if readers or writers:
+                        connection.more_to_read = True
+                        while connection.more_to_read:
                             try:
-                                if event & READ:
-                                    readers[fileno](fileno, event)
-                                if event & WRITE:
-                                    writers[fileno](fileno, event)
-                                if event & ERR:
-                                    for handlermap in readers, writers:
-                                        try:
-                                            handlermap[fileno](fileno, event)
-                                        except KeyError:
-                                            pass
-                            except (KeyError, Empty):
-                                continue
-                            except socket.error:
-                                if self._state != CLOSE:  # pragma: no cover
-                                    raise
-                        if keep_draining:
-                            drain_nowait()
-                            poll_timeout = 0
-                        else:
-                            connection.more_to_read = False
-                else:
-                    # no sockets yet, startup is probably not done.
-                    sleep(min(poll_timeout, 0.1))
+                                events = poll(poll_timeout)
+                            except ValueError:  # Issue 882
+                                return
+                            if not events:
+                                on_poll_empty()
+                            for fileno, event in events or ():
+                                try:
+                                    if event & READ:
+                                        readers[fileno](fileno, event)
+                                    if event & WRITE:
+                                        writers[fileno](fileno, event)
+                                    if event & ERR:
+                                        for handlermap in readers, writers:
+                                            try:
+                                                handlermap[fileno](fileno, event)
+                                            except KeyError:
+                                                pass
+                                except (KeyError, Empty):
+                                    continue
+                                except socket.error:
+                                    if self._state != CLOSE:  # pragma: no cover
+                                        raise
+                            if keep_draining:
+                                drain_nowait()
+                                poll_timeout = 0
+                            else:
+                                connection.more_to_read = False
+                    else:
+                        # no sockets yet, startup is probably not done.
+                        sleep(min(poll_timeout, 0.1))
 
     def on_task(self, task, task_reserved=task_reserved,
                 to_system_tz=timezone.to_system):

+ 5 - 1
celery/worker/job.py

@@ -374,7 +374,11 @@ class Request(object):
             # time to write the result.
             if self.store_errors:
                 if isinstance(exc, exceptions.WorkerLostError):
-                    self.task.backend.mark_as_failure(self.id, exc)
+                    print('+++ WRITE RESULT')
+                    try:
+                        self.task.backend.mark_as_failure(self.id, exc)
+                    finally:
+                        print('--- WRITE RESULT')
                 elif isinstance(exc, exceptions.Terminated):
                     self._announce_revoked('terminated', True, str(exc), False)
             # (acks_late) acknowledge after result stored.