Browse Source

Regression for --app argument. Fixes #1653

Ask Solem 11 years ago
parent
commit
d7b6637636
2 changed files with 63 additions and 3 deletions
  1. 2 0
      celery/app/utils.py
  2. 61 3
      celery/concurrency/asynpool.py

+ 2 - 0
celery/app/utils.py

@@ -246,6 +246,8 @@ def find_app(app, symbol_by_name=symbol_by_name, imp=import_from_cwd):
                     if isinstance(suspect, Celery):
                         return suspect
                 raise
+            else:
+                return found
         else:
             return found
     return sym

+ 61 - 3
celery/concurrency/asynpool.py

@@ -31,7 +31,7 @@ from collections import deque, namedtuple
 from io import BytesIO
 from pickle import HIGHEST_PROTOCOL
 from time import sleep
-from weakref import WeakValueDictionary, ref
+from weakref import WeakSet, WeakValueDictionary, ref
 
 from amqp.utils import promise
 from billiard.pool import RUN, TERMINATE, ACK, NACK, WorkersJoined
@@ -78,7 +78,13 @@ except (ImportError, NameError):  # pragma: no cover
 
 
 logger = get_logger(__name__)
-error, debug = logger.error, logger.debug
+crit, error, warn, info, debug = (
+    logger.critical, logger.error, logger.warn, logger.info, logger.debug,
+)
+
+DD_ALERT_CHANGE = """\
+defcon {level}: tasks not accepted since {s}s: {tasks}\
+"""
 
 UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR])
 
@@ -98,7 +104,11 @@ Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
 
 def gen_not_started(gen):
     # gi_frame is None when generator stopped.
-    return gen.gi_frame and gen.gi_frame.f_lasti == -1
+    return gen.gi_frame is not None and gen.gi_frame.f_lasti == -1
+
+
+def gen_stopped(gen):
+    return gen.gi_frame is None
 
 
 def _get_job_writer(job):
@@ -155,6 +165,51 @@ def _select(readers=None, writers=None, err=None, timeout=0):
             raise
 
 
+class deadlock_detection(object):
+
+    def __init__(self, pool, interval=5.0):
+        self.pool = pool
+        self.interval = interval
+
+    def register_with_event_loop(self, hub):
+        hub.call_repeatedly((self.interval * 1.25) * 5.0, self.checkpoint, hub)
+
+    def is_waiting(self, job):
+        if not (job._accepted or job.ready()):
+            writer = _get_job_writer(job)
+            if not writer or gen_stopped(writer):
+                return True
+        return False
+
+    def _waiting_for_ack(self, jobs):
+        is_waiting = self.is_waiting
+        return (job for job in jobs if is_waiting(job))
+
+    def alert(self, severity, level, jobs):
+        severity(DD_ALERT_CHANGE.format(
+            level=level, s=(5 - level) * self.interval,
+            jobs=', '.join(job.correlation_id for job in jobs),
+        ))
+
+    def checkpoint(self, hub):
+        self.defcon(5, hub, values(self.pool._cache))
+
+    def defcon(self, level, hub, jobs):
+        jobs = WeakSet(self._waiting_for_ack(jobs))
+        if jobs:
+            if level <= 1:
+                self.alert(crit, level, jobs)
+                raise MemoryError(
+                    'Possible deadlock detected, contact support.')
+            elif level <= 3:
+                self.alert(warn, level, jobs)
+            elif level <= 4:
+                self.alert(info, level, jobs)
+            else:
+                self.alert(debug, level, jobs)
+            hub.call_later(self.interval, self.defcon, level - 1, jobs)
+
+
 class Worker(_pool.Worker):
     """Pool worker process."""
     dead = False
@@ -400,6 +455,9 @@ class AsynPool(_pool.Pool):
         self._create_process_handlers(hub)
         self._create_write_handlers(hub)
 
+        dd = deadlock_detection(self)
+        dd.register_with_event_loop(hub)
+
         # Add handler for when a process exits (calls maintain_pool)
         [hub.add_reader(fd, self._event_process_exit, hub, fd)
          for fd in self.process_sentinels]