Prechádzať zdrojové kódy

Pass app to multiprocessing pool workers.

Ask Solem 15 rokov pred
rodič
commit
c24011bc2a
1 zmenil súbory, kde vykonal 4 pridanie a 3 odobranie
  1. 4 3
      celery/worker/__init__.py

+ 4 - 3
celery/worker/__init__.py

@@ -30,12 +30,13 @@ WORKER_SIGRESET = frozenset(["SIGTERM",
 WORKER_SIGIGNORE = frozenset(["SIGINT"])
 WORKER_SIGIGNORE = frozenset(["SIGINT"])
 
 
 
 
-def process_initializer():
+def process_initializer(app):
     """Initializes the process so it can be used to process tasks.
     """Initializes the process so it can be used to process tasks.
 
 
     Used for multiprocessing environments.
     Used for multiprocessing environments.
 
 
     """
     """
+    app = app_or_default(app)
     map(platform.reset_signal, WORKER_SIGRESET)
     map(platform.reset_signal, WORKER_SIGRESET)
     map(platform.ignore_signal, WORKER_SIGIGNORE)
     map(platform.ignore_signal, WORKER_SIGIGNORE)
     platform.set_mp_process_title("celeryd")
     platform.set_mp_process_title("celeryd")
@@ -43,8 +44,7 @@ def process_initializer():
     # This is for windows and other platforms not supporting
     # This is for windows and other platforms not supporting
     # fork(). Note that init_worker makes sure it's only
     # fork(). Note that init_worker makes sure it's only
     # run once per process.
     # run once per process.
-    from celery.loaders import current_loader
-    current_loader().init_worker()
+    app.loader.init_worker()
 
 
     signals.worker_process_init.send(sender=None)
     signals.worker_process_init.send(sender=None)
 
 
@@ -122,6 +122,7 @@ class WorkController(object):
         self.pool = instantiate(self.pool_cls, self.concurrency,
         self.pool = instantiate(self.pool_cls, self.concurrency,
                                 logger=self.logger,
                                 logger=self.logger,
                                 initializer=process_initializer,
                                 initializer=process_initializer,
+                                initargs=(self.app, ),
                                 maxtasksperchild=self.max_tasks_per_child,
                                 maxtasksperchild=self.max_tasks_per_child,
                                 timeout=self.task_time_limit,
                                 timeout=self.task_time_limit,
                                 soft_timeout=self.task_soft_time_limit,
                                 soft_timeout=self.task_soft_time_limit,