Browse Source

create_daemon_context renamed -> detached and is now an actual context

Ask Solem 14 years ago
parent
commit
17315f121a

+ 11 - 19
celery/bin/celerybeat.py

@@ -22,10 +22,12 @@
     `ERROR`, `CRITICAL`, or `FATAL`.
     `ERROR`, `CRITICAL`, or `FATAL`.
 
 
 """
 """
+from __future__ import with_statement, absolute_import
+
 from functools import partial
 from functools import partial
 
 
+from celery.platforms import detached
 from celery.bin.base import Command, Option, daemon_options
 from celery.bin.base import Command, Option, daemon_options
-from celery.platforms import create_daemon_context
 
 
 
 
 class BeatCommand(Command):
 class BeatCommand(Command):
@@ -34,26 +36,16 @@ class BeatCommand(Command):
     def run(self, detach=False, logfile=None, pidfile=None, uid=None,
     def run(self, detach=False, logfile=None, pidfile=None, uid=None,
             gid=None, umask=None, working_directory=None, **kwargs):
             gid=None, umask=None, working_directory=None, **kwargs):
         kwargs.pop("app", None)
         kwargs.pop("app", None)
-
-        beat = partial(self.app.Beat, logfile=logfile, pidfile=pidfile,
-                       **kwargs)
-
-        if not detach:
+        beat = partial(self.app.Beat,
+                       logfile=logfile, pidfile=pidfile, **kwargs)
+        workdir = working_directory
+
+        if detach:
+            with detached(logfile, pidfile, uid, gid, umask, workdir):
+                return beat().run()
+        else:
             return beat().run()
             return beat().run()
 
 
-        context, on_stop = create_daemon_context(
-                                logfile=logfile,
-                                pidfile=pidfile,
-                                uid=uid,
-                                gid=gid,
-                                umask=umask,
-                                working_directory=working_directory)
-        context.open()
-        try:
-            beat().run()
-        finally:
-            on_stop()
-
     def get_options(self):
     def get_options(self):
         conf = self.app.conf
         conf = self.app.conf
 
 

+ 17 - 37
celery/bin/celeryd_detach.py

@@ -1,50 +1,30 @@
+from __future__ import absolute_import, with_statement
+
 import os
 import os
 import sys
 import sys
 
 
 from optparse import OptionParser, BadOptionError
 from optparse import OptionParser, BadOptionError
 
 
 from celery import __version__
 from celery import __version__
+from celery.platforms import detached
 from celery.bin.base import daemon_options
 from celery.bin.base import daemon_options
-from celery.platforms import create_daemon_context
 
 
 OPTION_LIST = daemon_options(default_pidfile="celeryd.pid")
 OPTION_LIST = daemon_options(default_pidfile="celeryd.pid")
 
 
 
 
-class detached(object):
-
-    def __init__(self, path, argv, logfile=None, pidfile=None, uid=None,
-            gid=None, umask=0, working_directory=None):
-        self.path = path
-        self.argv = argv
-        self.logfile = logfile
-        self.pidfile = pidfile
-        self.uid = uid
-        self.gid = gid
-        self.umask = umask
-        self.working_directory = working_directory
-
-    def start(self):
-        context, on_stop = create_daemon_context(
-                                logfile=self.logfile,
-                                pidfile=self.pidfile,
-                                uid=self.uid,
-                                gid=self.gid,
-                                umask=self.umask,
-                                working_directory=self.working_directory)
-        context.open()
+def detach(path, argv, logfile=None, pidfile=None, uid=None,
+           gid=None, umask=0, working_directory=None):
+    with detached(logfile, pidfile, uid, gid, umask, working_directory):
         try:
         try:
-            try:
-                os.execv(self.path, [self.path] + self.argv)
-            except Exception:
-                import logging
-                from celery.log import setup_logger
-                logger = setup_logger(logfile=self.logfile,
-                                      loglevel=logging.ERROR)
-                logger.critical("Can't exec %r" % (
-                    " ".join([self.path] + self.argv), ),
+            os.execv(path, [path] + argv)
+        except Exception:
+            import logging
+            from celery.log import setup_logger
+            logger = setup_logger(logfile=self.logfile,
+                                  loglevel=logging.ERROR)
+            logger.critical("Can't exec %r" % (
+                    " ".join([path] + argv), ),
                     exc_info=sys.exc_info())
                     exc_info=sys.exc_info())
-        finally:
-            on_stop()
 
 
 
 
 class PartialOptionParser(OptionParser):
 class PartialOptionParser(OptionParser):
@@ -134,9 +114,9 @@ class detached_celeryd(object):
             argv = sys.argv
             argv = sys.argv
         prog_name = os.path.basename(argv[0])
         prog_name = os.path.basename(argv[0])
         options, values, leftovers = self.parse_options(prog_name, argv[1:])
         options, values, leftovers = self.parse_options(prog_name, argv[1:])
-        detached(path=self.execv_path,
-                 argv=self.execv_argv + leftovers,
-                 **vars(options)).start()
+        detach(path=self.execv_path,
+               argv=self.execv_argv + leftovers,
+               **vars(options))
 
 
 
 
 def main():
 def main():

+ 14 - 15
celery/bin/celeryev.py

@@ -1,8 +1,12 @@
+from __future__ import absolute_import, with_statement
+
 import sys
 import sys
 
 
+from functools import partial
+
 from celery import platforms
 from celery import platforms
+from celery.platforms import detached
 from celery.bin.base import Command, Option, daemon_options
 from celery.bin.base import Command, Option, daemon_options
-from celery.platforms import create_daemon_context
 
 
 
 
 class EvCommand(Command):
 class EvCommand(Command):
@@ -39,22 +43,17 @@ class EvCommand(Command):
             gid=None, umask=None, working_directory=None,
             gid=None, umask=None, working_directory=None,
             detach=False, **kwargs):
             detach=False, **kwargs):
         from celery.events.snapshot import evcam
         from celery.events.snapshot import evcam
+        workdir = working_directory
         self.set_process_status("cam")
         self.set_process_status("cam")
         kwargs["app"] = self.app
         kwargs["app"] = self.app
-        if not detach:
-            return evcam(camera, logfile=logfile, pidfile=pidfile, **kwargs)
-        context, on_stop = create_daemon_context(
-                                logfile=logfile,
-                                pidfile=pidfile,
-                                uid=uid,
-                                gid=gid,
-                                umask=umask,
-                                working_directory=working_directory)
-        context.open()
-        try:
-            return evcam(camera, logfile=logfile, pidfile=pidfile, **kwargs)
-        finally:
-            on_stop()
+        cam = partial(evcam, camera,
+                      logfile=logfile, pidfile=pidfile, **kwargs)
+
+        if detach:
+            with detached(logfile, pidfile, uid, gid, umask, workdir):
+                return cam()
+        else:
+            return cam()
 
 
     def set_process_status(self, prog, info=""):
     def set_process_status(self, prog, info=""):
         prog = "%s:%s" % (self.prog_name, prog)
         prog = "%s:%s" % (self.prog_name, prog)

+ 4 - 0
celery/events/snapshot.py

@@ -83,6 +83,10 @@ def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
     logger = app.log.setup_logger(loglevel=loglevel,
     logger = app.log.setup_logger(loglevel=loglevel,
                                   logfile=logfile,
                                   logfile=logfile,
                                   name="celery.evcam")
                                   name="celery.evcam")
+    if pidfile:
+        pidlock = platforms.create_pidlock(pidfile).acquire()
+        atexit.register(pidlock.release)
+
     logger.info(
     logger.info(
         "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
         "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
             camera, freq))
             camera, freq))

+ 59 - 80
celery/platforms.py

@@ -1,27 +1,16 @@
+from __future__ import absolute_import
+
 import os
 import os
 import sys
 import sys
 import errno
 import errno
 import signal
 import signal
-try:
-    from setproctitle import setproctitle as _setproctitle
-except ImportError:
-    _setproctitle = None  # noqa
-
-CAN_DETACH = True
-try:
-    import resource
-except ImportError:
-    CAN_DETACH = False
-
-try:
-    import pwd
-except ImportError:
-    pwd = None  # noqa
-
-try:
-    import grp
-except ImportError:
-    grp = None  # noqa
+
+from celery.local import try_import
+
+_setproctitle = try_import("setproctitle")
+resource = try_import("resource")
+pwd = try_import("pwd")
+grp = try_import("grp")
 
 
 DAEMON_UMASK = 0
 DAEMON_UMASK = 0
 DAEMON_WORKDIR = "/"
 DAEMON_WORKDIR = "/"
@@ -61,12 +50,14 @@ class PIDFile(object):
         except OSError, exc:
         except OSError, exc:
             raise LockFailed(str(exc))
             raise LockFailed(str(exc))
         return self
         return self
+    __enter__ = acquire
 
 
     def is_locked(self):
     def is_locked(self):
         return os.path.exists(self.path)
         return os.path.exists(self.path)
 
 
-    def release(self):
+    def release(self, *args):
         self.remove()
         self.remove()
+    __exit__ = release
 
 
     def read_pid(self):
     def read_pid(self):
         try:
         try:
@@ -134,76 +125,63 @@ def create_pidlock(pidfile):
 class DaemonContext(object):
 class DaemonContext(object):
     _is_open = False
     _is_open = False
 
 
-    def __init__(self, pidfile=None,
-            working_directory=DAEMON_WORKDIR, umask=DAEMON_UMASK, **kwargs):
-        self.working_directory = working_directory
+    def __init__(self, pidfile=None, workdir=DAEMON_WORKDIR,
+            umask=DAEMON_UMASK, **kwargs):
+        self.workdir = workdir
         self.umask = umask
         self.umask = umask
 
 
-    def detach(self):
-        if os.fork() == 0:      # first child
-            os.setsid()         # create new session
-            if os.fork() > 0:   # second child
-                os._exit(0)
-        else:
-            os._exit(0)
-
     def open(self):
     def open(self):
-        if self._is_open:
-            return
+        if not self._is_open:
+            self._detach()
 
 
-        self.detach()
+            os.chdir(self.workdir)
+            os.umask(self.umask)
 
 
-        os.chdir(self.working_directory)
-        os.umask(self.umask)
+            for fd in reversed(range(get_fdmax(default=2048))):
+                try:
+                    os.close(fd)
+                except OSError, exc:
+                    if exc.errno != errno.EBADF:
+                        raise
 
 
-        for fd in reversed(range(get_fdmax(default=2048))):
-            try:
-                os.close(fd)
-            except OSError, exc:
-                if exc.errno != errno.EBADF:
-                    raise
-
-        os.open(DAEMON_REDIRECT_TO, os.O_RDWR)
-        os.dup2(0, 1)
-        os.dup2(0, 2)
+            os.open(DAEMON_REDIRECT_TO, os.O_RDWR)
+            os.dup2(0, 1)
+            os.dup2(0, 2)
 
 
-        self._is_open = True
+            self._is_open = True
+    __enter__ = open
 
 
-    def close(self):
+    def close(self, *args):
         if self._is_open:
         if self._is_open:
             self._is_open = False
             self._is_open = False
+    __exit__ = close
 
 
+    def _detach(self):
+        if os.fork() == 0:      # first child
+            os.setsid()         # create new session
+            if os.fork() > 0:   # second child
+                os._exit(0)
+        else:
+            os._exit(0)
+        return self
 
 
-def create_daemon_context(logfile=None, pidfile=None, uid=None, gid=None,
-        **options):
-    if not CAN_DETACH:
-        raise RuntimeError(
-                "This platform does not support detach.")
 
 
-    # Make sure SIGCLD is using the default handler.
-    reset_signal("SIGCLD")
+def detached(logfile=None, pidfile=None, uid=None, gid=None, umask=0,
+             workdir=None, **opts):
+    if not resource:
+        raise RuntimeError("This platform does not support detach.")
+    workdir = os.getcwd() if workdir is None else workdir
 
 
+    reset_signal("SIGCLD")  # Make sure SIGCLD is using the default handler.
     set_effective_user(uid=uid, gid=gid)
     set_effective_user(uid=uid, gid=gid)
 
 
     # Since without stderr any errors will be silently suppressed,
     # Since without stderr any errors will be silently suppressed,
     # we need to know that we have access to the logfile.
     # we need to know that we have access to the logfile.
-    if logfile:
-        open(logfile, "a").close()
-    if pidfile:
-        # Doesn't actually create the pidfile, but makes sure it's
-        # not stale.
-        create_pidlock(pidfile)
-
-    defaults = {"umask": lambda: 0,
-                "working_directory": lambda: os.getcwd()}
+    logfile and open(logfile, "a").close()
+    # Doesn't actually create the pidfile, but makes sure it's not stale.
+    pidfile and create_pidlock(pidfile)
 
 
-    for opt_name, opt_default_gen in defaults.items():
-        if opt_name not in options or options[opt_name] is None:
-            options[opt_name] = opt_default_gen()
-
-    context = DaemonContext(**options)
-
-    return context, context.close
+    return DaemonContext(umask=umask, workdir=workdir)
 
 
 
 
 def parse_uid(uid):
 def parse_uid(uid):
@@ -337,7 +315,7 @@ def install_signal_handler(signal_name=None, handler=None, **sigmap):
 
 
 
 
 def strargv(argv):
 def strargv(argv):
-    arg_start = "manage" in argv[0] and 2 or 1
+    arg_start = 2 if "manage" in argv[0] else 1
     if len(argv) > arg_start:
     if len(argv) > arg_start:
         return " ".join(argv[arg_start:])
         return " ".join(argv[arg_start:])
     return ""
     return ""
@@ -350,9 +328,9 @@ def set_process_title(progname, info=None):
 
 
     """
     """
     proctitle = "[%s]" % progname
     proctitle = "[%s]" % progname
-    proctitle = info and "%s %s" % (proctitle, info) or proctitle
+    proctitle = "%s %s" % (proctitle, info) if info else proctitle
     if _setproctitle:
     if _setproctitle:
-        _setproctitle(proctitle)
+        _setproctitle.setproctitle(proctitle)
     return proctitle
     return proctitle
 
 
 
 
@@ -362,11 +340,12 @@ def set_mp_process_title(progname, info=None, hostname=None):
     Only works if :mod:`setproctitle` is installed.
     Only works if :mod:`setproctitle` is installed.
 
 
     """
     """
+    if hostname:
+        progname = "%s@%s" % (progname, hostname.split(".")[0])
     try:
     try:
         from multiprocessing.process import current_process
         from multiprocessing.process import current_process
     except ImportError:
     except ImportError:
-        return
-    if hostname:
-        progname = "%s@%s" % (progname, hostname.split(".")[0])
-    return set_process_title("%s:%s" % (progname, current_process().name),
-                             info=info)
+        return set_process_title(progname, info=info)
+    else:
+        return set_process_title("%s:%s" % (progname,
+                                            current_process().name), info=info)

+ 9 - 8
celery/tests/test_bin/test_celerybeat.py

@@ -161,24 +161,25 @@ class MockDaemonContext(object):
     opened = False
     opened = False
     closed = False
     closed = False
 
 
+    def __init__(self, *args, **kwargs):
+        pass
+
     def open(self):
     def open(self):
         self.__class__.opened = True
         self.__class__.opened = True
+        return self
+    __enter__ = open
 
 
-    def close(self):
+    def close(self, *args):
         self.__class__.closed = True
         self.__class__.closed = True
-
-
-def create_daemon_context(*args, **kwargs):
-    context = MockDaemonContext()
-    return context, context.close
+    __exit__ = close
 
 
 
 
 class test_div(AppCase):
 class test_div(AppCase):
 
 
     def setup(self):
     def setup(self):
         self.prev, beatapp.Beat = beatapp.Beat, MockBeat
         self.prev, beatapp.Beat = beatapp.Beat, MockBeat
-        self.ctx, celerybeat_bin.create_daemon_context = \
-                celerybeat_bin.create_daemon_context, create_daemon_context
+        self.ctx, celerybeat_bin.detached = \
+                celerybeat_bin.detached, MockDaemonContext
 
 
     def teardown(self):
     def teardown(self):
         beatapp.Beat = self.prev
         beatapp.Beat = self.prev