Jelajahi Sumber

Use ignore_errno

Ask Solem 12 tahun lalu
induk
melakukan
6f2245a560

+ 1 - 4
celery/beat.py

@@ -322,11 +322,8 @@ class PersistentScheduler(Scheduler):
 
     def _remove_db(self):
         for suffix in self.known_suffixes:
-            try:
+            with platforms.ignore_errno(errno.ENOENT):
                 os.remove(self.schedule_filename + suffix)
-            except OSError, exc:
-                if exc.errno != errno.ENOENT:
-                    raise
 
     def setup_schedule(self):
         try:

+ 3 - 5
celery/contrib/rdb.py

@@ -45,6 +45,8 @@ from pdb import Pdb
 
 from billiard import current_process
 
+from celery.platforms import ignore_errno
+
 default_port = 6899
 
 CELERY_RDB_HOST = os.environ.get('CELERY_RDB_HOST') or '127.0.0.1'
@@ -133,12 +135,8 @@ class Rdb(Pdb):
     def set_trace(self, frame=None):
         if frame is None:
             frame = _frame().f_back
-        try:
+        with ignore_errno(errno.ECONNRESET):
             Pdb.set_trace(self, frame)
-        except socket.error, exc:
-            # connection reset by peer.
-            if exc.errno != errno.ECONNRESET:
-                raise
 
     def set_quit(self):
         # this raises a BdbQuit exception that we are unable to catch.

+ 23 - 26
celery/platforms.py

@@ -18,17 +18,17 @@ import shlex
 import signal as _signal
 import sys
 
+from billiard import current_process
 from contextlib import contextmanager
 
 from .local import try_import
 
-from kombu.utils.limits import TokenBucket
-
 _setproctitle = try_import('setproctitle')
 resource = try_import('resource')
 pwd = try_import('pwd')
 grp = try_import('grp')
 
+# exitcodes
 EX_OK = getattr(os, 'EX_OK', 0)
 EX_FAILURE = 1
 EX_UNAVAILABLE = getattr(os, 'EX_UNAVAILABLE', 69)
@@ -44,8 +44,6 @@ DAEMON_WORKDIR = '/'
 PIDFILE_FLAGS = os.O_CREAT | os.O_EXCL | os.O_WRONLY
 PIDFILE_MODE = ((os.R_OK | os.W_OK) << 6) | ((os.R_OK) << 3) | ((os.R_OK))
 
-_setps_bucket = TokenBucket(0.5)  # 30/m, every 2 seconds
-
 PIDLOCKED = """ERROR: Pidfile (%s) already exists.
 Seems we're already running? (PID: %s)"""
 
@@ -142,12 +140,8 @@ class PIDFile(object):
 
     def read_pid(self):
         """Reads and returns the current pid."""
-        try:
+        with ignore_errno('ENOENT'):
             fh = open(self.path, 'r')
-        except IOError, exc:
-            if exc.errno == errno.ENOENT:
-                return
-            raise
 
         try:
             line = fh.readline()
@@ -164,12 +158,8 @@ class PIDFile(object):
 
     def remove(self):
         """Removes the lock."""
-        try:
+        with ignore_errno(errno.ENOENT, errno.EACCES):
             os.unlink(self.path)
-        except OSError, exc:
-            if exc.errno in (errno.ENOENT, errno.EACCES):
-                return
-            raise
 
     def remove_if_stale(self):
         """Removes the lock if the process is not running.
@@ -286,7 +276,7 @@ class DaemonContext(object):
             preserve = [fileno(f) for f in self.stdfds if fileno(f)]
             for fd in reversed(range(get_fdmax(default=2048))):
                 if fd not in preserve:
-                    with ignore_EBADF():
+                    with ignore_errno(errno.EBADF):
                         os.close(fd)
 
             for fd in self.stdfds:
@@ -625,19 +615,16 @@ if os.environ.get('NOSETPS'):  # pragma: no cover
         pass
 else:
 
-    def set_mp_process_title(progname, info=None, hostname=None,  # noqa
-            rate_limit=False):
+    def set_mp_process_title(progname, info=None, hostname=None):  # noqa
         """Set the ps name using the multiprocessing process name.
 
         Only works if :mod:`setproctitle` is installed.
 
         """
-        if not rate_limit or _setps_bucket.can_consume(1):
-            from billiard import current_process
-            if hostname:
-                progname = '%s@%s' % (progname, hostname.split('.')[0])
-            return set_process_title(
-                '%s:%s' % (progname, current_process().name), info=info)
+        if hostname:
+            progname = '%s@%s' % (progname, hostname.split('.')[0])
+        return set_process_title(
+            '%s:%s' % (progname, current_process().name), info=info)
 
 
 def shellsplit(s, posix=True):
@@ -648,10 +635,20 @@ def shellsplit(s, posix=True):
     return list(lexer)
 
 
+def get_errno(n):
+    if isinstance(n, basestring):
+        return getattr(errno, n)
+    return n
+
+
 @contextmanager
-def ignore_EBADF():
+def ignore_errno(*errnos):
+    errnos = [get_errno(errno) for errno in errnos]
     try:
         yield
-    except OSError, exc:
-        if exc.errno != errno.EBADF:
+    except Exception, exc:
+        try:
+            if exc.errno not in errnos:
+                raise
+        except AttributeError:
             raise

+ 4 - 4
celery/tests/utilities/test_platforms.py

@@ -13,7 +13,7 @@ from celery import platforms
 from celery.platforms import (
     get_fdmax,
     shellsplit,
-    ignore_EBADF,
+    ignore_errno,
     set_process_title,
     signals,
     maybe_drop_privileges,
@@ -36,17 +36,17 @@ from celery.platforms import (
 from celery.tests.utils import Case, WhateverIO, override_stdouts
 
 
-class test_ignore_EBADF(Case):
+class test_ignore_errno(Case):
 
     def test_raises_EBADF(self):
-        with ignore_EBADF():
+        with ignore_errno('EBADF'):
             exc = OSError()
             exc.errno = errno.EBADF
             raise exc
 
     def test_otherwise(self):
         with self.assertRaises(OSError):
-            with ignore_EBADF():
+            with ignore_errno('EBADF'):
                 exc = OSError()
                 exc.errno = errno.ENOENT
                 raise exc

+ 3 - 7
celery/worker/autoreload.py

@@ -8,7 +8,6 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
-import errno
 import hashlib
 import os
 import select
@@ -19,7 +18,7 @@ from collections import defaultdict
 
 from kombu.utils import eventio
 
-from celery.platforms import ignore_EBADF
+from celery.platforms import ignore_errno
 from celery.utils.imports import module_file
 from celery.utils.log import get_logger
 from celery.utils.threads import bgThread, Event
@@ -149,7 +148,7 @@ class KQueueMonitor(BaseMonitor):
         for f, fd in self.filemap.iteritems():
             if fd is not None:
                 poller.unregister(fd)
-                with ignore_EBADF():  # pragma: no cover
+                with ignore_errno('EBADF'):  # pragma: no cover
                     os.close(fd)
         self.filemap.clear()
         self.fdmap.clear()
@@ -247,11 +246,8 @@ class Autoreloader(bgThread):
 
     def body(self):
         self.on_init()
-        try:
+        with ignore_errno('EINTR', 'EAGAIN'):
             self._monitor.start()
-        except OSError, exc:
-            if exc.errno not in (errno.EINTR, errno.EAGAIN):
-                raise
 
     def _maybe_modified(self, f):
         digest = file_hash(f)