浏览代码

Changes acquire of the putlock semaphore to be non-blocking.

This so that the main thread is unlocked at shutdown,
even if there are no more threads alive to release it.

Closes #523.

comment by @ask:
  Doesn't use Condition.acquire(timeout) because,
  that wakes up _very_ frequently: 500 µs (0.0005 s.) at best,
  and 0.05s at best.  Since this is only applicable at shutdown
  we don't need this kind of responsiveness, as shutdown is
  already delayed by at least 1 second anyway.
Ionel Maries Cristian 13 年之前
父节点
当前提交
8feeab8a37
共有 1 个文件被更改,包括 14 次插入12 次删除
  1. 14 12
      celery/concurrency/processes/pool.py

+ 14 - 12
celery/concurrency/processes/pool.py

@@ -791,19 +791,21 @@ class Pool(object):
             warnings.warn(UserWarning("Soft timeouts are not supported: "
                     "on this platform: It does not have the SIGUSR1 signal."))
             soft_timeout = None
-        result = ApplyResult(self._cache, callback,
-                             accept_callback, timeout_callback,
-                             error_callback, soft_timeout, timeout)
-
         if waitforslot and self._putlock is not None:
-            self._putlock.acquire()
-            if self._state != RUN:
-                return
-        if timeout or soft_timeout:
-            # start the timeout handler thread when required.
-            self._start_timeout_handler()
-        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
-        return result
+            while 1:
+                if self._state != RUN or self._putlock.acquire(False):
+                    break
+                time.sleep(1.0)
+        if self._state == RUN:
+            result = ApplyResult(self._cache, callback,
+                                 accept_callback, timeout_callback,
+                                 error_callback, soft_timeout, timeout)
+            if timeout or soft_timeout:
+                # start the timeout handler thread when required.
+                self._start_timeout_handler()
+            self._taskqueue.put(([(result._job, None,
+                                   func, args, kwds)], None))
+            return result
 
     def map_async(self, func, iterable, chunksize=None, callback=None):
         '''