Browse Source

Merge branch '3.0'

Conflicts:
	celery/beat.py
	celery/contrib/rdb.py
	celery/platforms.py
	celery/worker/autoreload.py
Ask Solem 12 years ago
parent
commit
1cd3ab5725

+ 1 - 4
celery/beat.py

@@ -322,11 +322,8 @@ class PersistentScheduler(Scheduler):
 
 
     def _remove_db(self):
     def _remove_db(self):
         for suffix in self.known_suffixes:
         for suffix in self.known_suffixes:
-            try:
+            with platforms.ignore_errno(errno.ENOENT):
                 os.remove(self.schedule_filename + suffix)
                 os.remove(self.schedule_filename + suffix)
-            except OSError as exc:
-                if exc.errno != errno.ENOENT:
-                    raise
 
 
     def setup_schedule(self):
     def setup_schedule(self):
         try:
         try:

+ 4 - 3
celery/bin/celeryd_multi.py

@@ -92,6 +92,7 @@ from __future__ import absolute_import, print_function
 
 
 import errno
 import errno
 import os
 import os
+import shlex
 import signal
 import signal
 import socket
 import socket
 import sys
 import sys
@@ -105,7 +106,7 @@ from kombu.utils import cached_property
 from kombu.utils.encoding import from_utf8
 from kombu.utils.encoding import from_utf8
 
 
 from celery import VERSION_BANNER
 from celery import VERSION_BANNER
-from celery.platforms import PIDFile, shellsplit
+from celery.platforms import Pidfile, IS_WINDOWS
 from celery.utils import term
 from celery.utils import term
 from celery.utils.text import pluralize
 from celery.utils.text import pluralize
 
 
@@ -299,7 +300,7 @@ class MultiTool(object):
             pid = None
             pid = None
             pidfile = expander(pidfile_template)
             pidfile = expander(pidfile_template)
             try:
             try:
-                pid = PIDFile(pidfile).read_pid()
+                pid = Pidfile(pidfile).read_pid()
             except ValueError:
             except ValueError:
                 pass
                 pass
             if pid:
             if pid:
@@ -373,7 +374,7 @@ class MultiTool(object):
 
 
     def waitexec(self, argv, path=sys.executable):
     def waitexec(self, argv, path=sys.executable):
         args = ' '.join([path] + list(argv))
         args = ' '.join([path] + list(argv))
-        argstr = shellsplit(from_utf8(args))
+        argstr = shlex.split(from_utf8(args), posix=not IS_WINDOWS)
         pipe = Popen(argstr, env=self.env)
         pipe = Popen(argstr, env=self.env)
         self.info('  {0}'.format(' '.join(argstr)))
         self.info('  {0}'.format(' '.join(argstr)))
         retcode = pipe.wait()
         retcode = pipe.wait()

+ 3 - 5
celery/contrib/rdb.py

@@ -46,6 +46,8 @@ from pdb import Pdb
 
 
 from billiard import current_process
 from billiard import current_process
 
 
+from celery.platforms import ignore_errno
+
 default_port = 6899
 default_port = 6899
 
 
 CELERY_RDB_HOST = os.environ.get('CELERY_RDB_HOST') or '127.0.0.1'
 CELERY_RDB_HOST = os.environ.get('CELERY_RDB_HOST') or '127.0.0.1'
@@ -148,12 +150,8 @@ class Rdb(Pdb):
     def set_trace(self, frame=None):
     def set_trace(self, frame=None):
         if frame is None:
         if frame is None:
             frame = _frame().f_back
             frame = _frame().f_back
-        try:
+        with ignore_errno(errno.ECONNRESET):
             Pdb.set_trace(self, frame)
             Pdb.set_trace(self, frame)
-        except socket.error as exc:
-            # connection reset by peer.
-            if exc.errno != errno.ECONNRESET:
-                raise
 
 
     def set_quit(self):
     def set_quit(self):
         # this raises a BdbQuit exception that we are unable to catch.
         # this raises a BdbQuit exception that we are unable to catch.

+ 83 - 76
celery/platforms.py

@@ -13,22 +13,21 @@ import atexit
 import errno
 import errno
 import os
 import os
 import platform as _platform
 import platform as _platform
-import shlex
 import signal as _signal
 import signal as _signal
 import sys
 import sys
 
 
+from billiard import current_process
 from contextlib import contextmanager
 from contextlib import contextmanager
 from itertools import imap
 from itertools import imap
 
 
 from .local import try_import
 from .local import try_import
 
 
-from kombu.utils.limits import TokenBucket
-
 _setproctitle = try_import('setproctitle')
 _setproctitle = try_import('setproctitle')
 resource = try_import('resource')
 resource = try_import('resource')
 pwd = try_import('pwd')
 pwd = try_import('pwd')
 grp = try_import('grp')
 grp = try_import('grp')
 
 
+# exitcodes
 EX_OK = getattr(os, 'EX_OK', 0)
 EX_OK = getattr(os, 'EX_OK', 0)
 EX_FAILURE = 1
 EX_FAILURE = 1
 EX_UNAVAILABLE = getattr(os, 'EX_UNAVAILABLE', 69)
 EX_UNAVAILABLE = getattr(os, 'EX_UNAVAILABLE', 69)
@@ -44,13 +43,12 @@ DAEMON_WORKDIR = '/'
 PIDFILE_FLAGS = os.O_CREAT | os.O_EXCL | os.O_WRONLY
 PIDFILE_FLAGS = os.O_CREAT | os.O_EXCL | os.O_WRONLY
 PIDFILE_MODE = ((os.R_OK | os.W_OK) << 6) | ((os.R_OK) << 3) | ((os.R_OK))
 PIDFILE_MODE = ((os.R_OK | os.W_OK) << 6) | ((os.R_OK) << 3) | ((os.R_OK))
 
 
-_setps_bucket = TokenBucket(0.5)  # 30/m, every 2 seconds
-
 PIDLOCKED = """ERROR: Pidfile ({0}) already exists.
 PIDLOCKED = """ERROR: Pidfile ({0}) already exists.
-Seems we're already running? (PID: {1})"""
+Seems we're already running? (pid: {1})"""
 
 
 
 
 def pyimplementation():
 def pyimplementation():
+    """Returns string identifying the current Python implementation."""
     if hasattr(_platform, 'python_implementation'):
     if hasattr(_platform, 'python_implementation'):
         return _platform.python_implementation()
         return _platform.python_implementation()
     elif sys.platform.startswith('java'):
     elif sys.platform.startswith('java'):
@@ -65,6 +63,12 @@ def pyimplementation():
 
 
 
 
 def _find_option_with_arg(argv, short_opts=None, long_opts=None):
 def _find_option_with_arg(argv, short_opts=None, long_opts=None):
+    """Search argv for option specifying its short and longopt
+    alternatives.
+
+    Returns the value of the option if found.
+
+    """
     for i, arg in enumerate(argv):
     for i, arg in enumerate(argv):
         if arg.startswith('-'):
         if arg.startswith('-'):
             if long_opts and arg.startswith('--'):
             if long_opts and arg.startswith('--'):
@@ -77,6 +81,10 @@ def _find_option_with_arg(argv, short_opts=None, long_opts=None):
 
 
 
 
 def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
 def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
+    """With short and long opt alternatives that specify the command-line
+    option to set the pool, this makes sure that anything that needs
+    to be patched is completed as early as possible.
+    (e.g. eventlet/gevent monkey patches)."""
     try:
     try:
         pool = _find_option_with_arg(argv, short_opts, long_opts)
         pool = _find_option_with_arg(argv, short_opts, long_opts)
     except KeyError:
     except KeyError:
@@ -89,7 +97,6 @@ def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
 
 
 class LockFailed(Exception):
 class LockFailed(Exception):
     """Raised if a pidlock can't be acquired."""
     """Raised if a pidlock can't be acquired."""
-    pass
 
 
 
 
 def get_fdmax(default=None):
 def get_fdmax(default=None):
@@ -106,13 +113,14 @@ def get_fdmax(default=None):
     return fdmax
     return fdmax
 
 
 
 
-class PIDFile(object):
-    """PID lock file.
+class Pidfile(object):
+    """Pidfile
 
 
     This is the type returned by :func:`create_pidlock`.
     This is the type returned by :func:`create_pidlock`.
 
 
-    **Should not be used directly, use the :func:`create_pidlock`
-    context instead**
+    TIP: Use the :func:`create_pidlock` function instead,
+    which is more convenient and also removes stale pidfiles (when
+    the process holding the lock is no longer running).
 
 
     """
     """
 
 
@@ -142,34 +150,23 @@ class PIDFile(object):
 
 
     def read_pid(self):
     def read_pid(self):
         """Reads and returns the current pid."""
         """Reads and returns the current pid."""
-        try:
-            fh = open(self.path, 'r')
-        except IOError as exc:
-            if exc.errno == errno.ENOENT:
-                return
-            raise
-
-        try:
-            line = fh.readline()
-            if line.strip() == line:  # must contain '\n'
-                raise ValueError(
-                    'Partial or invalid pidfile {0.path}'.format(self))
-        finally:
-            fh.close()
-
-        try:
-            return int(line.strip())
-        except ValueError:
-            raise ValueError('PID file {0.path} invalid.'.format(self))
+        with ignore_errno('ENOENT'):
+            with open(self.path, 'r') as fh:
+                line = fh.readline()
+                if line.strip() == line:  # must contain '\n'
+                    raise ValueError(
+                        'Partial or invalid pidfile {0.path}'.format(self))
+
+                try:
+                    return int(line.strip())
+                except ValueError:
+                    raise ValueError(
+                        'pidfile {0.path} contents invalid.'.format(self))
 
 
     def remove(self):
     def remove(self):
         """Removes the lock."""
         """Removes the lock."""
-        try:
+        with ignore_errno(errno.ENOENT, errno.EACCES):
             os.unlink(self.path)
             os.unlink(self.path)
-        except OSError as exc:
-            if exc.errno in (errno.ENOENT, errno.EACCES):
-                return
-            raise
 
 
     def remove_if_stale(self):
     def remove_if_stale(self):
         """Removes the lock if the process is not running.
         """Removes the lock if the process is not running.
@@ -217,19 +214,21 @@ class PIDFile(object):
                     "Inconsistency: Pidfile content doesn't match at re-read")
                     "Inconsistency: Pidfile content doesn't match at re-read")
         finally:
         finally:
             rfh.close()
             rfh.close()
+PIDFile = Pidfile  # compat alias
 
 
 
 
 def create_pidlock(pidfile):
 def create_pidlock(pidfile):
-    """Create and verify pid file.
+    """Create and verify pidfile.
 
 
-    If the pid file already exists the program exits with an error message,
-    however if the process it refers to is not running anymore, the pid file
+    If the pidfile already exists the program exits with an error message,
+    however if the process it refers to is not running anymore, the pidfile
     is deleted and the program continues.
     is deleted and the program continues.
 
 
-    The caller is responsible for releasing the lock before the program
-    exits.
+    This function will automatically install an :mod:`atexit` handler
+    to release the lock at exit, you can skip this by calling
+    :func:`_create_pidlock` instead.
 
 
-    :returns: :class:`PIDFile`.
+    :returns: :class:`Pidfile`.
 
 
     **Example**:
     **Example**:
 
 
@@ -244,7 +243,7 @@ def create_pidlock(pidfile):
 
 
 
 
 def _create_pidlock(pidfile):
 def _create_pidlock(pidfile):
-    pidlock = PIDFile(pidfile)
+    pidlock = Pidfile(pidfile)
     if pidlock.is_locked() and not pidlock.remove_if_stale():
     if pidlock.is_locked() and not pidlock.remove_if_stale():
         raise SystemExit(PIDLOCKED.format(pidfile, pidlock.read_pid()))
         raise SystemExit(PIDLOCKED.format(pidfile, pidlock.read_pid()))
     pidlock.acquire()
     pidlock.acquire()
@@ -252,6 +251,7 @@ def _create_pidlock(pidfile):
 
 
 
 
 def fileno(f):
 def fileno(f):
+    """Get object fileno, or :const:`None` if not defined."""
     try:
     try:
         return f.fileno()
         return f.fileno()
     except AttributeError:
     except AttributeError:
@@ -260,13 +260,11 @@ def fileno(f):
 
 
 class DaemonContext(object):
 class DaemonContext(object):
     _is_open = False
     _is_open = False
-    workdir = DAEMON_WORKDIR
-    umask = DAEMON_UMASK
 
 
     def __init__(self, pidfile=None, workdir=None, umask=None,
     def __init__(self, pidfile=None, workdir=None, umask=None,
             fake=False, **kwargs):
             fake=False, **kwargs):
-        self.workdir = workdir or self.workdir
-        self.umask = self.umask if umask is None else umask
+        self.workdir = workdir or DAEMON_WORKDIR
+        self.umask = DAEMON_UMASK if umask is None else umask
         self.fake = fake
         self.fake = fake
         self.stdfds = (sys.stdin, sys.stdout, sys.stderr)
         self.stdfds = (sys.stdin, sys.stdout, sys.stderr)
 
 
@@ -286,7 +284,7 @@ class DaemonContext(object):
             preserve = [fileno(f) for f in self.stdfds if fileno(f)]
             preserve = [fileno(f) for f in self.stdfds if fileno(f)]
             for fd in reversed(range(get_fdmax(default=2048))):
             for fd in reversed(range(get_fdmax(default=2048))):
                 if fd not in preserve:
                 if fd not in preserve:
-                    with ignore_EBADF():
+                    with ignore_errno(errno.EBADF):
                         os.close(fd)
                         os.close(fd)
 
 
             for fd in self.stdfds:
             for fd in self.stdfds:
@@ -316,7 +314,7 @@ def detached(logfile=None, pidfile=None, uid=None, gid=None, umask=0,
 
 
     :keyword logfile: Optional log file.  The ability to write to this file
     :keyword logfile: Optional log file.  The ability to write to this file
        will be verified before the process is detached.
        will be verified before the process is detached.
-    :keyword pidfile: Optional pid file.  The pid file will not be created,
+    :keyword pidfile: Optional pidfile.  The pidfile will not be created,
       as this is the responsibility of the child.  But the process will
       as this is the responsibility of the child.  But the process will
       exit if the pid lock exists and the pid written is still running.
       exit if the pid lock exists and the pid written is still running.
     :keyword uid: Optional user id or user name to change
     :keyword uid: Optional user id or user name to change
@@ -332,7 +330,6 @@ def detached(logfile=None, pidfile=None, uid=None, gid=None, umask=0,
 
 
     .. code-block:: python
     .. code-block:: python
 
 
-        import atexit
         from celery.platforms import detached, create_pidlock
         from celery.platforms import detached, create_pidlock
 
 
         with detached(logfile='/var/log/app.log', pidfile='/var/run/app.pid',
         with detached(logfile='/var/log/app.log', pidfile='/var/run/app.pid',
@@ -418,6 +415,7 @@ def _setgroups_hack(groups):
 
 
 
 
 def setgroups(groups):
 def setgroups(groups):
+    """Set active groups from a list of group ids."""
     max_groups = None
     max_groups = None
     try:
     try:
         max_groups = os.sysconf('SC_NGROUPS_MAX')
         max_groups = os.sysconf('SC_NGROUPS_MAX')
@@ -434,6 +432,8 @@ def setgroups(groups):
 
 
 
 
 def initgroups(uid, gid):
 def initgroups(uid, gid):
+    """Compat version of :func:`os.initgroups` which was first
+    added to Python 2.7."""
     if not pwd:  # pragma: no cover
     if not pwd:  # pragma: no cover
         return
         return
     username = pwd.getpwuid(uid)[0]
     username = pwd.getpwuid(uid)[0]
@@ -444,25 +444,13 @@ def initgroups(uid, gid):
     setgroups(groups)
     setgroups(groups)
 
 
 
 
-def setegid(gid):
-    """Set effective group id."""
-    gid = parse_gid(gid)
-    if gid != os.getegid():
-        os.setegid(gid)
-
-
-def seteuid(uid):
-    """Set effective user id."""
-    uid = parse_uid(uid)
-    if uid != os.geteuid():
-        os.seteuid(uid)
-
-
 def setgid(gid):
 def setgid(gid):
+    """Version of :func:`os.setgid` supporting group names."""
     os.setgid(parse_gid(gid))
     os.setgid(parse_gid(gid))
 
 
 
 
 def setuid(uid):
 def setuid(uid):
+    """Version of :func:`os.setuid` supporting usernames."""
     os.setuid(parse_uid(uid))
     os.setuid(parse_uid(uid))
 
 
 
 
@@ -625,29 +613,48 @@ if os.environ.get('NOSETPS'):  # pragma: no cover
         pass
         pass
 else:
 else:
 
 
-    def set_mp_process_title(progname, info=None, hostname=None,  # noqa
-            rate_limit=False):
+    def set_mp_process_title(progname, info=None, hostname=None):  # noqa
         """Set the ps name using the multiprocessing process name.
         """Set the ps name using the multiprocessing process name.
 
 
         Only works if :mod:`setproctitle` is installed.
         Only works if :mod:`setproctitle` is installed.
 
 
         """
         """
-        if not rate_limit or _setps_bucket.can_consume(1):
-            from billiard import current_process
-            if hostname:
-                progname = '{0}@{1}'.format(progname, hostname.split('.')[0])
-            return set_process_title(
-                '{0}:{1}'.format(progname, current_process().name), info=info)
+        if hostname:
+            progname = '{0}@{1}'.format(progname, hostname.split('.')[0])
+        return set_process_title(
+            '{0}:{1}'.format(progname, current_process().name), info=info)
 
 
 
 
-def shellsplit(s):
-    return shlex.split(s, posix=not IS_WINDOWS)
+def get_errno(n):
+    """Get errno for string, e.g. ``ENOENT``."""
+    if isinstance(n, basestring):
+        return getattr(errno, n)
+    return n
 
 
 
 
 @contextmanager
 @contextmanager
-def ignore_EBADF():
+def ignore_errno(*errnos, **kwargs):
+    """Context manager to ignore specific POSIX error codes.
+
+    Takes a list of error codes to ignore, which can be either
+    the name of the code, or the code integer itself::
+
+        >>> with ignore_errno('ENOENT'):
+        ...     with open('foo', 'r'):
+        ...         return r.read()
+
+        >>> with ignore_errno(errno.ENOENT, errno.EPERM):
+        ...    pass
+
+    :keyword types: A tuple of exceptions to ignore (when the errno matches),
+                    defaults to :exc:`Exception`.
+    """
+    types = kwargs.get('types') or (Exception, )
+    errnos = [get_errno(errno) for errno in errnos]
     try:
     try:
         yield
         yield
-    except OSError as exc:
-        if exc.errno != errno.EBADF:
+    except types, exc:
+        if not hasattr(exc, 'errno'):
+            raise
+        if exc.errno not in errnos:
             raise
             raise

+ 8 - 8
celery/tests/bin/test_celeryd_multi.py

@@ -261,7 +261,7 @@ class test_MultiTool(Case):
         self.assertEqual(sigs[1][0], ('b', 11, signal.SIGKILL))
         self.assertEqual(sigs[1][0], ('b', 11, signal.SIGKILL))
         self.assertEqual(sigs[2][0], ('c', 12, signal.SIGKILL))
         self.assertEqual(sigs[2][0], ('c', 12, signal.SIGKILL))
 
 
-    def prepare_pidfile_for_getpids(self, PIDFile):
+    def prepare_pidfile_for_getpids(self, Pidfile):
         class pids(object):
         class pids(object):
 
 
             def __init__(self, path):
             def __init__(self, path):
@@ -273,13 +273,13 @@ class test_MultiTool(Case):
                             'celeryd@bar.pid': 11}[self.path]
                             'celeryd@bar.pid': 11}[self.path]
                 except KeyError:
                 except KeyError:
                     raise ValueError()
                     raise ValueError()
-        PIDFile.side_effect = pids
+        Pidfile.side_effect = pids
 
 
-    @patch('celery.bin.celeryd_multi.PIDFile')
+    @patch('celery.bin.celeryd_multi.Pidfile')
     @patch('socket.gethostname')
     @patch('socket.gethostname')
-    def test_getpids(self, gethostname, PIDFile):
+    def test_getpids(self, gethostname, Pidfile):
         gethostname.return_value = 'e.com'
         gethostname.return_value = 'e.com'
-        self.prepare_pidfile_for_getpids(PIDFile)
+        self.prepare_pidfile_for_getpids(Pidfile)
         callback = Mock()
         callback = Mock()
 
 
         p = NamespacedOptionParser(['foo', 'bar', 'baz'])
         p = NamespacedOptionParser(['foo', 'bar', 'baz'])
@@ -303,12 +303,12 @@ class test_MultiTool(Case):
         # without callback, should work
         # without callback, should work
         nodes = self.t.getpids(p, 'celeryd', callback=None)
         nodes = self.t.getpids(p, 'celeryd', callback=None)
 
 
-    @patch('celery.bin.celeryd_multi.PIDFile')
+    @patch('celery.bin.celeryd_multi.Pidfile')
     @patch('socket.gethostname')
     @patch('socket.gethostname')
     @patch('celery.bin.celeryd_multi.sleep')
     @patch('celery.bin.celeryd_multi.sleep')
-    def test_shutdown_nodes(self, slepp, gethostname, PIDFile):
+    def test_shutdown_nodes(self, slepp, gethostname, Pidfile):
         gethostname.return_value = 'e.com'
         gethostname.return_value = 'e.com'
-        self.prepare_pidfile_for_getpids(PIDFile)
+        self.prepare_pidfile_for_getpids(Pidfile)
         self.assertIsNone(self.t.shutdown_nodes([]))
         self.assertIsNone(self.t.shutdown_nodes([]))
         self.t.signal_node = Mock()
         self.t.signal_node = Mock()
         node_alive = self.t.node_alive = Mock()
         node_alive = self.t.node_alive = Mock()

+ 55 - 90
celery/tests/utilities/test_platforms.py

@@ -12,40 +12,38 @@ from celery import platforms
 from celery.platforms import (
 from celery.platforms import (
     get_fdmax,
     get_fdmax,
     shellsplit,
     shellsplit,
-    ignore_EBADF,
+    ignore_errno,
     set_process_title,
     set_process_title,
     signals,
     signals,
     maybe_drop_privileges,
     maybe_drop_privileges,
     setuid,
     setuid,
     setgid,
     setgid,
-    seteuid,
-    setegid,
     initgroups,
     initgroups,
     parse_uid,
     parse_uid,
     parse_gid,
     parse_gid,
     detached,
     detached,
     DaemonContext,
     DaemonContext,
     create_pidlock,
     create_pidlock,
-    PIDFile,
+    Pidfile,
     LockFailed,
     LockFailed,
     setgroups,
     setgroups,
     _setgroups_hack
     _setgroups_hack
 )
 )
 
 
-from celery.tests.utils import Case, WhateverIO, override_stdouts
+from celery.tests.utils import Case, WhateverIO, override_stdouts, mock_open
 
 
 
 
-class test_ignore_EBADF(Case):
+class test_ignore_errno(Case):
 
 
     def test_raises_EBADF(self):
     def test_raises_EBADF(self):
-        with ignore_EBADF():
+        with ignore_errno('EBADF'):
             exc = OSError()
             exc = OSError()
             exc.errno = errno.EBADF
             exc.errno = errno.EBADF
             raise exc
             raise exc
 
 
     def test_otherwise(self):
     def test_otherwise(self):
         with self.assertRaises(OSError):
         with self.assertRaises(OSError):
-            with ignore_EBADF():
+            with ignore_errno('EBADF'):
                 exc = OSError()
                 exc = OSError()
                 exc.errno = errno.ENOENT
                 exc.errno = errno.ENOENT
                 raise exc
                 raise exc
@@ -178,20 +176,6 @@ if not current_app.IS_WINDOWS:
             parse_uid.assert_called_with('user')
             parse_uid.assert_called_with('user')
             _setuid.assert_called_with(5001)
             _setuid.assert_called_with(5001)
 
 
-        @patch('celery.platforms.parse_uid')
-        @patch('os.geteuid')
-        @patch('os.seteuid')
-        def test_seteuid(self, _seteuid, _geteuid, parse_uid):
-            parse_uid.return_value = 5001
-            _geteuid.return_value = 5001
-            seteuid('user')
-            parse_uid.assert_called_with('user')
-            self.assertFalse(_seteuid.called)
-
-            _geteuid.return_value = 1
-            seteuid('user')
-            _seteuid.assert_called_with(5001)
-
         @patch('celery.platforms.parse_gid')
         @patch('celery.platforms.parse_gid')
         @patch('os.setgid')
         @patch('os.setgid')
         def test_setgid(self, _setgid, parse_gid):
         def test_setgid(self, _setgid, parse_gid):
@@ -200,20 +184,6 @@ if not current_app.IS_WINDOWS:
             parse_gid.assert_called_with('group')
             parse_gid.assert_called_with('group')
             _setgid.assert_called_with(50001)
             _setgid.assert_called_with(50001)
 
 
-        @patch('celery.platforms.parse_gid')
-        @patch('os.getegid')
-        @patch('os.setegid')
-        def test_setegid(self, _setegid, _getegid, parse_gid):
-            parse_gid.return_value = 50001
-            _getegid.return_value = 50001
-            setegid('group')
-            parse_gid.assert_called_with('group')
-            self.assertFalse(_setegid.called)
-
-            _getegid.return_value = 1
-            setegid('group')
-            _setegid.assert_called_with(50001)
-
         def test_parse_uid_when_int(self):
         def test_parse_uid_when_int(self):
             self.assertEqual(parse_uid(5001), 5001)
             self.assertEqual(parse_uid(5001), 5001)
 
 
@@ -362,11 +332,11 @@ if not current_app.IS_WINDOWS:
                 pass
                 pass
             self.assertFalse(x._detach.called)
             self.assertFalse(x._detach.called)
 
 
-    class test_PIDFile(Case):
+    class test_Pidfile(Case):
 
 
-        @patch('celery.platforms.PIDFile')
-        def test_create_pidlock(self, PIDFile):
-            p = PIDFile.return_value = Mock()
+        @patch('celery.platforms.Pidfile')
+        def test_create_pidlock(self, Pidfile):
+            p = Pidfile.return_value = Mock()
             p.is_locked.return_value = True
             p.is_locked.return_value = True
             p.remove_if_stale.return_value = False
             p.remove_if_stale.return_value = False
             with self.assertRaises(SystemExit):
             with self.assertRaises(SystemExit):
@@ -377,7 +347,7 @@ if not current_app.IS_WINDOWS:
             self.assertIs(ret, p)
             self.assertIs(ret, p)
 
 
         def test_context(self):
         def test_context(self):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.write_pid = Mock()
             p.write_pid = Mock()
             p.remove = Mock()
             p.remove = Mock()
 
 
@@ -387,7 +357,7 @@ if not current_app.IS_WINDOWS:
             p.remove.assert_called_with()
             p.remove.assert_called_with()
 
 
         def test_acquire_raises_LockFailed(self):
         def test_acquire_raises_LockFailed(self):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.write_pid = Mock()
             p.write_pid = Mock()
             p.write_pid.side_effect = OSError()
             p.write_pid.side_effect = OSError()
 
 
@@ -397,59 +367,54 @@ if not current_app.IS_WINDOWS:
 
 
         @patch('os.path.exists')
         @patch('os.path.exists')
         def test_is_locked(self, exists):
         def test_is_locked(self, exists):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             exists.return_value = True
             exists.return_value = True
             self.assertTrue(p.is_locked())
             self.assertTrue(p.is_locked())
             exists.return_value = False
             exists.return_value = False
             self.assertFalse(p.is_locked())
             self.assertFalse(p.is_locked())
 
 
-        @patch('__builtin__.open')
-        def test_read_pid(self, open_):
-            s = open_.return_value = WhateverIO()
-            s.write('1816\n')
-            s.seek(0)
-            p = PIDFile('/var/pid')
-            self.assertEqual(p.read_pid(), 1816)
-
-        @patch('__builtin__.open')
-        def test_read_pid_partially_written(self, open_):
-            s = open_.return_value = WhateverIO()
-            s.write('1816')
-            s.seek(0)
-            p = PIDFile('/var/pid')
-            with self.assertRaises(ValueError):
-                p.read_pid()
-
-        @patch('__builtin__.open')
-        def test_read_pid_raises_ENOENT(self, open_):
+        def test_read_pid(self):
+            with mock_open() as s:
+                s.write('1816\n')
+                s.seek(0)
+                p = Pidfile('/var/pid')
+                self.assertEqual(p.read_pid(), 1816)
+
+        def test_read_pid_partially_written(self):
+            with mock_open() as s:
+                s.write('1816')
+                s.seek(0)
+                p = Pidfile('/var/pid')
+                with self.assertRaises(ValueError):
+                    p.read_pid()
+
+        def test_read_pid_raises_ENOENT(self):
             exc = IOError()
             exc = IOError()
             exc.errno = errno.ENOENT
             exc.errno = errno.ENOENT
-            open_.side_effect = exc
-            p = PIDFile('/var/pid')
-            self.assertIsNone(p.read_pid())
+            with mock_open(side_effect=exc):
+                p = Pidfile('/var/pid')
+                self.assertIsNone(p.read_pid())
 
 
-        @patch('__builtin__.open')
-        def test_read_pid_raises_IOError(self, open_):
+        def test_read_pid_raises_IOError(self):
             exc = IOError()
             exc = IOError()
             exc.errno = errno.EAGAIN
             exc.errno = errno.EAGAIN
-            open_.side_effect = exc
-            p = PIDFile('/var/pid')
-            with self.assertRaises(IOError):
-                p.read_pid()
-
-        @patch('__builtin__.open')
-        def test_read_pid_bogus_pidfile(self, open_):
-            s = open_.return_value = WhateverIO()
-            s.write('eighteensixteen\n')
-            s.seek(0)
-            p = PIDFile('/var/pid')
-            with self.assertRaises(ValueError):
-                p.read_pid()
+            with mock_open(side_effect=exc):
+                p = Pidfile('/var/pid')
+                with self.assertRaises(IOError):
+                    p.read_pid()
+
+        def test_read_pid_bogus_pidfile(self):
+            with mock_open() as s:
+                s.write('eighteensixteen\n')
+                s.seek(0)
+                p = Pidfile('/var/pid')
+                with self.assertRaises(ValueError):
+                    p.read_pid()
 
 
         @patch('os.unlink')
         @patch('os.unlink')
         def test_remove(self, unlink):
         def test_remove(self, unlink):
             unlink.return_value = True
             unlink.return_value = True
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.remove()
             p.remove()
             unlink.assert_called_with(p.path)
             unlink.assert_called_with(p.path)
 
 
@@ -458,7 +423,7 @@ if not current_app.IS_WINDOWS:
             exc = OSError()
             exc = OSError()
             exc.errno = errno.ENOENT
             exc.errno = errno.ENOENT
             unlink.side_effect = exc
             unlink.side_effect = exc
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.remove()
             p.remove()
             unlink.assert_called_with(p.path)
             unlink.assert_called_with(p.path)
 
 
@@ -467,7 +432,7 @@ if not current_app.IS_WINDOWS:
             exc = OSError()
             exc = OSError()
             exc.errno = errno.EACCES
             exc.errno = errno.EACCES
             unlink.side_effect = exc
             unlink.side_effect = exc
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.remove()
             p.remove()
             unlink.assert_called_with(p.path)
             unlink.assert_called_with(p.path)
 
 
@@ -476,14 +441,14 @@ if not current_app.IS_WINDOWS:
             exc = OSError()
             exc = OSError()
             exc.errno = errno.EAGAIN
             exc.errno = errno.EAGAIN
             unlink.side_effect = exc
             unlink.side_effect = exc
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             with self.assertRaises(OSError):
             with self.assertRaises(OSError):
                 p.remove()
                 p.remove()
             unlink.assert_called_with(p.path)
             unlink.assert_called_with(p.path)
 
 
         @patch('os.kill')
         @patch('os.kill')
         def test_remove_if_stale_process_alive(self, kill):
         def test_remove_if_stale_process_alive(self, kill):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.read_pid = Mock()
             p.read_pid = Mock()
             p.read_pid.return_value = 1816
             p.read_pid.return_value = 1816
             kill.return_value = 0
             kill.return_value = 0
@@ -498,7 +463,7 @@ if not current_app.IS_WINDOWS:
         @patch('os.kill')
         @patch('os.kill')
         def test_remove_if_stale_process_dead(self, kill):
         def test_remove_if_stale_process_dead(self, kill):
             with override_stdouts():
             with override_stdouts():
-                p = PIDFile('/var/pid')
+                p = Pidfile('/var/pid')
                 p.read_pid = Mock()
                 p.read_pid = Mock()
                 p.read_pid.return_value = 1816
                 p.read_pid.return_value = 1816
                 p.remove = Mock()
                 p.remove = Mock()
@@ -511,7 +476,7 @@ if not current_app.IS_WINDOWS:
 
 
         def test_remove_if_stale_broken_pid(self):
         def test_remove_if_stale_broken_pid(self):
             with override_stdouts():
             with override_stdouts():
-                p = PIDFile('/var/pid')
+                p = Pidfile('/var/pid')
                 p.read_pid = Mock()
                 p.read_pid = Mock()
                 p.read_pid.side_effect = ValueError()
                 p.read_pid.side_effect = ValueError()
                 p.remove = Mock()
                 p.remove = Mock()
@@ -520,7 +485,7 @@ if not current_app.IS_WINDOWS:
                 p.remove.assert_called_with()
                 p.remove.assert_called_with()
 
 
         def test_remove_if_stale_no_pidfile(self):
         def test_remove_if_stale_no_pidfile(self):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.read_pid = Mock()
             p.read_pid = Mock()
             p.read_pid.return_value = None
             p.read_pid.return_value = None
             p.remove = Mock()
             p.remove = Mock()
@@ -542,7 +507,7 @@ if not current_app.IS_WINDOWS:
             r.write('1816\n')
             r.write('1816\n')
             r.seek(0)
             r.seek(0)
 
 
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.write_pid()
             p.write_pid()
             w.seek(0)
             w.seek(0)
             self.assertEqual(w.readline(), '1816\n')
             self.assertEqual(w.readline(), '1816\n')
@@ -569,7 +534,7 @@ if not current_app.IS_WINDOWS:
             r.write('11816\n')
             r.write('11816\n')
             r.seek(0)
             r.seek(0)
 
 
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             with self.assertRaises(LockFailed):
             with self.assertRaises(LockFailed):
                 p.write_pid()
                 p.write_pid()
 
 

+ 1 - 0
celery/tests/utils.py

@@ -514,6 +514,7 @@ def mock_open(typ=WhateverIO, side_effect=None):
             if side_effect is not None:
             if side_effect is not None:
                 context.__enter__.side_effect = side_effect
                 context.__enter__.side_effect = side_effect
             val = context.__enter__.return_value = typ()
             val = context.__enter__.return_value = typ()
+            val.__exit__ = Mock()
             yield val
             yield val
 
 
 
 

+ 3 - 7
celery/worker/autoreload.py

@@ -7,7 +7,6 @@
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
-import errno
 import hashlib
 import hashlib
 import os
 import os
 import select
 import select
@@ -19,7 +18,7 @@ from threading import Event
 
 
 from kombu.utils import eventio
 from kombu.utils import eventio
 
 
-from celery.platforms import ignore_EBADF
+from celery.platforms import ignore_errno
 from celery.utils.imports import module_file
 from celery.utils.imports import module_file
 from celery.utils.log import get_logger
 from celery.utils.log import get_logger
 from celery.utils.threads import bgThread
 from celery.utils.threads import bgThread
@@ -149,7 +148,7 @@ class KQueueMonitor(BaseMonitor):
         for f, fd in self.filemap.iteritems():
         for f, fd in self.filemap.iteritems():
             if fd is not None:
             if fd is not None:
                 poller.unregister(fd)
                 poller.unregister(fd)
-                with ignore_EBADF():  # pragma: no cover
+                with ignore_errno('EBADF'):  # pragma: no cover
                     os.close(fd)
                     os.close(fd)
         self.filemap.clear()
         self.filemap.clear()
         self.fdmap.clear()
         self.fdmap.clear()
@@ -247,11 +246,8 @@ class Autoreloader(bgThread):
 
 
     def body(self):
     def body(self):
         self.on_init()
         self.on_init()
-        try:
+        with ignore_errno('EINTR', 'EAGAIN'):
             self._monitor.start()
             self._monitor.start()
-        except OSError as exc:
-            if exc.errno not in (errno.EINTR, errno.EAGAIN):
-                raise
 
 
     def _maybe_modified(self, f):
     def _maybe_modified(self, f):
         digest = file_hash(f)
         digest = file_hash(f)