瀏覽代碼

Use ``python-daemon`` instead of the homegrown stuff. (this adds a dependency
to python-daemon)

Ask Solem 16 年之前
父節點
當前提交
48a55d3476
共有 5 個文件被更改,包括 100 次插入163 次删除
  1. 97 20
      celery/bin/celeryd.py
  2. 2 11
      celery/management/commands/celeryd.py
  3. 0 122
      celery/platform.py
  4. 0 10
      docs/reference/celery.platform.rst
  5. 1 0
      setup.py

+ 97 - 20
celery/bin/celeryd.py

@@ -26,7 +26,7 @@
     daemon sleeps until it wakes up to check if there's any
     daemon sleeps until it wakes up to check if there's any
     new messages on the queue.
     new messages on the queue.
 
 
-.. cmdoption:: -d, --daemon
+.. cmdoption:: -d, --detach, --daemon
 
 
     Run in the background as a daemon.
     Run in the background as a daemon.
 
 
@@ -35,6 +35,26 @@
     Discard all waiting tasks before the daemon is started.
     Discard all waiting tasks before the daemon is started.
     **WARNING**: This is unrecoverable, and the tasks will be
     **WARNING**: This is unrecoverable, and the tasks will be
     deleted from the messaging server.
     deleted from the messaging server.
+    
+.. cmdoption:: -u, --uid
+
+    User-id to run ``celeryd`` as when in daemon mode.
+
+.. cmdoption:: -g, --gid
+       
+    Group-id to run ``celeryd`` as when in daemon mode.
+
+.. cmdoption:: --umask
+    
+    umask of the process when in daemon mode.
+
+.. cmdoption:: --workdir
+
+    Directory to change to when in daemon mode.
+
+.. cmdoption:: --chroot
+
+    Change root directory to this path when in daemon mode.
 
 
 """
 """
 import os
 import os
@@ -45,7 +65,6 @@ if django_project_dir:
     sys.path.append(django_project_dir)
     sys.path.append(django_project_dir)
 
 
 from django.conf import settings
 from django.conf import settings
-from celery.platform import PIDFile, daemonize, remove_pidfile
 from celery.log import emergency_error
 from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
@@ -56,11 +75,45 @@ from celery.worker import WorkController
 import traceback
 import traceback
 import optparse
 import optparse
 import atexit
 import atexit
+from daemon import DaemonContext
+from daemon.pidlockfile import PIDLockFile
 
 
 
 
-def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
+def acquire_pidlock(pidfile):
+    """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
+    ``pidfile``.
+
+    If the ``pidfile`` already exists, but the process is not running the
+    ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted
+    and execution continues as normally. However, if the process is still
+    running the program will exit complaning that the program is already
+    running in the background somewhere.
+
+    """
+    pidlock = PIDLockFile(pidfile)
+    if not pidlock.is_locked():
+        return pidlock
+    pid = pidlock.read_pid()
+    try:
+        os.kill(pid, 0)
+    except os.error, exc:
+        if exc.errno == errno.ESRCH:
+            sys.stderr.write("Stale pidfile exists. Removing it.\n")
+            pidlock.release() 
+            return
+    else:
+        raise SystemExit(
+                "ERROR: Pidfile (%s) already exists.\n"
+                "Seems celeryd is already running? (PID: %d)" % (
+                    pidfile, pid))
+    return pidlock        
+
+
+def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
-        pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER):
+        pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER,
+        umask=0, uid=None, gid=None, working_directory=None, chroot=None,
+        **kwargs):
     """Run the celery daemon."""
     """Run the celery daemon."""
     if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
     if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
         import warnings
         import warnings
@@ -68,6 +121,9 @@ def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
                 "concurrency. We'll be using a single process only.",
                 "concurrency. We'll be using a single process only.",
                 UserWarning)
                 UserWarning)
         concurrency = 1
         concurrency = 1
+    
+    if not isinstance(loglevel, int):
+        loglevel = LOG_LEVELS[loglevel.upper()]
 
 
     if discard:
     if discard:
         discarded_count = discard_all()
         discarded_count = discard_all()
@@ -77,11 +133,24 @@ def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
         sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
         sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
             discarded_count, what))
             discarded_count, what))
     if daemon:
     if daemon:
+        # Since without stderr any errors will be silently suppressed,
+        # we need to know that we have access to the logfile
+        pidlock = acquire_pidlock(pidfile)
+        if not umask:
+            umask = 0
+        if logfile:
+            open(logfile, "a").close()
+        uid = uid and int(uid) or os.geteuid()
+        gid = gid and int(gid) or os.getegid()
+        working_directory = working_directory or os.getcwd()
         sys.stderr.write("Launching celeryd in the background...\n")
         sys.stderr.write("Launching celeryd in the background...\n")
-        pidfile_handler = PIDFile(pidfile)
-        pidfile_handler.check()
-        daemonize(pidfile=pidfile_handler)
-        atexit.register(remove_pidfile, pidfile)
+        context = DaemonContext(chroot_directory=chroot,
+                                working_directory=working_directory,
+                                umask=umask,
+                                pidfile=pidlock,
+                                uid=uid,
+                                gid=gid)
+        context.open()
     else:
     else:
         logfile = None # log to stderr when not running as daemon.
         logfile = None # log to stderr when not running as daemon.
 
 
@@ -121,26 +190,34 @@ OPTION_LIST = (
             help="If the queue is empty, this is the time *in seconds* the "
             help="If the queue is empty, this is the time *in seconds* the "
                  "daemon sleeps until it wakes up to check if there's any "
                  "daemon sleeps until it wakes up to check if there's any "
                  "new messages on the queue."),
                  "new messages on the queue."),
-    optparse.make_option('-d', '--daemon', default=False,
+    optparse.make_option('-d', '--detach', '--daemon', default=False,
             action="store_true", dest="daemon",
             action="store_true", dest="daemon",
             help="Run in the background as a daemon."),
             help="Run in the background as a daemon."),
-)
+    optparse.make_option('-u', '--uid', default=None,
+            action="store", dest="uid",
+            help="User-id to run celeryd as when in daemon mode."),
+    optparse.make_option('-g', '--gid', default=None,
+            action="store", dest="gid",
+            help="Group-id to run celeryd as when in daemon mode."),
+    optparse.make_option('--umask', default=0,
+            action="store", type="int", dest="umask",
+            help="umask of the process when in daemon mode."),
+    optparse.make_option('--workdir', default=None,
+            action="store", dest="working_directory",
+            help="Directory to change to when in daemon mode."),
+    optparse.make_option('--chroot', default=None,
+            action="store", dest="chroot",
+            help="Change root directory to this path when in daemon mode."),
+    )
 
 
 
 
 def parse_options(arguments):
 def parse_options(arguments):
-    """Option parsers for the available options to ``celeryd``."""
+    """Parse the available options to ``celeryd``."""
     parser = optparse.OptionParser(option_list=OPTION_LIST)
     parser = optparse.OptionParser(option_list=OPTION_LIST)
     options, values = parser.parse_args(arguments)
     options, values = parser.parse_args(arguments)
-    if not isinstance(options.loglevel, int):
-        options.loglevel = LOG_LEVELS[options.loglevel.upper()]
     return options
     return options
 
 
+
 if __name__ == "__main__":
 if __name__ == "__main__":
     options = parse_options(sys.argv[1:])
     options = parse_options(sys.argv[1:])
-    main(concurrency=options.concurrency,
-         daemon=options.daemon,
-         logfile=options.logfile,
-         loglevel=options.loglevel,
-         pidfile=options.pidfile,
-         discard=options.discard,
-         queue_wakeup_after=options.queue_wakeup_after)
+    run_worker(**options)

+ 2 - 11
celery/management/commands/celeryd.py

@@ -4,8 +4,7 @@ Start the celery daemon from the Django management command.
 
 
 """
 """
 from django.core.management.base import BaseCommand
 from django.core.management.base import BaseCommand
-from celery.bin.celeryd import main, OPTION_LIST
-from celery.conf import LOG_LEVELS
+from celery.bin.celeryd import run_worker, OPTION_LIST
 
 
 
 
 class Command(BaseCommand):
 class Command(BaseCommand):
@@ -15,12 +14,4 @@ class Command(BaseCommand):
 
 
     def handle(self, *args, **options):
     def handle(self, *args, **options):
         """Handle the management command."""
         """Handle the management command."""
-        if not isinstance(options.get('loglevel'), int):
-            options['loglevel'] = LOG_LEVELS[options.get('loglevel').upper()]
-        main(concurrency=options.get('concurrency'),
-             daemon=options.get('daemon'),
-             logfile=options.get('logfile'),
-             discard=options.get('discard'),
-             loglevel=options.get('loglevel'),
-             pidfile=options.get('pidfile'),
-             queue_wakeup_after=options.get('queue_wakeup_after'))
+        run_worker(**options)

+ 0 - 122
celery/platform.py

@@ -1,122 +0,0 @@
-"""celery.platform"""
-import os
-import sys
-import errno
-import resource
-
-
-# File mode creation mask of the daemon.
-# No point in changing this, as we don't really create any files.
-DAEMON_UMASK = 0
-
-# Default working directory for the daemon.
-DAEMON_WORKDIR = "/"
-
-# Default maximum for the number of available file descriptors.
-DAEMON_MAXFD = 1024
-
-# The standard I/O file descriptors are redirected to /dev/null by default.
-if (hasattr(os, "devnull")):
-    REDIRECT_TO = os.devnull
-else:
-    REDIRECT_TO = "/dev/null"
-
-
-class PIDFile(object):
-    """Manages a pid file."""
-
-    def __init__(self, pidfile):
-        self.pidfile = pidfile
-
-    def get_pid(self):
-        """Get the process id stored in the pidfile."""
-        pidfile_fh = file(self.pidfile, "r")
-        pid = int(pidfile_fh.read().strip())
-        pidfile_fh.close()
-        return pid
-
-    def check(self):
-        """Check the status of the pidfile.
-
-        If the pidfile exists, and the process is not running, it will
-        remove the stale pidfile and continue as normal. If the process
-        *is* running, it will exit the program with an error message.
-
-        """
-        if os.path.exists(self.pidfile) and os.path.isfile(self.pidfile):
-            pid = self.get_pid()
-            try:
-                os.kill(pid, 0)
-            except os.error, e:
-                if e.errno == errno.ESRCH:
-                    sys.stderr.write("Stale pidfile exists. removing it.\n")
-                    self.remove()
-            else:
-                raise SystemExit("celeryd is already running.")
-
-    def remove(self):
-        """Remove the pidfile."""
-        os.unlink(self.pidfile)
-
-    def write(self, pid=None):
-        """Write a pidfile.
-
-        If ``pid`` is not specified the pid of the current process
-        will be used.
-
-        """
-        if not pid:
-            pid = os.getpid()
-        pidfile_fh = file(self.pidfile, "w")
-        pidfile_fh.write("%d\n" % pid)
-        pidfile_fh.close()
-
-
-def remove_pidfile(pidfile):
-    """Remove the pidfile."""
-    os.unlink(pidfile)
-
-
-def daemonize(pidfile):
-    """Detach a process from the controlling terminal and run it in the
-    background as a daemon."""
-
-    try:
-        pid = os.fork()
-    except OSError, e:
-        raise Exception("%s [%d]" % (e.strerror, e.errno))
-
-    if pid == 0: # child
-        os.setsid()
-
-        try:
-            pid = os.fork() # second child
-        except OSError, e:
-            raise Exception("%s [%d]" % (e.strerror, e.errno))
-
-        if pid == 0: # second child
-            #os.chdir(DAEMON_WORKDIR)
-            os.umask(DAEMON_UMASK)
-        else: # parent (first child)
-            pidfile.write(pid)
-            os._exit(0)
-    else: # root process
-        os._exit(0)
-
-    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
-    if (maxfd == resource.RLIM_INFINITY):
-        maxfd = DAEMON_MAXFD
-
-    # Iterate through and close all file descriptors.
-    for fd in range(0, maxfd):
-        try:
-            os.close(fd)
-        except OSError:
-            pass
-
-    os.open(REDIRECT_TO, os.O_RDWR)
-    # Duplicate standard input to standard output and standard error.
-    os.dup2(0, 1)
-    os.dup2(0, 2)
-
-    return 0

+ 0 - 10
docs/reference/celery.platform.rst

@@ -1,10 +0,0 @@
-====================================
-Platform Specific - celery.platform
-====================================
-
-.. currentmodule:: celery.platform
-
-.. automodule:: celery.platform
-    :members:
-
-

+ 1 - 0
setup.py

@@ -64,6 +64,7 @@ setup(
     zip_safe=False,
     zip_safe=False,
     install_requires=[
     install_requires=[
         'carrot>=0.4.1',
         'carrot>=0.4.1',
+        'python-daemon',
         'django',
         'django',
     ],
     ],
     cmdclass = {"test": RunTests},
     cmdclass = {"test": RunTests},