Forráskód Böngészése

Move platform specific stuff (signal, daemon context) to celery.platform

Ask Solem 15 éve
szülő
commit
cb5bc614bb
2 módosított fájl, 107 hozzáadás és 82 törlés
  1. 15 82
      celery/bin/celeryd.py
  2. 92 0
      celery/platform.py

+ 15 - 82
celery/bin/celeryd.py

@@ -63,12 +63,6 @@
 """
 import os
 import sys
-CAN_DETACH = True
-try:
-    import resource
-except ImportError:
-    CAN_DETACH = False
-
 from celery.loaders import current_loader
 from celery.loaders import settings
 from celery import __version__
@@ -80,11 +74,10 @@ from celery import conf
 from celery import discovery
 from celery.task import discard_all
 from celery.worker import WorkController
-import signal
+from celery import platform
 import multiprocessing
 import traceback
 import optparse
-import atexit
 
 USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
 # Make sure the setting exists.
@@ -144,42 +137,6 @@ OPTION_LIST = (
     )
 
 
-def acquire_pidlock(pidfile):
-    """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
-    ``pidfile``.
-
-    If the ``pidfile`` already exists, but the process is not running the
-    ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted
-    and execution continues as normally. However, if the process is still
-    running the program will exit complaning that the program is already
-    running in the background somewhere.
-
-    """
-    from daemon.pidlockfile import PIDLockFile
-    import errno
-    pidlock = PIDLockFile(pidfile)
-    if not pidlock.is_locked():
-        return pidlock
-    pid = pidlock.read_pid()
-    try:
-        os.kill(pid, 0)
-    except os.error, exc:
-        if exc.errno == errno.ESRCH:
-            sys.stderr.write("Stale pidfile exists. Removing it.\n")
-            os.unlink(pidfile)
-            return PIDLockFile(pidfile)
-    except TypeError, exc:
-        sys.stderr.write("Broken pidfile found. Removing it.\n")
-        os.unlink(pidfile)
-        return PIDLockFile(pidfile)
-    else:
-        raise SystemExit(
-                "ERROR: Pidfile (%s) already exists.\n"
-                "Seems celeryd is already running? (PID: %d)" % (
-                    pidfile, pid))
-    return pidlock
-
-
 def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
         pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
@@ -187,14 +144,12 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         statistics=None, **kwargs):
     """Starts the celery worker server."""
 
+    print("Celery %s is starting." % __version__)
+
     # set SIGCLD back to the default SIG_DFL (before python-daemon overrode
     # it) lets the parent wait() for the terminated child process and stops
     # the 'OSError: [Errno 10] No child processes' problem.
-
-    if hasattr(signal, "SIGCLD"): # Make sure the platform supports signals.
-        signal.signal(signal.SIGCLD, signal.SIG_DFL)
-
-    print("Celery %s is starting." % __version__)
+    platform.reset_signal("SIGCLD")
 
     if statistics is not None:
         settings.CELERY_STATISTICS = statistics
@@ -242,31 +197,13 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
 
     print("Celery has started.")
     if detach:
-        if not CAN_DETACH:
-            raise RuntimeError(
-                    "This operating system doesn't support detach. ")
-        from daemon import DaemonContext
         from celery.log import setup_logger, redirect_stdouts_to_logger
-
-        # Since without stderr any errors will be silently suppressed,
-        # we need to know that we have access to the logfile
-        if logfile:
-            open(logfile, "a").close()
-
-        pidlock = acquire_pidlock(pidfile)
-        if umask is None:
-            umask = 0
-        if uid is None:
-            uid = os.geteuid()
-        if gid is None:
-            gid = os.getegid()
-        working_directory = working_directory or os.getcwd()
-        context = DaemonContext(chroot_directory=chroot,
-                                working_directory=working_directory,
-                                umask=umask,
-                                pidfile=pidlock,
-                                uid=uid,
-                                gid=gid)
+        context = platform.create_daemon_context(logfile, pidfile,
+                                        chroot_directory=chroot,
+                                        working_directory=working_directory,
+                                        umask=umask,
+                                        uid=uid,
+                                        gid=gid)
         context.open()
         logger = setup_logger(loglevel, logfile)
         redirect_stdouts_to_logger(logger, loglevel)
@@ -283,7 +220,7 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
 
         # Install signal handler that restarts celeryd on SIGHUP,
         # (only on POSIX systems)
-        install_restart_signal_handler(worker)
+        install_worker_restart_handler(worker)
 
         try:
             worker.start()
@@ -302,14 +239,9 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         raise
 
 
-def install_restart_signal_handler(worker):
-    """Installs a signal handler that restarts the current program
-    when it receives the ``SIGHUP`` signal.
-    """
-    if not hasattr(signal, "SIGHUP"):
-        return  # platform is not POSIX
+def install_worker_restart_handler(worker):
 
-    def restart_self(signum, frame):
+    def restart_worker_sig_handler(signum, frame):
         """Signal handler restarting the current python program."""
         worker.logger.info("Restarting celeryd (%s)" % (
             " ".join(sys.argv)))
@@ -322,7 +254,8 @@ def install_restart_signal_handler(worker):
             worker.stop()
         os.execv(sys.executable, [sys.executable] + sys.argv)
 
-    signal.signal(signal.SIGHUP, restart_self)
+    platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
+
 
 
 def parse_options(arguments):

+ 92 - 0
celery/platform.py

@@ -0,0 +1,92 @@
+import os
+import sys
+import signal
+
+
+CAN_DETACH = True
+try:
+    import resource
+except ImportError:
+    CAN_DETACH = False
+
+
+def acquire_pidlock(pidfile):
+    """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
+    ``pidfile``.
+
+    If the ``pidfile`` already exists, but the process is not running the
+    ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted
+    and execution continues as normally. However, if the process is still
+    running the program will exit complaning that the program is already
+    running in the background somewhere.
+
+    """
+    from daemon.pidlockfile import PIDLockFile
+    import errno
+    pidlock = PIDLockFile(pidfile)
+    if not pidlock.is_locked():
+        return pidlock
+    pid = pidlock.read_pid()
+    try:
+        os.kill(pid, 0)
+    except os.error, exc:
+        if exc.errno == errno.ESRCH:
+            sys.stderr.write("Stale pidfile exists. Removing it.\n")
+            os.unlink(pidfile)
+            return PIDLockFile(pidfile)
+    except TypeError, exc:
+        sys.stderr.write("Broken pidfile found. Removing it.\n")
+        os.unlink(pidfile)
+        return PIDLockFile(pidfile)
+    else:
+        raise SystemExit(
+                "ERROR: Pidfile (%s) already exists.\n"
+                "Seems celeryd is already running? (PID: %d)" % (
+                    pidfile, pid))
+    return pidlock
+
+
+def create_daemon_context(logfile=None, pidfile=None, **options):
+    if not CAN_DETACH:
+        raise RuntimeError(
+                "This operating system doesn't support detach.")
+
+    from daemon import DaemonContext
+
+    # Since without stderr any errors will be silently suppressed,
+    # we need to know that we have access to the logfile
+    if logfile:
+        open(logfile, "a").close()
+
+    options["pidlock"] = pidfile and acquire_pidlock(pidfile)
+
+    defaults = {"uid": lambda: os.geteuid(),
+                "gid": lambda: os.getegid(),
+                "umask": lambda: 0,
+                "chroot_directory": lambda: None,
+                "working_directory": lambda: os.getcwd()}
+
+    for opt_name, opt_default_gen in defaults.items():
+        if opt_name not in options:
+            options[opt_name] = opt_default_gen()
+
+    return DaemonContext(chroot_directory=chroot,
+                            working_directory=working_directory,
+                            umask=umask,
+                            pidfile=pidlock,
+                            uid=uid,
+                            gid=gid)
+
+
+def reset_signal(signal_name):
+    if hasattr(signal, signal_name):
+        signal.signal(getattr(signal, signal_name), signal.SIG_DFL)
+
+
+def install_signal_handler(signal_name, handler):
+    """Install a SIGHUP handler."""
+    if not hasattr(signal, signal):
+        return # Platform doesn't support signal.
+
+    signum = getattr(signal, signal_name)
+    signal.signal(signum, handler)