
Worker -Ofair argument disables prefetch in prefork pool. Closes #1591
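
The new flag is passed on the worker command line; a minimal invocation (assuming a Celery app importable as `proj`) would look like:

    $ celery -A proj worker -Ofair

With `-Ofair` the pool only writes a task to a child process that is not already executing one, instead of prefetching writes to every available process.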

Ask Solem, 11 years ago
commit b036fa7fa6

2 changed files with 38 additions and 7 deletions:
  1. celery/concurrency/prefork.py (+35 −6)
  2. celery/worker/components.py (+3 −1)

celery/concurrency/prefork.py (+35 −6)

@@ -34,7 +34,8 @@ from amqp.utils import promise
 from billiard import forking_enable
 from billiard import pool as _pool
 from billiard.pool import (
-    RUN, CLOSE, TERMINATE, ACK, NACK, EX_RECYCLE, WorkersJoined, CoroStop,
+    RUN, CLOSE, TERMINATE, ACK, NACK, EX_RECYCLE,
+    WorkersJoined, CoroStop,
 )
 from billiard.queues import _SimpleQueue
 from kombu.async import READ, WRITE, ERR
@@ -73,6 +74,14 @@ MAXTASKS_NO_BILLIARD = """\
 #: Constant sent by child process when started (ready to accept work)
 WORKER_UP = 15
 
+SCHED_STRATEGY_PREFETCH = 1
+SCHED_STRATEGY_FAIR = 4
+
+SCHED_STRATEGIES = {
+    None: SCHED_STRATEGY_PREFETCH,
+    'fair': SCHED_STRATEGY_FAIR,
+}
+
 logger = get_logger(__name__)
 warning, debug = logger.warning, logger.debug
 
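The mapping resolves the command-line value into an integer constant, and the `dict.get(value, value)` idiom used in `AsynPool.__init__` below lets unrecognized (or already-resolved) values pass through unchanged:

    >>> SCHED_STRATEGIES.get('fair', 'fair')
    4
    >>> SCHED_STRATEGIES.get(None, None)
    1
    >>> SCHED_STRATEGIES.get(4, 4)  # already an integer constant
    4
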
@@ -202,9 +211,9 @@ class ResultHandler(_pool.ResultHandler):
 
             try:
                 if reader.poll(0):
-                    ready, task = True, reader.recv()
+                    ready, message = True, reader.recv()
                 else:
-                    ready, task = False, None
+                    ready, message = False, None
             except (IOError, EOFError) as exc:
                 debug('result handler got %r -- exiting', exc)
                 raise CoroStop()
@@ -215,10 +224,10 @@ class ResultHandler(_pool.ResultHandler):
                 raise CoroStop()
 
             if ready:
-                if task is None:
+                if message is None:
                     debug('result handler got sentinel -- exiting')
                     raise CoroStop()
-                on_state_change(task)
+                on_state_change(message)
 
     def handle_event(self, fileno):
         if self._state == RUN:
@@ -281,7 +290,10 @@ class AsynPool(_pool.Pool):
     ResultHandler = ResultHandler
     Worker = Worker
 
-    def __init__(self, processes=None, synack=False, *args, **kwargs):
+    def __init__(self, processes=None, synack=False,
+                 sched_strategy=None, *args, **kwargs):
+        self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
+                                                   sched_strategy)
         processes = self.cpu_count() if processes is None else processes
         self.synack = synack
         # create queue-pairs for all our processes in advance.
@@ -304,6 +316,10 @@ class AsynPool(_pool.Pool):
         # Set of active co-routines currently writing jobs.
         self._active_writers = set()
 
+        # Set of fds that are busy (executing task)
+        self._busy_workers = set()
+        self._mark_worker_as_available = self._busy_workers.discard
+
         # Holds jobs waiting to be written to child processes.
         self.outbound_buffer = deque()
 
@@ -398,6 +414,9 @@ class AsynPool(_pool.Pool):
             # remove tref
             self._discard_tref(job)
 
+    def on_job_ready(self, job, i, obj, inqW_fd):
+        self._mark_worker_as_available(inqW_fd)
+
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
         """For async pool this will create the handlers called
         when a process is up/down and etc."""
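
Together with the `_busy_workers` set initialized in `__init__`, this callback gives the pool a per-process occupancy tracker keyed by each child's inqueue write fd: the fd is added when a job is written to the process and discarded here once the job's result arrives. A standalone sketch of the same pattern (the names here are illustrative, not the pool's API):

    busy_workers = set()

    def on_write(inqW_fd):
        # a task was just written to this child process
        busy_workers.add(inqW_fd)

    def on_job_ready(inqW_fd):
        # the child reported the task as finished
        busy_workers.discard(inqW_fd)

    def may_write(inqW_fd, is_fair_strategy):
        # under -Ofair, skip children that still hold a task
        return not (is_fair_strategy and inqW_fd in busy_workers)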
@@ -407,6 +426,7 @@ class AsynPool(_pool.Pool):
         fileno_to_inq = self._fileno_to_inq
         fileno_to_outq = self._fileno_to_outq
         fileno_to_synq = self._fileno_to_synq
+        busy_workers = self._busy_workers
         maintain_pool = self.maintain_pool
         handle_result_event = self.handle_result_event
         process_flush_queues = self.process_flush_queues
@@ -438,6 +458,7 @@ class AsynPool(_pool.Pool):
             fileno_to_inq.pop(proc.inqW_fd, None)
             fileno_to_synq.pop(proc.synqW_fd, None)
             all_inqueues.discard(proc.inqW_fd)
+            busy_workers.discard(proc.inqW_fd)
             hub_remove(proc.sentinel)
             hub_remove(proc.outqR_fd)
         self.on_process_down = on_process_down
@@ -454,17 +475,21 @@ class AsynPool(_pool.Pool):
         put_message = outbound.append
         all_inqueues = self._all_inqueues
         active_writes = self._active_writes
+        busy_workers = self._busy_workers
         diff = all_inqueues.difference
         add_reader, add_writer = hub.add_reader, hub.add_writer
         hub_add, hub_remove = hub.add, hub.remove
         mark_write_fd_as_active = active_writes.add
         mark_write_gen_as_active = self._active_writers.add
+        mark_worker_as_busy = busy_workers.add
+        mark_worker_as_available = busy_workers.discard
         write_generator_done = self._active_writers.discard
         get_job = self._cache.__getitem__
         # puts back at the end of the queue
         self._put_back = outbound.appendleft
         precalc = {ACK: self._create_payload(ACK, (0, )),
                    NACK: self._create_payload(NACK, (0, ))}
+        is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR
 
         def on_poll_start():
             # called for every event loop iteration, and if there
@@ -500,6 +525,9 @@ class AsynPool(_pool.Pool):
                 if ready_fd in active_writes:
                     # already writing to this fd
                     continue
+                if is_fair_strategy and ready_fd in busy_workers:
+                    # worker is already busy with another task
+                    continue
                 try:
                     job = pop_message()
                 except IndexError:
@@ -528,6 +556,7 @@ class AsynPool(_pool.Pool):
                         job._writer = ref(cor)
                         mark_write_gen_as_active(cor)
                         mark_write_fd_as_active(ready_fd)
+                        mark_worker_as_busy(ready_fd)
 
                         # Try to write immediately, in case there's an error.
                         try:
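
The net effect in `schedule_writes` is a two-part guard over each ready fd; roughly, as a hypothetical `can_schedule` helper using the names from the hunks above:

    def can_schedule(ready_fd, active_writes, busy_workers, is_fair_strategy):
        if ready_fd in active_writes:
            return False  # a write coroutine already owns this fd
        if is_fair_strategy and ready_fd in busy_workers:
            return False  # fair mode: the child is still executing a task
        return True

Under the default prefetch strategy `is_fair_strategy` is false, so the second check never fires and writes are scheduled to any ready fd, exactly as before this commit.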

celery/worker/components.py (+3 −1)

@@ -118,7 +118,7 @@ class Pool(bootsteps.StartStopStep):
     requires = (Queues, )
 
     def __init__(self, w, autoscale=None, autoreload=None,
-                 no_execv=False, **kwargs):
+                 no_execv=False, optimization=None, **kwargs):
         if isinstance(autoscale, string_t):
             max_c, _, min_c = autoscale.partition(',')
             autoscale = [int(max_c), min_c and int(min_c) or 0]
@@ -130,6 +130,7 @@ class Pool(bootsteps.StartStopStep):
         if w.autoscale:
             w.max_concurrency, w.min_concurrency = w.autoscale
         self.autoreload_enabled = autoreload
+        self.optimization = optimization
 
     def close(self, w):
         if w.pool:
@@ -162,6 +163,7 @@ class Pool(bootsteps.StartStopStep):
             allow_restart=allow_restart,
             forking_enable=forking_enable,
             semaphore=semaphore,
+            sched_strategy=self.optimization,
         )
         return pool
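
End to end, the flag's path is: `celery worker -Ofair` sets the bootstep's `optimization` argument, the bootstep forwards it as `sched_strategy`, and `AsynPool.__init__` resolves it against `SCHED_STRATEGIES`. A condensed sketch of that hand-off, using simplified stand-ins for the real classes (the actual constructors take many more parameters):

    SCHED_STRATEGIES = {None: 1, 'fair': 4}

    class AsynPool:
        def __init__(self, sched_strategy=None):
            # unknown values pass through unchanged
            self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
                                                       sched_strategy)

    class PoolBootstep:
        def __init__(self, optimization=None):
            self.optimization = optimization

        def create(self):
            return AsynPool(sched_strategy=self.optimization)

    pool = PoolBootstep(optimization='fair').create()
    assert pool.sched_strategy == 4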