Explorar o código

Skip Mediator and ready_queue and just send directly to pool if DISABLE_RATE_LIMITS=True

Ask Solem %!s(int64=14) %!d(string=hai) anos
pai
achega
c6f778035e
Modificáronse 1 ficheiros con 9 adicións e 4 borrados
  1. 9 4
      celery/worker/__init__.py

+ 9 - 4
celery/worker/__init__.py

@@ -125,6 +125,7 @@ class WorkController(object):
             task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
             max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD,
             pool_putlocks=conf.CELERYD_POOL_PUTLOCKS,
+            disable_rate_limits=conf.DISABLE_RATE_LIMITS,
             db=conf.CELERYD_STATE_DB):
 
         # Options
@@ -150,8 +151,9 @@ class WorkController(object):
             Finalize(persistence, persistence.save, exitpriority=5)
 
         # Queues
-        if conf.DISABLE_RATE_LIMITS:
+        if disable_rate_limits:
             self.ready_queue = FastQueue()
+            self.ready_queue.put = self.process_task
         else:
             self.ready_queue = TaskBucket(task_registry=registry.tasks)
 
@@ -166,9 +168,12 @@ class WorkController(object):
                                 timeout=self.task_time_limit,
                                 soft_timeout=self.task_soft_time_limit,
                                 putlocks=self.pool_putlocks)
-        self.mediator = instantiate(mediator_cls, self.ready_queue,
-                                    callback=self.process_task,
-                                    logger=self.logger)
+
+        self.mediator = None
+        if not disable_rate_limits:
+            self.mediator = instantiate(mediator_cls, self.ready_queue,
+                                        callback=self.process_task,
+                                        logger=self.logger)
         self.scheduler = instantiate(eta_scheduler_cls,
                                precision=conf.CELERYD_ETA_SCHEDULER_PRECISION,
                                on_error=self.on_timer_error,