Browse Source

Changes acquire of the putlock semaphore to be non-blocking.

This so that the main thread is unlocked at shutdown,
even if there are no more threads alive to release it.

Closes #523.

comment by @ask:
  Doesn't use Condition.acquire(timeout) because,
  that wakes up _very_ frequently: 500 µs (0.0005 s.) at best,
  and 0.05s at best.  Since this is only applicable at shutdown
  we don't need this kind of responsiveness, as shutdown is
  already delayed by at least 1 second anyway.
Ionel Maries Cristian 13 years ago
parent
commit
3b61f8a9da
1 changed files with 14 additions and 12 deletions
  1. 14 12
      celery/concurrency/processes/pool.py

+ 14 - 12
celery/concurrency/processes/pool.py

@@ -791,19 +791,21 @@ class Pool(object):
             warnings.warn(UserWarning("Soft timeouts are not supported: "
                     "on this platform: It does not have the SIGUSR1 signal."))
             soft_timeout = None
-        result = ApplyResult(self._cache, callback,
-                             accept_callback, timeout_callback,
-                             error_callback, soft_timeout, timeout)
-
         if waitforslot and self._putlock is not None:
-            self._putlock.acquire()
-            if self._state != RUN:
-                return
-        if timeout or soft_timeout:
-            # start the timeout handler thread when required.
-            self._start_timeout_handler()
-        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
-        return result
+            while 1:
+                if self._state != RUN or self._putlock.acquire(False):
+                    break
+                time.sleep(1.0)
+        if self._state == RUN:
+            result = ApplyResult(self._cache, callback,
+                                 accept_callback, timeout_callback,
+                                 error_callback, soft_timeout, timeout)
+            if timeout or soft_timeout:
+                # start the timeout handler thread when required.
+                self._start_timeout_handler()
+            self._taskqueue.put(([(result._job, None,
+                                   func, args, kwds)], None))
+            return result
 
     def map_async(self, func, iterable, chunksize=None, callback=None):
         '''