Parcourir la source

celeryd now supports a --pidfile argument.

A pidfile is not created by default, but if this argument is used
celeryd will not start if the pidfile exists and the pid contained is still alive.
Ask Solem il y a 14 ans
Parent
commit
271ca68a66
3 fichiers modifiés avec 238 ajouts et 11 suppressions
  1. 16 9
      celery/apps/worker.py
  2. 4 0
      celery/bin/celeryd.py
  3. 218 2
      celery/platforms.py

+ 16 - 9
celery/apps/worker.py

@@ -1,3 +1,4 @@
+import atexit
 import logging
 import multiprocessing
 import os
@@ -36,7 +37,7 @@ class Worker(object):
             hostname=None, discard=False, run_clockservice=False,
             schedule=None, task_time_limit=None, task_soft_time_limit=None,
             max_tasks_per_child=None, queues=None, events=False, db=None,
-            include=None, app=None, **kwargs):
+            include=None, app=None, pidfile=None, **kwargs):
         self.app = app = app_or_default(app)
         self.concurrency = (concurrency or
                             app.conf.CELERYD_CONCURRENCY or
@@ -58,6 +59,7 @@ class Worker(object):
         self.use_queues = queues or []
         self.queues = None
         self.include = include or []
+        self.pidfile = pidfile
         self._isatty = sys.stdout.isatty()
 
         if isinstance(self.use_queues, basestring):
@@ -81,7 +83,7 @@ class Worker(object):
         # Dump configuration to screen so we have some basic information
         # for when users sends bug reports.
         print(self.startup_info())
-        set_process_status("Running...")
+        self.set_process_status("Running...")
 
         self.run_worker()
 
@@ -155,6 +157,9 @@ class Worker(object):
         }
 
     def run_worker(self):
+        if self.pidfile:
+            pidlock = platforms.create_pidlock(self.pidfile).__enter__()
+            atexit.register(pidlock.release)
         worker = self.WorkController(app=self.app,
                                 concurrency=self.concurrency,
                                 loglevel=self.loglevel,
@@ -196,6 +201,15 @@ class Worker(object):
         """See http://github.com/ask/celery/issues#issue/161"""
         os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
 
+    def set_process_status(self, info):
+        arg_start = "manage" in sys.argv[0] and 2 or 1
+        if sys.argv[arg_start:]:
+            info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
+        return platforms.set_mp_process_title("celeryd", info=info,
+                                              hostname=self.hostname)
+
+
+
 
 def install_worker_int_handler(worker):
 
@@ -261,12 +275,5 @@ def install_HUP_not_supported_handler(worker):
     platforms.install_signal_handler("SIGHUP", warn_on_HUP_handler)
 
 
-def set_process_status(info):
-    arg_start = "manage" in sys.argv[0] and 2 or 1
-    if sys.argv[arg_start:]:
-        info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
-    return platforms.set_mp_process_title("celeryd", info=info)
-
-
 def run_worker(*args, **kwargs):
     return Worker(*args, **kwargs).run()

+ 4 - 0
celery/bin/celeryd.py

@@ -145,6 +145,10 @@ class WorkerCommand(Command):
                 action="store", dest="include",
                 help="Comma separated list of additional modules to import. "
                  "Example: -I foo.tasks,bar.tasks"),
+            Option('--pidfile', default=None,
+                help="Optional file used to store the workers pid. "
+                     "The worker will not start if this file already exists "
+                     "and the pid is still alive."),
         )
 
 

+ 218 - 2
celery/platforms.py

@@ -1,9 +1,223 @@
+import os
+import sys
+import pwd
+import grp
+import errno
+import atexit
 import signal
 try:
     from setproctitle import setproctitle as _setproctitle
 except ImportError:
     _setproctitle = None
 
+CAN_DETACH = True
+try:
+    import resource
+except ImportError:
+    CAN_DETACH = False
+
+DAEMON_UMASK = 0
+DAEMON_WORKDIR = "/"
+DAEMON_REDIRECT_TO = getattr(os, "devnull", "/dev/nulll")
+
+
+def create_pidlock(pidfile):
+    """Create a pidfile to be used with python-daemon.
+
+    If the pidfile already exists the program exits with an error message,
+    however if the process it refers to is not running anymore, the pidfile
+    is just deleted.
+
+    """
+
+    from lockfile import pidlockfile, LockFailed
+
+    class PIDFile(object):
+
+        def __init__(self, path):
+            self.path = os.path.abspath(path)
+
+        def __enter__(self):
+            try:
+                pidlockfile.write_pid_to_pidfile(self.path)
+            except OSError, exc:
+                raise LockFailed(str(exc))
+            return self
+
+        def __exit__(self, *_exc):
+            self.release()
+
+        def is_locked(self):
+            return os.path.exists(self.path)
+
+        def release(self):
+            try:
+                os.unlink(self.path)
+            except OSError, exc:
+                if exc.errno in (errno.ENOENT, errno.EACCES):
+                    return
+                raise
+
+        def read_pid(self):
+            return pidlockfile.read_pid_from_pidfile(self.path)
+
+        def is_stale(self):
+            pid = self.read_pid()
+            try:
+                os.kill(pid, 0)
+            except os.error, exc:
+                if exc.errno == errno.ESRCH:
+                    sys.stderr.write("Stale pidfile exists. Removing it.\n")
+                    self.release()
+                    return True
+            except TypeError, exc:
+                sys.stderr.write("Broken pidfile found. Removing it.\n")
+                self.release()
+                return True
+            return False
+
+    pidlock = PIDFile(pidfile)
+    if pidlock.is_locked() and not pidlock.is_stale():
+        raise SystemExit(
+                "ERROR: Pidfile (%s) already exists.\n"
+                "Seems we're already running? (PID: %s)" % (
+                    pidfile, pidlock.read_pid()))
+    return pidlock
+
+
+class DaemonContext(object):
+    _is_open = False
+
+    def __init__(self, pidfile=None, chroot_directory=None,
+            working_directory=DAEMON_WORKDIR, umask=DAEMON_UMASK, **kwargs):
+        self.chroot_directory = chroot_directory
+        self.working_directory = working_directory
+        self.umask = umask
+
+    def detach(self):
+        if os.fork() == 0:      # first child
+            os.setsid()         # create new session
+            if os.fork() > 0:   # second child
+                os._exit(0)
+        else:
+            os._exit(0)
+
+    def open(self):
+        from daemon import daemon
+        if self._is_open:
+            return
+
+        self.detach()
+
+        if self.chroot_directory is not None:
+            daemon.change_root_directory(self.chroot_directory)
+        os.chdir(self.working_directory)
+        os.umask(self.umask)
+
+        daemon.close_all_open_files()
+
+        os.open(DAEMON_REDIRECT_TO, os.O_RDWR)
+        os.dup2(0, 1)
+        os.dup2(0, 2)
+
+        self._is_open = True
+
+    def close(self):
+        if self._is_open:
+            self._is_open = False
+
+def create_daemon_context(logfile=None, pidfile=None, **options):
+    if not CAN_DETACH:
+        raise RuntimeError(
+                "This platform does not support detach.")
+
+    # set SIGCLD back to the default SIG_DFL (before python-daemon overrode
+    # it) lets the parent wait() for the terminated child process and stops
+    # the 'OSError: [Errno 10] No child processes' problem.
+    reset_signal("SIGCLD")
+
+    # Since without stderr any errors will be silently suppressed,
+    # we need to know that we have access to the logfile.
+    if logfile:
+        open(logfile, "a").close()
+    if pidfile:
+        open(pidfile, "a").close()
+
+    defaults = {"umask": lambda: 0,
+                "chroot_directory": lambda: None,
+                "working_directory": lambda: os.getcwd()}
+
+    for opt_name, opt_default_gen in defaults.items():
+        if opt_name not in options or options[opt_name] is None:
+            options[opt_name] = opt_default_gen()
+
+    context = DaemonContext(**options)
+
+    return context, context.close
+
+
+
+def parse_uid(uid):
+    """Parse user id.
+
+    uid can be an interger (uid) or a string (username), if a username
+    the uid is taken from the password file.
+
+    """
+    try:
+        return int(uid)
+    except ValueError:
+        return pwd.getpwnam(uid).pw_uid
+
+
+def parse_gid(gid):
+    """Parse group id.
+
+    gid can be an integer (gid) or a string (group name), if a group name
+    the gid is taken from the password file.
+
+    """
+    try:
+        return int(gid)
+    except ValueError:
+        return grp.getgrnam(gid).gr_gid
+
+
+def setegid(gid):
+    """Set effective group id."""
+    gid = parse_gid(gid)
+    if gid != os.getgid():
+        os.setegid
+
+
+def seteuid(uid):
+    """Set effective user id."""
+    uid = parse_uid(uid)
+    if uid != os.getuid():
+        os.seteuid(uid)
+
+
+def set_effective_user(uid=None, gid=None):
+    """Change process privileges to new user/group.
+
+    If uid and gid is set the effective user/group is set.
+
+    If only uid is set, the effective uer is set, and the group is
+    set to the users primary group.
+
+    If only gid is set, the effective group is set.
+
+    """
+    uid = uid and parse_uid(uid)
+    gid = gid and parse_gid(gid)
+
+    if uid:
+        # If gid isn't defined, get the primary gid of the uer.
+        setegid(gid or pwd.getpwuid(uid).pw_gid)
+        seteuid(uid)
+    else:
+        gid and setegid(gid)
+
 
 def reset_signal(signal_name):
     """Reset signal to the default signal handler.
@@ -60,12 +274,14 @@ def set_process_title(progname, info=None):
     return proctitle
 
 
-def set_mp_process_title(progname, info=None):
+def set_mp_process_title(progname, info=None, hostname=None):
     """Set the ps name using the multiprocessing process name.
 
     Only works if :mod:`setproctitle` is installed.
 
     """
     from multiprocessing.process import current_process
-    return set_process_title("%s.%s" % (progname, current_process().name),
+    if hostname:
+        progname = "%s@%s" % (progname, hostname.split(".")[0])
+    return set_process_title("%s:%s" % (progname, current_process().name),
                              info=info)