Ver Fonte

Wait for available pool processes before applying tasks to the pool.

This is enabled by billiard commit
http://github.com/ask/billiard/commit/7eb7874982a9599902144e6e3cbe32106b711b05

This means the worker doesn't have to wait for dozens of tasks to finish at
shutdown, because it no longer applies n prefetched tasks to the pool at once.

This adds some overhead for very short tasks, but in that case shutdown latency
probably doesn't matter either, so it can be disabled with CELERYD_POOL_PUTLOCKS = False.

See http://github.com/ask/celery/issues/closed#issue/122

Closes #122
Ask Solem há 15 anos atrás
pai
commit
f7205b8092
3 ficheiros alterados com 14 adições e 5 exclusões
  1. 2 0
      celery/conf.py
  2. 7 3
      celery/worker/__init__.py
  3. 5 2
      celery/worker/pool.py

+ 2 - 0
celery/conf.py

@@ -44,6 +44,7 @@ _DEFAULTS = {
     "CELERY_BROKER_CONNECTION_RETRY": True,
     "CELERY_BROKER_CONNECTION_MAX_RETRIES": 100,
     "CELERY_ACKS_LATE": False,
+    "CELERYD_POOL_PUTLOCKS": True,
     "CELERYD_POOL": "celery.worker.pool.TaskPool",
     "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
     "CELERYD_ETA_SCHEDULER": "celery.worker.controllers.ScheduleController",
@@ -145,6 +146,7 @@ CELERYD_LOG_LEVEL = _get("CELERYD_LOG_LEVEL",
 CELERYD_LOG_LEVEL = LOG_LEVELS[CELERYD_LOG_LEVEL.upper()]
 CELERYD_CONCURRENCY = _get("CELERYD_CONCURRENCY")
 CELERYD_PREFETCH_MULTIPLIER = _get("CELERYD_PREFETCH_MULTIPLIER")
+CELERYD_POOL_PUTLOCKS = _get("CELERYD_POOL_PUTLOCKS")
 
 CELERYD_POOL = _get("CELERYD_POOL")
 CELERYD_LISTENER = _get("CELERYD_LISTENER")

+ 7 - 3
celery/worker/__init__.py

@@ -3,6 +3,7 @@
 The Multiprocessing Worker Server
 
 """
+import time
 import socket
 import logging
 import traceback
@@ -113,7 +114,8 @@ class WorkController(object):
             schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
             task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
             task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
-            max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD):
+            max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD,
+            pool_putlocks=conf.CELERYD_POOL_PUTLOCKS):
 
         # Options
         self.loglevel = loglevel or self.loglevel
@@ -127,6 +129,7 @@ class WorkController(object):
         self.task_time_limit = task_time_limit
         self.task_soft_time_limit = task_soft_time_limit
         self.max_tasks_per_child = max_tasks_per_child
+        self.pool_putlocks = pool_putlocks
         self._finalize = Finalize(self, self.stop, exitpriority=1)
 
         # Queues
@@ -144,7 +147,8 @@ class WorkController(object):
                                 initializer=process_initializer,
                                 maxtasksperchild=self.max_tasks_per_child,
                                 timeout=self.task_time_limit,
-                                soft_timeout=self.task_soft_time_limit)
+                                soft_timeout=self.task_soft_time_limit,
+                                putlocks=self.pool_putlocks)
         self.mediator = instantiate(mediator_cls, self.ready_queue,
                                     callback=self.process_task,
                                     logger=self.logger)
@@ -192,7 +196,7 @@ class WorkController(object):
         try:
             try:
                 wrapper.task.execute(wrapper, self.pool,
-                                     self.loglevel, self.logfile)
+                    self.loglevel, self.logfile)
             except Exception, exc:
                 self.logger.critical("Internal error %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))

+ 5 - 2
celery/worker/pool.py

@@ -28,13 +28,15 @@ class TaskPool(object):
     """
 
     def __init__(self, limit, logger=None, initializer=None,
-            maxtasksperchild=None, timeout=None, soft_timeout=None):
+            maxtasksperchild=None, timeout=None, soft_timeout=None,
+            putlocks=True):
         self.limit = limit
         self.logger = logger or log.get_default_logger()
         self.initializer = initializer
         self.maxtasksperchild = maxtasksperchild
         self.timeout = timeout
         self.soft_timeout = soft_timeout
+        self.putlocks = putlocks
         self._pool = None
 
     def start(self):
@@ -78,7 +80,8 @@ class TaskPool(object):
         return self._pool.apply_async(target, args, kwargs,
                                       callback=on_ready,
                                       accept_callback=accept_callback,
-                                      timeout_callback=timeout_callback)
+                                      timeout_callback=timeout_callback,
+                                      waitforslot=self.putlocks)
 
     def on_ready(self, callbacks, errbacks, ret_value):
         """What to do when a worker task is ready and its return value has