Explorar el Código

Thread Pool: Set default app for all threads. Closes #2701

Ask Solem hace 9 años
padre
commit
b02ad4d3fa
Se han modificado 3 ficheros con 7 adiciones y 2 borrados
  1. 3 2
      celery/concurrency/base.py
  2. 3 0
      celery/concurrency/threads.py
  3. 1 0
      celery/worker/components.py

+ 3 - 2
celery/concurrency/base.py

@@ -74,13 +74,14 @@ class BasePool(object):
     task_join_will_block = True
     body_can_be_buffer = False
 
-    def __init__(self, limit=None, putlocks=True,
-                 forking_enable=True, callbacks_propagate=(), **options):
+    def __init__(self, limit=None, putlocks=True, forking_enable=True,
+                 callbacks_propagate=(), app=None, **options):
         self.limit = limit
         self.putlocks = putlocks
         self.options = options
         self.forking_enable = forking_enable
         self.callbacks_propagate = callbacks_propagate
+        self.app = app
 
     def on_start(self):
         pass

+ 3 - 0
celery/concurrency/threads.py

@@ -34,6 +34,9 @@ class TaskPool(BasePool):
         super(TaskPool, self).__init__(*args, **kwargs)
 
     def on_start(self):
+        # make sure all threads have the same current_app.
+        self.app.set_default()
+
         self._pool = self.ThreadPool(self.limit)
         # threadpool stores all work requests until they are processed
         # we don't need this dict, and it occupies way too much memory.

+ 1 - 0
celery/worker/components.py

@@ -170,6 +170,7 @@ class Pool(bootsteps.StartStopStep):
             forking_enable=forking_enable,
             semaphore=semaphore,
             sched_strategy=self.optimization,
+            app=w.app,
         )
         _set_task_join_will_block(pool.task_join_will_block)
         return pool