Parcourir la source

Always use mediator, emergency deadlock fix until 3.1

Ask Solem il y a 12 ans
Parent
commit
8e4d53e5ac
3 fichiers modifiés avec 11 ajouts et 6 suppressions
  1. 2 3
      celery/worker/__init__.py
  2. 8 2
      celery/worker/buckets.py
  3. 1 1
      celery/worker/mediator.py

+ 2 - 3
celery/worker/__init__.py

@@ -209,19 +209,18 @@ class Queues(bootsteps.Component):
 
     def create(self, w):
         BucketType = TaskBucket
-        w.start_mediator = not w.disable_rate_limits
+        w.start_mediator = True
         if not w.pool_cls.rlimit_safe:
             w.start_mediator = False
             BucketType = AsyncTaskBucket
         process_task = w.process_task
         if w.use_eventloop:
-            w.start_mediator = False
+            w.start_mediator = True  # Need async write, fixed in 3.1
             BucketType = AsyncTaskBucket
             if w.pool_putlocks and w.pool_cls.uses_semaphore:
                 process_task = w.process_task_sem
         if w.disable_rate_limits:
             w.ready_queue = FastQueue()
-            w.ready_queue.put = process_task
         else:
             w.ready_queue = BucketType(
                 task_registry=w.app.tasks, callback=process_task, worker=w,

+ 8 - 2
celery/worker/buckets.py

@@ -39,6 +39,12 @@ class AsyncTaskBucket(object):
         self.worker = worker
         self.buckets = {}
         self.refresh()
+        self._queue = Queue()
+        self._quick_put = self._queue.put
+        self.get = self._queue.get
+
+    def get(self, *args, **kwargs):
+        return self._queue.get(*args, **kwargs)
 
     def cont(self, request, bucket, tokens):
         if not bucket.can_consume(tokens):
@@ -47,7 +53,7 @@ class AsyncTaskBucket(object):
                 hold * 1000.0, self.cont, (request, bucket, tokens),
             )
         else:
-            self.callback(request)
+            self._quick_put(request)
 
     def put(self, request):
         name = request.name
@@ -56,7 +62,7 @@ class AsyncTaskBucket(object):
         except KeyError:
             bucket = self.add_bucket_for_type(name)
         if not bucket:
-            return self.callback(request)
+            return self._quick_put(request)
         return self.cont(request, bucket, 1)
 
     def add_task_type(self, name):

+ 1 - 1
celery/worker/mediator.py

@@ -36,7 +36,7 @@ class WorkerComponent(StartStopComponent):
         w.mediator = None
 
     def include_if(self, w):
-        return w.start_mediator and not w.use_eventloop
+        return w.start_mediator
 
     def create(self, w):
         m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,