瀏覽代碼

Set TIMER2_TRACE_THREAD on init, to trace who started timer threads that are not shutdown

Ask Solem 15 年之前
父節點
當前提交
d995744e39
共有 2 個文件被更改,包括 21 次插入0 次删除
  1. 9 0
      celery/tests/__init__.py
  2. 12 0
      celery/utils/timer2.py

+ 9 - 0
celery/tests/__init__.py

@@ -7,6 +7,7 @@ config = os.environ.setdefault("CELERY_TEST_CONFIG_MODULE",
 
 os.environ["CELERY_CONFIG_MODULE"] = config
 os.environ["CELERY_LOADER"] = "default"
+os.environ["TIMER2_TRACE_THREAD"] = "yes"
 
 
 def teardown():
@@ -23,3 +24,11 @@ def teardown():
         sys.stderr.write(
             "\n\n**WARNING**: Remaning threads at teardown: %r...\n" % (
                 remaining_threads))
+        for thread in remaining_threads:
+            try:
+                started_by = thread._started_by[thread.ident]
+            except (AttributeError, KeyError):
+                pass
+            else:
+                sys.stderr.write("THREAD %r STARTED BY:\n%r\n" % (
+                    thread, started_by))

+ 12 - 0
celery/utils/timer2.py

@@ -4,6 +4,7 @@ from __future__ import generators
 
 import atexit
 import heapq
+import os
 import sys
 import traceback
 import warnings
@@ -126,6 +127,8 @@ class Schedule(object):
         events = list(self._queue)
         return map(heapq.heappop, [events] * len(events))
 
+TRACE_THREAD = os.environ.get("TIMER2_TRACE_THREAD")
+
 
 class Timer(Thread):
     Entry = Entry
@@ -133,6 +136,7 @@ class Timer(Thread):
     running = False
     on_tick = None
     _timer_count = count(1).next
+    _started_by = {}
 
     def __init__(self, schedule=None, on_error=None, on_tick=None, **kwargs):
         self.schedule = schedule or Schedule(on_error=on_error)
@@ -146,6 +150,12 @@ class Timer(Thread):
         self.setDaemon(True)
         self.setName("Timer-%s" % (self._timer_count(), ))
 
+    if TRACE_THREAD:
+        def start(self, *args, **kwargs):
+            import traceback
+            self._started_by[self.ident] = traceback.format_stack()
+            return Thread.start(self, *args, **kwargs)
+
     def apply_entry(self, entry):
         try:
             entry()
@@ -192,6 +202,8 @@ class Timer(Thread):
             self._stopped.wait()
             self.join(1e100)
             self.running = False
+            if TRACE_THREAD:
+                self._started_by.pop(self.ident, None)
 
     def ensure_started(self):
         if not self.running and not self.is_alive():