浏览代码

Merge branch 'master' into statistics

Ask Solem 16 年之前
父节点
当前提交
86b2ea2077
共有 13 个文件被更改,包括 129 次插入173 次删除
  1. 1 0
      AUTHORS
  2. 1 1
      README
  3. 0 1
      THANKS
  4. 1 1
      celery/__init__.py
  5. 97 20
      celery/bin/celeryd.py
  6. 2 11
      celery/management/commands/celeryd.py
  7. 0 122
      celery/platform.py
  8. 3 3
      celery/views.py
  9. 14 3
      celery/worker.py
  10. 0 10
      docs/reference/celery.platform.rst
  11. 8 0
      docs/reference/celery.pool.rst
  12. 1 0
      docs/reference/index.rst
  13. 1 1
      setup.py

+ 1 - 0
AUTHORS

@@ -1,2 +1,3 @@
 Ask Solem <askh@opera.com>
 Ask Solem <askh@opera.com>
 Grégoire Cachet <gregoire@audacy.fr>
 Grégoire Cachet <gregoire@audacy.fr>
+Vitaly Babiy <vbabiy86@gmail.com>

+ 1 - 1
README

@@ -2,7 +2,7 @@
 celery - Distributed Task Queue for Django.
 celery - Distributed Task Queue for Django.
 ============================================
 ============================================
 
 
-:Version: 0.3.3
+:Version: 0.3.5
 
 
 Introduction
 Introduction
 ============
 ============

+ 0 - 1
THANKS

@@ -1,4 +1,3 @@
 Thanks to Rune Halvorsen <runeh@opera.com> for the name.
 Thanks to Rune Halvorsen <runeh@opera.com> for the name.
 Thanks to Anton Tsigularov <antont@opera.com> for the previous name (crunchy)
 Thanks to Anton Tsigularov <antont@opera.com> for the previous name (crunchy)
     which we had to abandon because of an existing project with that name.
     which we had to abandon because of an existing project with that name.
-Thanks to Vitaly Babiy for bug reports.

+ 1 - 1
celery/__init__.py

@@ -1,5 +1,5 @@
 """Distributed Task Queue for Django"""
 """Distributed Task Queue for Django"""
-VERSION = (0, 3, 3)
+VERSION = (0, 3, 5)
 __version__ = ".".join(map(str, VERSION))
 __version__ = ".".join(map(str, VERSION))
 __author__ = "Ask Solem"
 __author__ = "Ask Solem"
 __contact__ = "askh@opera.com"
 __contact__ = "askh@opera.com"

+ 97 - 20
celery/bin/celeryd.py

@@ -26,7 +26,7 @@
     daemon sleeps until it wakes up to check if there's any
     daemon sleeps until it wakes up to check if there's any
     new messages on the queue.
     new messages on the queue.
 
 
-.. cmdoption:: -d, --daemon
+.. cmdoption:: -d, --detach, --daemon
 
 
     Run in the background as a daemon.
     Run in the background as a daemon.
 
 
@@ -35,6 +35,26 @@
     Discard all waiting tasks before the daemon is started.
     Discard all waiting tasks before the daemon is started.
     **WARNING**: This is unrecoverable, and the tasks will be
     **WARNING**: This is unrecoverable, and the tasks will be
     deleted from the messaging server.
     deleted from the messaging server.
+    
+.. cmdoption:: -u, --uid
+
+    User-id to run ``celeryd`` as when in daemon mode.
+
+.. cmdoption:: -g, --gid
+       
+    Group-id to run ``celeryd`` as when in daemon mode.
+
+.. cmdoption:: --umask
+    
+    umask of the process when in daemon mode.
+
+.. cmdoption:: --workdir
+
+    Directory to change to when in daemon mode.
+
+.. cmdoption:: --chroot
+
+    Change root directory to this path when in daemon mode.
 
 
 """
 """
 import os
 import os
@@ -45,7 +65,6 @@ if django_project_dir:
     sys.path.append(django_project_dir)
     sys.path.append(django_project_dir)
 
 
 from django.conf import settings
 from django.conf import settings
-from celery.platform import PIDFile, daemonize, remove_pidfile
 from celery.log import emergency_error
 from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
@@ -56,11 +75,45 @@ from celery.worker import WorkController
 import traceback
 import traceback
 import optparse
 import optparse
 import atexit
 import atexit
+from daemon import DaemonContext
+from daemon.pidlockfile import PIDLockFile
 
 
 
 
-def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
+def acquire_pidlock(pidfile):
+    """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
+    ``pidfile``.
+
+    If the ``pidfile`` already exists, but the process is not running the
+    ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted
+    and execution continues as normally. However, if the process is still
+    running the program will exit complaning that the program is already
+    running in the background somewhere.
+
+    """
+    pidlock = PIDLockFile(pidfile)
+    if not pidlock.is_locked():
+        return pidlock
+    pid = pidlock.read_pid()
+    try:
+        os.kill(pid, 0)
+    except os.error, exc:
+        if exc.errno == errno.ESRCH:
+            sys.stderr.write("Stale pidfile exists. Removing it.\n")
+            pidlock.release() 
+            return
+    else:
+        raise SystemExit(
+                "ERROR: Pidfile (%s) already exists.\n"
+                "Seems celeryd is already running? (PID: %d)" % (
+                    pidfile, pid))
+    return pidlock        
+
+
+def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
-        pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER):
+        pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER,
+        umask=0, uid=None, gid=None, working_directory=None, chroot=None,
+        **kwargs):
     """Run the celery daemon."""
     """Run the celery daemon."""
     if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
     if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
         import warnings
         import warnings
@@ -68,6 +121,9 @@ def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
                 "concurrency. We'll be using a single process only.",
                 "concurrency. We'll be using a single process only.",
                 UserWarning)
                 UserWarning)
         concurrency = 1
         concurrency = 1
+    
+    if not isinstance(loglevel, int):
+        loglevel = LOG_LEVELS[loglevel.upper()]
 
 
     if discard:
     if discard:
         discarded_count = discard_all()
         discarded_count = discard_all()
@@ -77,11 +133,24 @@ def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
         sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
         sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
             discarded_count, what))
             discarded_count, what))
     if daemon:
     if daemon:
+        # Since without stderr any errors will be silently suppressed,
+        # we need to know that we have access to the logfile
+        pidlock = acquire_pidlock(pidfile)
+        if not umask:
+            umask = 0
+        if logfile:
+            open(logfile, "a").close()
+        uid = uid and int(uid) or os.geteuid()
+        gid = gid and int(gid) or os.getegid()
+        working_directory = working_directory or os.getcwd()
         sys.stderr.write("Launching celeryd in the background...\n")
         sys.stderr.write("Launching celeryd in the background...\n")
-        pidfile_handler = PIDFile(pidfile)
-        pidfile_handler.check()
-        daemonize(pidfile=pidfile_handler)
-        atexit.register(remove_pidfile, pidfile)
+        context = DaemonContext(chroot_directory=chroot,
+                                working_directory=working_directory,
+                                umask=umask,
+                                pidfile=pidlock,
+                                uid=uid,
+                                gid=gid)
+        context.open()
     else:
     else:
         logfile = None # log to stderr when not running as daemon.
         logfile = None # log to stderr when not running as daemon.
 
 
@@ -121,26 +190,34 @@ OPTION_LIST = (
             help="If the queue is empty, this is the time *in seconds* the "
             help="If the queue is empty, this is the time *in seconds* the "
                  "daemon sleeps until it wakes up to check if there's any "
                  "daemon sleeps until it wakes up to check if there's any "
                  "new messages on the queue."),
                  "new messages on the queue."),
-    optparse.make_option('-d', '--daemon', default=False,
+    optparse.make_option('-d', '--detach', '--daemon', default=False,
             action="store_true", dest="daemon",
             action="store_true", dest="daemon",
             help="Run in the background as a daemon."),
             help="Run in the background as a daemon."),
-)
+    optparse.make_option('-u', '--uid', default=None,
+            action="store", dest="uid",
+            help="User-id to run celeryd as when in daemon mode."),
+    optparse.make_option('-g', '--gid', default=None,
+            action="store", dest="gid",
+            help="Group-id to run celeryd as when in daemon mode."),
+    optparse.make_option('--umask', default=0,
+            action="store", type="int", dest="umask",
+            help="umask of the process when in daemon mode."),
+    optparse.make_option('--workdir', default=None,
+            action="store", dest="working_directory",
+            help="Directory to change to when in daemon mode."),
+    optparse.make_option('--chroot', default=None,
+            action="store", dest="chroot",
+            help="Change root directory to this path when in daemon mode."),
+    )
 
 
 
 
 def parse_options(arguments):
 def parse_options(arguments):
-    """Option parsers for the available options to ``celeryd``."""
+    """Parse the available options to ``celeryd``."""
     parser = optparse.OptionParser(option_list=OPTION_LIST)
     parser = optparse.OptionParser(option_list=OPTION_LIST)
     options, values = parser.parse_args(arguments)
     options, values = parser.parse_args(arguments)
-    if not isinstance(options.loglevel, int):
-        options.loglevel = LOG_LEVELS[options.loglevel.upper()]
     return options
     return options
 
 
+
 if __name__ == "__main__":
 if __name__ == "__main__":
     options = parse_options(sys.argv[1:])
     options = parse_options(sys.argv[1:])
-    main(concurrency=options.concurrency,
-         daemon=options.daemon,
-         logfile=options.logfile,
-         loglevel=options.loglevel,
-         pidfile=options.pidfile,
-         discard=options.discard,
-         queue_wakeup_after=options.queue_wakeup_after)
+    run_worker(**options)

+ 2 - 11
celery/management/commands/celeryd.py

@@ -4,8 +4,7 @@ Start the celery daemon from the Django management command.
 
 
 """
 """
 from django.core.management.base import BaseCommand
 from django.core.management.base import BaseCommand
-from celery.bin.celeryd import main, OPTION_LIST
-from celery.conf import LOG_LEVELS
+from celery.bin.celeryd import run_worker, OPTION_LIST
 
 
 
 
 class Command(BaseCommand):
 class Command(BaseCommand):
@@ -15,12 +14,4 @@ class Command(BaseCommand):
 
 
     def handle(self, *args, **options):
     def handle(self, *args, **options):
         """Handle the management command."""
         """Handle the management command."""
-        if not isinstance(options.get('loglevel'), int):
-            options['loglevel'] = LOG_LEVELS[options.get('loglevel').upper()]
-        main(concurrency=options.get('concurrency'),
-             daemon=options.get('daemon'),
-             logfile=options.get('logfile'),
-             discard=options.get('discard'),
-             loglevel=options.get('loglevel'),
-             pidfile=options.get('pidfile'),
-             queue_wakeup_after=options.get('queue_wakeup_after'))
+        run_worker(**options)

+ 0 - 122
celery/platform.py

@@ -1,122 +0,0 @@
-"""celery.platform"""
-import os
-import sys
-import errno
-import resource
-
-
-# File mode creation mask of the daemon.
-# No point in changing this, as we don't really create any files.
-DAEMON_UMASK = 0
-
-# Default working directory for the daemon.
-DAEMON_WORKDIR = "/"
-
-# Default maximum for the number of available file descriptors.
-DAEMON_MAXFD = 1024
-
-# The standard I/O file descriptors are redirected to /dev/null by default.
-if (hasattr(os, "devnull")):
-    REDIRECT_TO = os.devnull
-else:
-    REDIRECT_TO = "/dev/null"
-
-
-class PIDFile(object):
-    """Manages a pid file."""
-
-    def __init__(self, pidfile):
-        self.pidfile = pidfile
-
-    def get_pid(self):
-        """Get the process id stored in the pidfile."""
-        pidfile_fh = file(self.pidfile, "r")
-        pid = int(pidfile_fh.read().strip())
-        pidfile_fh.close()
-        return pid
-
-    def check(self):
-        """Check the status of the pidfile.
-
-        If the pidfile exists, and the process is not running, it will
-        remove the stale pidfile and continue as normal. If the process
-        *is* running, it will exit the program with an error message.
-
-        """
-        if os.path.exists(self.pidfile) and os.path.isfile(self.pidfile):
-            pid = self.get_pid()
-            try:
-                os.kill(pid, 0)
-            except os.error, e:
-                if e.errno == errno.ESRCH:
-                    sys.stderr.write("Stale pidfile exists. removing it.\n")
-                    self.remove()
-            else:
-                raise SystemExit("celeryd is already running.")
-
-    def remove(self):
-        """Remove the pidfile."""
-        os.unlink(self.pidfile)
-
-    def write(self, pid=None):
-        """Write a pidfile.
-
-        If ``pid`` is not specified the pid of the current process
-        will be used.
-
-        """
-        if not pid:
-            pid = os.getpid()
-        pidfile_fh = file(self.pidfile, "w")
-        pidfile_fh.write("%d\n" % pid)
-        pidfile_fh.close()
-
-
-def remove_pidfile(pidfile):
-    """Remove the pidfile."""
-    os.unlink(pidfile)
-
-
-def daemonize(pidfile):
-    """Detach a process from the controlling terminal and run it in the
-    background as a daemon."""
-
-    try:
-        pid = os.fork()
-    except OSError, e:
-        raise Exception("%s [%d]" % (e.strerror, e.errno))
-
-    if pid == 0: # child
-        os.setsid()
-
-        try:
-            pid = os.fork() # second child
-        except OSError, e:
-            raise Exception("%s [%d]" % (e.strerror, e.errno))
-
-        if pid == 0: # second child
-            #os.chdir(DAEMON_WORKDIR)
-            os.umask(DAEMON_UMASK)
-        else: # parent (first child)
-            pidfile.write(pid)
-            os._exit(0)
-    else: # root process
-        os._exit(0)
-
-    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
-    if (maxfd == resource.RLIM_INFINITY):
-        maxfd = DAEMON_MAXFD
-
-    # Iterate through and close all file descriptors.
-    for fd in range(0, maxfd):
-        try:
-            os.close(fd)
-        except OSError:
-            pass
-
-    os.open(REDIRECT_TO, os.O_RDWR)
-    # Duplicate standard input to standard output and standard error.
-    os.dup2(0, 1)
-    os.dup2(0, 2)
-
-    return 0

+ 3 - 3
celery/views.py

@@ -2,13 +2,13 @@
 from django.http import HttpResponse
 from django.http import HttpResponse
 from celery.task import is_done, delay_task
 from celery.task import is_done, delay_task
 from celery.result import AsyncResult
 from celery.result import AsyncResult
-import simplejson
+from carrot.serialization import serialize as JSON_dump
 
 
 
 
 def is_task_done(request, task_id):
 def is_task_done(request, task_id):
     """Returns task execute status in JSON format."""
     """Returns task execute status in JSON format."""
     response_data = {"task": {"id": task_id, "executed": is_done(task_id)}}
     response_data = {"task": {"id": task_id, "executed": is_done(task_id)}}
-    return HttpResponse(simplejson.dumps(response_data))
+    return HttpResponse(JSON_dump(response_data))
 
 
 
 
 def task_status(request, task_id):
 def task_status(request, task_id):
@@ -19,4 +19,4 @@ def task_status(request, task_id):
                         "status": async_result.status,
                         "status": async_result.status,
                         "result": async_result.result,
                         "result": async_result.result,
     }}
     }}
-    return HttpResponse(simplejson.dumps(response_data))
+    return HttpResponse(JSON_dump(response_data))

+ 14 - 3
celery/worker.py

@@ -75,9 +75,20 @@ def jail(task_id, task_name, func, args, kwargs):
     from django.db import connection
     from django.db import connection
     connection.close()
     connection.close()
 
 
-    # Reset cache connection
-    from django.core.cache import cache
-    cache.close()
+    # Reset cache connection only if using memcached/libmemcached
+    from django.core import cache
+    # XXX At Opera we use a custom memcached backend that uses libmemcached
+    # instead of libmemcache (cmemcache). Should find a better solution for
+    # this, but for now "memcached" should probably be unique enough of a
+    # string to not make problems.
+    cache_backend = cache.settings.CACHE_BACKEND
+    if hasattr(cache, "parse_backend_uri"):
+        cache_scheme = cache.parse_backend_uri(cache_backend)[0]
+    else:
+        # Django <= 1.0.2
+        cache_scheme = cache_backend.split(":", 1)[0]
+    if "memcached" in scheme:
+        cache.cache.close()
 
 
     # Backend process cleanup
     # Backend process cleanup
     default_backend.process_cleanup()
     default_backend.process_cleanup()

+ 0 - 10
docs/reference/celery.platform.rst

@@ -1,10 +0,0 @@
-====================================
-Platform Specific - celery.platform
-====================================
-
-.. currentmodule:: celery.platform
-
-.. automodule:: celery.platform
-    :members:
-
-

+ 8 - 0
docs/reference/celery.pool.rst

@@ -0,0 +1,8 @@
+=============================
+Task Pool - celery.pool
+=============================
+
+.. currentmodule:: celery.pool
+
+.. automodule:: celery.pool
+    :members:

+ 1 - 0
docs/reference/index.rst

@@ -13,6 +13,7 @@
     celery.registry
     celery.registry
     celery.discovery
     celery.discovery
     celery.worker
     celery.worker
+    celery.pool
     celery.backends
     celery.backends
     celery.backends.base
     celery.backends.base
     celery.backends.database
     celery.backends.database

+ 1 - 1
setup.py

@@ -63,8 +63,8 @@ setup(
     scripts=["bin/celeryd"],
     scripts=["bin/celeryd"],
     zip_safe=False,
     zip_safe=False,
     install_requires=[
     install_requires=[
-        'simplejson',
         'carrot>=0.4.1',
         'carrot>=0.4.1',
+        'python-daemon',
         'django',
         'django',
     ],
     ],
     cmdclass = {"test": RunTests},
     cmdclass = {"test": RunTests},