Browse Source

Temporary fix for #1306

Ask Solem 12 years ago
parent
commit
8fb8c2661b
1 changed files with 24 additions and 0 deletions
  1. 24 0
      celery/worker/__init__.py

+ 24 - 0
celery/worker/__init__.py

@@ -48,6 +48,11 @@ TERMINATE = 0x3
 #: Default socket timeout at shutdown.
 SHUTDOWN_SOCKET_TIMEOUT = 5.0
 
+MAXTASKS_NO_BILLIARD = """\
+maxtasksperchild enabled but billiard C extension not installed!
+This may lead to a deadlock, please install the billiard C extension.
+"""
+
 logger = get_logger(__name__)
 
 
@@ -140,6 +145,25 @@ class Pool(bootsteps.StartStopComponent):
             except AttributeError:
                 pass
 
+        # This makes sure the timers are fired even if writing
+        # to the pool inqueue blocks.
+        # XXX Ugly hack to fix #1306 and the worker will still be blocking
+        # but at least it will recover if there are no worker processes
+        # to write to.
+        # This can be fixed properly when we are
+        # able to use the pure-python version of multiprocessing
+        # (celery 3.1+)
+        try:
+            import _billiard
+        except ImportError:
+            # billiard C extension not installed
+            if worker.maxtasksperchild:
+                logger.warning(MAXTASKS_NO_BILLIARD)
+        _quick_put = pool._pool._quick_put
+        def quick_put(obj):
+            _quick_put(obj, hub.maintain_pool)
+        pool._pool._quick_put = quick_put
+
         pool.init_callbacks(
             on_process_up=lambda w: add_reader(w.sentinel, maintain_pool),
             on_process_down=lambda w: remove(w.sentinel),