Ver código fonte

Merge branch 'master' of https://github.com/ask/celery

Branko Čibej 14 anos atrás
pai
commit
78d5f4fc92
2 arquivos alterados com 52 adições e 17 exclusões
  1. 24 17
      celery/concurrency/processes/pool.py
  2. 28 0
      celery/worker/state.py

+ 24 - 17
celery/concurrency/processes/pool.py

@@ -27,6 +27,8 @@ from multiprocessing.util import Finalize, debug
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
 from celery.exceptions import WorkerLostError
 
+_Semaphore = threading._Semaphore
+
 #
 # Constants representing the state of a pool
 #
@@ -55,6 +57,23 @@ job_counter = itertools.count()
 def mapstar(args):
     return map(*args)
 
+
+class LaxBoundedSemaphore(threading._Semaphore):
+    """Semaphore that checks that # release is <= # acquires,
+    but ignores if # releases >= value."""
+
+    def __init__(self, value=1, verbose=None):
+        _Semaphore.__init__(self, value, verbose)
+        self._initial_value = value
+
+    def release(self):
+        if self._Semaphore__value < self._initial_value:
+            return _Semaphore.release(self)
+        if __debug__:
+            self._note("%s.release: success, value=%s (unchanged)" % (
+                self, self._Semaphore__value))
+
+
 #
 # Code run by worker processes
 #
@@ -277,10 +296,7 @@ class TimeoutHandler(PoolThread):
             job._set(i, (False, TimeLimitExceeded()))
             # release sem
             if putlock is not None:
-                try:
-                    putlock.release()
-                except Exception:
-                    pass
+                putlock.release()
             # Remove from _pool
             process, _index = _process_by_pid(job._worker_pid)
             # Run timeout callback
@@ -344,10 +360,7 @@ class ResultHandler(PoolThread):
                 return
             if not item.ready():
                 if putlock is not None:
-                    try:
-                        putlock.release()
-                    except Exception:
-                        pass
+                    putlock.release()
             try:
                 item._set(i, obj)
             except KeyError:
@@ -384,10 +397,7 @@ class ResultHandler(PoolThread):
 
         # Notify waiting threads
         if putlock is not None:
-            try:
-                putlock.release()
-            except Exception:
-                pass
+            putlock.release()
 
         while cache and self._state != TERMINATE:
             try:
@@ -465,7 +475,7 @@ class Pool(object):
         self._worker_handler = self.Supervisor(self)
         self._worker_handler.start()
 
-        self._putlock = threading.BoundedSemaphore(self._processes)
+        self._putlock = LaxBoundedSemaphore(self._processes)
         self._task_handler = self.TaskHandler(self._taskqueue,
                                               self._quick_put,
                                               self._outqueue,
@@ -530,10 +540,7 @@ class Pool(object):
                 for worker_pid in job.worker_pids():
                     if worker_pid in cleaned and not job.ready():
                         if self._putlock is not None:
-                            try:
-                                self._putlock.release()
-                            except Exception:
-                                pass
+                            self._putlock.release()
                         err = WorkerLostError("Worker exited prematurely.")
                         job._set(None, (False, err))
                         continue

+ 28 - 0
celery/worker/state.py

@@ -1,3 +1,4 @@
+import os
 import platform
 import shelve
 
@@ -49,6 +50,33 @@ def task_ready(request):
     reserved_requests.discard(request)
 
 
+if os.environ.get("CELERY_BENCH"):
+    from time import time
+
+    all_count = 0
+    bench_start = None
+    bench_every = int(os.environ.get("CELERY_BENCH_EVERY", 1000))
+    __reserved = task_reserved
+    __ready = task_ready
+
+    def task_reserved(request):
+        global bench_start
+        if bench_start is None:
+            bench_start = time()
+        return __reserved(request)
+
+    def task_ready(request):
+        global all_count, bench_start
+        all_count += 1
+        if not all_count % bench_every:
+            print("* Time spent processing %s tasks (since first "
+                    "task received): ~%.4fs\n" % (
+                bench_every, time() - bench_start))
+            bench_start = None
+
+        return __ready(request)
+
+
 class Persistent(object):
     storage = shelve
     _is_open = False