Browse Source

Moves process_initializer to processes pool imp

Ask Solem 13 years ago
parent
commit
388354d3e7
2 changed files with 31 additions and 34 deletions
  1. 30 1
      celery/concurrency/processes/__init__.py
  2. 1 33
      celery/worker/__init__.py

+ 30 - 1
celery/concurrency/processes/__init__.py

@@ -4,6 +4,10 @@ from __future__ import absolute_import
 import platform
 import signal as _signal
 
+
+from ... import platforms
+from ... import signals
+from ...app import app_or_default
 from ..base import BasePool
 from .pool import Pool, RUN
 
@@ -15,6 +19,29 @@ if platform.system() == "Windows":  # pragma: no cover
 else:
     from os import kill as _kill                 # noqa
 
+#: List of signals to reset when a child process starts.
+WORKER_SIGRESET = frozenset(["SIGTERM",
+                             "SIGHUP",
+                             "SIGTTIN",
+                             "SIGTTOU",
+                             "SIGUSR1"])
+
+#: List of signals to ignore when a child process starts.
+WORKER_SIGIGNORE = frozenset(["SIGINT"])
+
+def process_initializer(app, hostname):
+    """Initializes the process so it can be used to process tasks."""
+    app = app_or_default(app)
+    app.set_current()
+    platforms.signals.reset(*WORKER_SIGRESET)
+    platforms.signals.ignore(*WORKER_SIGIGNORE)
+    platforms.set_mp_process_title("celeryd", hostname=hostname)
+    # This is for Windows and other platforms not supporting
+    # fork(). Note that init_worker makes sure it's only
+    # run once per process.
+    app.loader.init_worker()
+    app.loader.init_worker_process()
+    signals.worker_process_init.send(sender=None)
 
 class TaskPool(BasePool):
     """Process Pool for processing tasks in parallel.
@@ -40,7 +67,9 @@ class TaskPool(BasePool):
         Will pre-fork all workers so they're ready to accept tasks.
 
         """
-        self._pool = self.Pool(processes=self.limit, **self.options)
+        self._pool = self.Pool(processes=self.limit,
+                               initializer=process_initializer,
+                               **self.options)
         self.on_apply = self._pool.apply_async
 
     def on_stop(self):

+ 1 - 33
celery/worker/__init__.py

@@ -22,7 +22,7 @@ from kombu.utils.finalize import Finalize
 
 from .. import beat
 from .. import concurrency as _concurrency
-from .. import registry, platforms, signals
+from .. import registry, signals
 from ..app import app_or_default
 from ..app.abstract import configured, from_config
 from ..exceptions import SystemTerminate
@@ -36,37 +36,6 @@ RUN = 0x1
 CLOSE = 0x2
 TERMINATE = 0x3
 
-#: List of signals to reset when a child process starts.
-WORKER_SIGRESET = frozenset(["SIGTERM",
-                             "SIGHUP",
-                             "SIGTTIN",
-                             "SIGTTOU",
-                             "SIGUSR1"])
-
-#: List of signals to ignore when a child process starts.
-WORKER_SIGIGNORE = frozenset(["SIGINT"])
-
-
-def process_initializer(app, hostname):
-    """Initializes the process so it can be used to process tasks.
-
-    Used for multiprocessing environments.
-
-    """
-    app = app_or_default(app)
-    app.set_current()
-    platforms.signals.reset(*WORKER_SIGRESET)
-    platforms.signals.ignore(*WORKER_SIGIGNORE)
-    platforms.set_mp_process_title("celeryd", hostname=hostname)
-
-    # This is for Windows and other platforms not supporting
-    # fork(). Note that init_worker makes sure it's only
-    # run once per process.
-    app.loader.init_worker()
-    app.loader.init_worker_process()
-
-    signals.worker_process_init.send(sender=None)
-
 
 class WorkController(configurated):
     """Unmanaged worker instance."""
@@ -143,7 +112,6 @@ class WorkController(configurated):
 
         self.pool = instantiate(self.pool_cls, min_concurrency,
                                 logger=self.logger,
-                                initializer=process_initializer,
                                 initargs=(self.app, self.hostname),
                                 maxtasksperchild=self.max_tasks_per_child,
                                 timeout=self.task_time_limit,