Browse Source

Loads of new tests

Ask Solem 13 years ago
parent
commit
c8b639bc60

+ 1 - 1
celery/__init__.py

@@ -17,7 +17,7 @@ __docformat__ = "restructuredtext"
 # Lazy loading
 from .__compat__ import recreate_module
 
-old_module, new_module = recreate_module(__name__,
+old_module, new_module = recreate_module(__name__,  # pragma: no cover
     by_module={
         "celery.app":       ["Celery", "bugreport"],
         "celery.app.state": ["current_app", "current_task"],

+ 2 - 1
celery/app/defaults.py

@@ -40,7 +40,8 @@ DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
 
 
 def str_to_bool(term, table={"false": False, "no": False, "0": False,
-                             "true":  True, "yes": True,  "1": True}):
+                             "true":  True, "yes": True,  "1": True,
+                             "on":    True, "off": False}):
     try:
         return table[term.lower()]
     except KeyError:

+ 28 - 29
celery/bin/camqadm.py

@@ -18,16 +18,11 @@ from amqplib import client_0_8 as amqp
 from celery.app import app_or_default
 from celery.utils.functional import padlist
 
+from celery.app.defaults import str_to_bool
 from celery.bin.base import Command
 
-# Valid string -> bool coercions.
-BOOLS = {"1": True, "0": False,
-         "on": True, "off": False,
-         "yes": True, "no": False,
-         "true": True, "False": False}
-
 # Map to coerce strings to other types.
-COERCE = {bool: lambda value: BOOLS[value.lower()]}
+COERCE = {bool: str_to_bool}
 
 HELP_HEADER = """
 Commands
@@ -40,8 +35,8 @@ Example:
 """
 
 
-def say(m):
-    sys.stderr.write("%s\n" % (m, ))
+def say(m, fh=sys.stderr):
+    fh.write("%s\n" % (m, ))
 
 
 class Spec(object):
@@ -197,13 +192,17 @@ class AMQShell(cmd.Cmd):
     def __init__(self, *args, **kwargs):
         self.connect = kwargs.pop("connect")
         self.silent = kwargs.pop("silent", False)
+        self.out = kwargs.pop("out", sys.stderr)
         cmd.Cmd.__init__(self, *args, **kwargs)
         self._reconnect()
 
-    def say(self, m):
+    def note(self, m):
         """Say something to the user. Disabled if :attr:`silent`."""
         if not self.silent:
-            say(m)
+            say(m, fh=self.out)
+
+    def say(self, m):
+        say(m, fh=self.out)
 
     def get_amqp_api_command(self, cmd, arglist):
         """With a command name and a list of arguments, convert the arguments
@@ -229,24 +228,24 @@ class AMQShell(cmd.Cmd):
 
     def do_exit(self, *args):
         """The `"exit"` command."""
-        self.say("\n-> please, don't leave!")
+        self.note("\n-> please, don't leave!")
         sys.exit(0)
 
     def display_command_help(self, cmd, short=False):
         spec = self.amqp[cmd]
-        say("%s %s" % (cmd, spec.format_signature()))
+        self.say("%s %s" % (cmd, spec.format_signature()))
 
     def do_help(self, *args):
         if not args:
-            say(HELP_HEADER)
+            self.say(HELP_HEADER)
             for cmd_name in self.amqp.keys():
                 self.display_command_help(cmd_name, short=True)
-            say(EXAMPLE_TEXT)
+            self.say(EXAMPLE_TEXT)
         else:
             self.display_command_help(args[0])
 
     def default(self, line):
-        say("unknown syntax: '%s'. how about some 'help'?" % line)
+        self.say("unknown syntax: '%s'. how about some 'help'?" % line)
 
     def get_names(self):
         return set(self.builtins) | set(self.amqp)
@@ -307,16 +306,16 @@ class AMQShell(cmd.Cmd):
             except (AttributeError, KeyError), exc:
                 self.default(line)
             except Exception, exc:
-                say(exc)
+                self.say(exc)
                 self.needs_reconnect = True
 
     def respond(self, retval):
         """What to do with the return value of a command."""
         if retval is not None:
             if isinstance(retval, basestring):
-                say(retval)
+                self.say(retval)
             else:
-                pprint.pprint(retval)
+                self.say(pprint.pformat(retval))
 
     def _reconnect(self):
         """Re-establish connection to the AMQP server."""
@@ -331,36 +330,36 @@ class AMQShell(cmd.Cmd):
 
 class AMQPAdmin(object):
     """The celery :program:`camqadm` utility."""
+    Shell = AMQShell
 
     def __init__(self, *args, **kwargs):
         self.app = app_or_default(kwargs.get("app"))
-        self.silent = bool(args)
-        if "silent" in kwargs:
-            self.silent = kwargs["silent"]
+        self.out = kwargs.setdefault("out", sys.stderr)
+        self.silent = kwargs.get("silent")
         self.args = args
 
     def connect(self, conn=None):
         if conn:
             conn.close()
         conn = self.app.broker_connection()
-        self.say("-> connecting to %s." % conn.as_uri())
+        self.note("-> connecting to %s." % conn.as_uri())
         conn.connect()
-        self.say("-> connected.")
+        self.note("-> connected.")
         return conn
 
     def run(self):
-        shell = AMQShell(connect=self.connect)
+        shell = self.Shell(connect=self.connect, out=self.out)
         if self.args:
             return shell.onecmd(" ".join(self.args))
         try:
             return shell.cmdloop()
         except KeyboardInterrupt:
-            self.say("(bibi)")
+            self.note("(bibi)")
             pass
 
-    def say(self, m):
+    def note(self, m):
         if not self.silent:
-            say(m)
+            say(m, fh=self.out)
 
 
 class AMQPAdminCommand(Command):
@@ -377,5 +376,5 @@ def camqadm(*args, **options):
 def main():
     AMQPAdminCommand().execute_from_commandline()
 
-if __name__ == "__main__":              # pragma: no cover
+if __name__ == "__main__":  # pragma: no cover
     main()

+ 21 - 15
celery/bin/celery.py

@@ -11,7 +11,7 @@ from pprint import pformat
 from textwrap import wrap
 
 from celery import __version__
-from celery.app import app_or_default, current_app
+from celery.app import app_or_default
 from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
 from celery.utils import term
 from celery.utils.imports import symbol_by_name
@@ -51,6 +51,7 @@ class Command(object):
     help = ""
     args = ""
     version = __version__
+    prog_name = "celery"
 
     option_list = BaseCommand.preload_options + (
         Option("--quiet", "-q", action="store_true", dest="quiet",
@@ -59,9 +60,12 @@ class Command(object):
             help="Don't colorize output."),
     )
 
-    def __init__(self, app=None, no_color=False):
+    def __init__(self, app=None, no_color=False, stdout=sys.stdout,
+            stderr=sys.stderr):
         self.app = app_or_default(app)
         self.colored = term.colored(enabled=not no_color)
+        self.stdout = stdout
+        self.stderr = stderr
 
     def __call__(self, *args, **kwargs):
         try:
@@ -77,13 +81,13 @@ class Command(object):
         return EX_USAGE
 
     def error(self, s):
-        self.out(s, fh=sys.stderr)
+        self.out(s, fh=self.stderr)
 
-    def out(self, s, fh=sys.stdout):
+    def out(self, s, fh=None):
         s = str(s)
         if not s.endswith("\n"):
             s += "\n"
-        fh.write(s)
+        (fh or self.stdout).write(s)
 
     def create_parser(self, prog_name, command):
         return OptionParser(prog=prog_name,
@@ -118,12 +122,13 @@ class Command(object):
 
     def prettify_dict_ok_error(self, n):
         c = self.colored
-        if "ok" in n:
+        try:
             return (c.green("OK"),
                     indent(self.prettify(n["ok"])[1]))
-        elif "error" in n:
-            return (c.red("ERROR"),
-                    indent(self.prettify(n["error"])[1]))
+        except KeyError:
+            pass
+        return (c.red("ERROR"),
+                indent(self.prettify(n["error"])[1]))
 
     def prettify(self, n):
         OK = str(self.colored.green("OK"))
@@ -228,7 +233,7 @@ class apply(Command):
             try:
                 expires = maybe_iso8601(expires)
             except (TypeError, ValueError):
-                pass
+                raise
 
         res = self.app.send_task(name, args=args, kwargs=kwargs,
                                  countdown=kw.get("countdown"),
@@ -245,8 +250,8 @@ apply = command(apply)
 class purge(Command):
 
     def run(self, *args, **kwargs):
-        queues = len(current_app.amqp.queues.keys())
-        messages_removed = current_app.control.discard_all()
+        queues = len(self.app.amqp.queues.keys())
+        messages_removed = self.app.control.discard_all()
         if messages_removed:
             self.out("Purged %s %s from %s known task %s." % (
                 messages_removed, pluralize(messages_removed, "message"),
@@ -355,7 +360,8 @@ class status(Command):
 
     def run(self, *args, **kwargs):
         replies = inspect(app=self.app,
-                          no_color=kwargs.get("no_color", False)) \
+                          no_color=kwargs.get("no_color", False),
+                          stdout=self.stdout, stderr=self.stderr) \
                     .run("ping", **dict(kwargs, quiet=True, show_body=False))
         if not replies:
             raise Error("No nodes replied within time constraint",
@@ -388,7 +394,7 @@ class migrate(Command):
 migrate = command(migrate)
 
 
-class shell(Command):
+class shell(Command):  # pragma: no cover
     option_list = Command.option_list + (
                 Option("--ipython", "-I", action="store_true",
                     dest="force_ipython", default=False,
@@ -499,7 +505,7 @@ help = command(help)
 class report(Command):
 
     def run(self, *args, **kwargs):
-        print(self.app.bugreport())
+        self.out(self.app.bugreport())
         return EX_OK
 report = command(report)
 

+ 6 - 5
celery/bin/celeryd_detach.py

@@ -8,7 +8,7 @@ import sys
 from optparse import OptionParser, BadOptionError
 
 from celery import __version__
-from celery.platforms import detached
+from celery.platforms import EX_FAILURE, detached
 from celery.utils.log import get_logger
 
 from celery.bin.base import daemon_options, Option
@@ -31,6 +31,7 @@ def detach(path, argv, logfile=None, pidfile=None, uid=None,
             current_app.log.setup_logging_subsystem("ERROR", logfile)
             logger.critical("Can't exec %r", " ".join([path] + argv),
                             exc_info=True)
+        return EX_FAILURE
 
 
 class PartialOptionParser(OptionParser):
@@ -129,13 +130,13 @@ class detached_celeryd(object):
                     config.append(arg)
         prog_name = os.path.basename(argv[0])
         options, values, leftovers = self.parse_options(prog_name, argv[1:])
-        detach(path=self.execv_path,
-               argv=self.execv_argv + leftovers + config,
-               **vars(options))
+        sys.exit(detach(path=self.execv_path,
+                 argv=self.execv_argv + leftovers + config,
+                  **vars(options)))
 
 
 def main():
     detached_celeryd().execute_from_commandline()
 
-if __name__ == "__main__":
+if __name__ == "__main__":  # pragma: no cover
     main()

+ 29 - 15
celery/bin/celeryd_multi.py

@@ -98,6 +98,7 @@ from collections import defaultdict
 from subprocess import Popen
 from time import sleep
 
+from kombu.utils import cached_property
 from kombu.utils.encoding import from_utf8
 
 from celery import __version__
@@ -138,9 +139,15 @@ def main():
 class MultiTool(object):
     retcode = 0  # Final exit code.
 
-    def __init__(self, env=None, fh=None):
+    def __init__(self, env=None, fh=None, quiet=False, verbose=False,
+            no_color=False, nosplash=False):
         self.fh = fh or sys.stderr
         self.env = env
+        self.nosplash = nosplash
+        self.quiet = quiet
+        self.verbose = verbose
+        self.no_color = no_color
+        self.prog_name = "celeryd-multi"
         self.commands = {"start": self.start,
                          "show": self.show,
                          "stop": self.stop,
@@ -156,10 +163,6 @@ class MultiTool(object):
         argv = list(argv)   # don't modify callers argv.
 
         # Reserve the --nosplash|--quiet|-q/--verbose options.
-        self.nosplash = False
-        self.quiet = False
-        self.verbose = False
-        self.no_color = False
         if "--nosplash" in argv:
             self.nosplash = argv.pop(argv.index("--nosplash"))
         if "--quiet" in argv:
@@ -171,13 +174,8 @@ class MultiTool(object):
         if "--no-color" in argv:
             self.no_color = argv.pop(argv.index("--no-color"))
 
-        self.colored = term.colored(enabled=not self.no_color)
-        self.OK = str(self.colored.green("OK"))
-        self.FAILED = str(self.colored.red("FAILED"))
-        self.DOWN = str(self.colored.magenta("DOWN"))
-
         self.prog_name = os.path.basename(argv.pop(0))
-        if len(argv) == 0 or argv[0][0] == "-":
+        if not argv or argv[0][0] == "-":
             return self.error()
 
         try:
@@ -318,10 +316,10 @@ class MultiTool(object):
             self.note("Killing node %s (%s)" % (nodename, pid))
             self.signal_node(nodename, pid, signal.SIGKILL)
 
-    def stop(self, argv, cmd):
+    def stop(self, argv, cmd, retry=None, callback=None):
         self.splash()
         p = NamespacedOptionParser(argv)
-        return self._stop_nodes(p, cmd)
+        return self._stop_nodes(p, cmd, retry=retry, callback=callback)
 
     def _stop_nodes(self, p, cmd, retry=None, callback=None):
         restargs = p.args[len(p.values):]
@@ -398,6 +396,22 @@ class MultiTool(object):
         if not self.quiet:
             self.say(str(msg), newline=newline)
 
+    @cached_property
+    def colored(self):
+        return term.colored(enabled=not self.no_color)
+
+    @cached_property
+    def OK(self):
+        return str(self.colored.green("OK"))
+
+    @cached_property
+    def FAILED(self):
+        return str(self.colored.red("FAILED"))
+
+    @cached_property
+    def DOWN(self):
+        return str(self.colored.magenta("DOWN"))
+
 
 def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
     names = p.values
@@ -501,7 +515,7 @@ def quote(v):
 def format_opt(opt, value):
     if not value:
         return opt
-    if opt[0:2] == "--":
+    if opt.startswith("--"):
         return "%s=%s" % (opt, value)
     return "%s %s" % (opt, value)
 
@@ -532,7 +546,7 @@ def abbreviations(map):
 
 def findsig(args, default=signal.SIGTERM):
     for arg in reversed(args):
-        if len(arg) == 2 and arg[0] == "-" and arg[1].isdigit():
+        if len(arg) == 2 and arg[0] == "-":
             try:
                 return int(arg[1])
             except ValueError:

+ 18 - 8
celery/events/dumper.py

@@ -34,8 +34,18 @@ def humanize_type(type):
         return type.lower().replace("-", " ")
 
 
+def say(msg, out=sys.stdout):
+    out.write(msg + "\n")
+
+
 class Dumper(object):
 
+    def __init__(self, out=sys.stdout):
+        self.out = out
+
+    def say(self, msg):
+        say(msg, out=self.out)
+
     def on_event(self, event):
         timestamp = datetime.utcfromtimestamp(event.pop("timestamp"))
         type = event.pop("type").lower()
@@ -54,21 +64,21 @@ class Dumper(object):
         fields = ", ".join("%s=%s" % (key, event[key])
                         for key in sorted(event.keys()))
         sep = fields and ":" or ""
-        print("%s [%s] %s%s %s" % (hostname, timestamp,
-                                    humanize_type(type), sep, fields))
+        self.say("%s [%s] %s%s %s" % (hostname, timestamp,
+                                      humanize_type(type), sep, fields))
 
     def format_task_event(self, hostname, timestamp, type, task, event):
         fields = ", ".join("%s=%s" % (key, event[key])
                         for key in sorted(event.keys()))
         sep = fields and ":" or ""
-        print("%s [%s] %s%s %s %s" % (hostname, timestamp,
-                                    humanize_type(type), sep, task, fields))
+        self.say("%s [%s] %s%s %s %s" % (hostname, timestamp,
+                    humanize_type(type), sep, task, fields))
 
 
-def evdump(app=None):
-    sys.stderr.write("-> evdump: starting capture...\n")
+def evdump(app=None, out=sys.stdout):
     app = app_or_default(app)
-    dumper = Dumper()
+    dumper = Dumper(out=out)
+    dumper.say("-> evdump: starting capture...")
     conn = app.broker_connection()
     recv = app.events.Receiver(conn, handlers={"*": dumper.on_event})
     try:
@@ -76,5 +86,5 @@ def evdump(app=None):
     except (KeyboardInterrupt, SystemExit):
         conn and conn.close()
 
-if __name__ == "__main__":
+if __name__ == "__main__":  # pragma: no cover
     evdump()

+ 8 - 18
celery/loaders/base.py

@@ -15,8 +15,6 @@ import anyjson
 import importlib
 import os
 import re
-import traceback
-import warnings
 
 from datetime import datetime
 
@@ -188,22 +186,14 @@ class BaseLoader(object):
             sender=None, to=None, host=None, port=None,
             user=None, password=None, timeout=None,
             use_ssl=False, use_tls=False):
-        try:
-            message = self.mail.Message(sender=sender, to=to,
-                                        subject=safe_str(subject),
-                                        body=safe_str(body))
-            mailer = self.mail.Mailer(host=host, port=port,
-                                      user=user, password=password,
-                                      timeout=timeout, use_ssl=use_ssl,
-                                      use_tls=use_tls)
-            mailer.send(message)
-        except Exception, exc:
-            if not fail_silently:
-                raise
-            warnings.warn(self.mail.SendmailWarning(
-                "Mail could not be sent: %r %r\n%r" % (
-                    exc, {"To": to, "Subject": subject},
-                    traceback.format_stack())))
+        message = self.mail.Message(sender=sender, to=to,
+                                    subject=safe_str(subject),
+                                    body=safe_str(body))
+        mailer = self.mail.Mailer(host=host, port=port,
+                                  user=user, password=password,
+                                  timeout=timeout, use_ssl=use_ssl,
+                                  use_tls=use_tls)
+        mailer.send(message, fail_silently=fail_silently)
 
     def read_configuration(self):
         return {}

+ 29 - 29
celery/platforms.py

@@ -45,6 +45,10 @@ DAEMON_UMASK = 0
 DAEMON_WORKDIR = "/"
 DAEMON_REDIRECT_TO = getattr(os, "devnull", "/dev/null")
 
+
+PIDFILE_FLAGS = os.O_CREAT | os.O_EXCL | os.O_WRONLY
+PIDFILE_MODE = ((os.R_OK | os.W_OK) << 6) | ((os.R_OK) << 3) | ((os.R_OK))
+
 _setps_bucket = TokenBucket(0.5)  # 30/m, every 2 seconds
 
 
@@ -172,11 +176,7 @@ class PIDFile(object):
         pid = os.getpid()
         content = "%d\n" % (pid, )
 
-        open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
-        open_mode = (((os.R_OK | os.W_OK) << 6) |
-                        ((os.R_OK) << 3) |
-                        ((os.R_OK)))
-        pidfile_fd = os.open(self.path, open_flags, open_mode)
+        pidfile_fd = os.open(self.path, PIDFILE_FLAGS, PIDFILE_MODE)
         pidfile = os.fdopen(pidfile_fd, "w")
         try:
             pidfile.write(content)
@@ -184,15 +184,18 @@ class PIDFile(object):
             pidfile.flush()
             try:
                 os.fsync(pidfile_fd)
-            except AttributeError:
+            except AttributeError:  # pragma: no cover
                 pass
         finally:
             pidfile.close()
 
-        with open(self.path) as fh:
-            if fh.read() != content:
+        rfh = open(self.path)
+        try:
+            if rfh.read() != content:
                 raise LockFailed(
                     "Inconsistency: Pidfile content doesn't match at re-read")
+        finally:
+            rfh.close()
 
 
 def create_pidlock(pidfile):
@@ -337,12 +340,10 @@ def parse_uid(uid):
     try:
         return int(uid)
     except ValueError:
-        if pwd:
-            try:
-                return pwd.getpwnam(uid).pw_uid
-            except KeyError:
-                raise KeyError("User does not exist: %r" % (uid, ))
-        raise
+        try:
+            return pwd.getpwnam(uid).pw_uid
+        except (AttributeError, KeyError):
+            raise KeyError("User does not exist: %r" % (uid, ))
 
 
 def parse_gid(gid):
@@ -355,12 +356,10 @@ def parse_gid(gid):
     try:
         return int(gid)
     except ValueError:
-        if grp:
-            try:
-                return grp.getgrnam(gid).gr_gid
-            except KeyError:
-                raise KeyError("Group does not exist: %r" % (gid, ))
-        raise
+        try:
+            return grp.getgrnam(gid).gr_gid
+        except (AttributeError, KeyError):
+            raise KeyError("Group does not exist: %r" % (gid, ))
 
 
 def _setgroups_hack(groups):
@@ -386,7 +385,7 @@ def setgroups(groups):
     max_groups = None
     try:
         max_groups = os.sysconf("SC_NGROUPS_MAX")
-    except:
+    except Exception:
         pass
     try:
         return _setgroups_hack(groups[:max_groups])
@@ -399,13 +398,14 @@ def setgroups(groups):
 
 
 def initgroups(uid, gid):
-    if grp and pwd:
-        username = pwd.getpwuid(uid)[0]
-        if hasattr(os, "initgroups"):  # Python 2.7+
-            return os.initgroups(username, gid)
-        groups = [gr.gr_gid for gr in grp.getgrall()
-                                if username in gr.gr_mem]
-        setgroups(groups)
+    if not pwd:  # pragma: no cover
+        return
+    username = pwd.getpwuid(uid)[0]
+    if hasattr(os, "initgroups"):  # Python 2.7+
+        return os.initgroups(username, gid)
+    groups = [gr.gr_gid for gr in grp.getgrall()
+                            if username in gr.gr_mem]
+    setgroups(groups)
 
 
 def setegid(gid):
@@ -583,7 +583,7 @@ def set_process_title(progname, info=None):
     return proctitle
 
 
-if os.environ.get("NOSETPS"):
+if os.environ.get("NOSETPS"):  # pragma: no cover
 
     def set_mp_process_title(*a, **k):
         pass

+ 1 - 1
celery/task/__init__.py

@@ -22,7 +22,7 @@ class module(MagicModule):
         return self.task(*args, **kwargs)
 
 
-old_module, new_module = recreate_module(__name__,
+old_module, new_module = recreate_module(__name__,  # pragma: no cover
     by_module={
         "celery.task.base":   ["BaseTask", "Task", "PeriodicTask",
                                "task", "periodic_task"],

+ 16 - 31
celery/tests/test_app/test_loaders.py

@@ -9,6 +9,7 @@ from mock import patch
 from celery import loaders
 from celery.app import app_or_default
 from celery.exceptions import (
+        NotConfigured,
         ImproperlyConfigured,
         CPendingDeprecationWarning,
 )
@@ -16,6 +17,7 @@ from celery.loaders import base
 from celery.loaders import default
 from celery.loaders.app import AppLoader
 from celery.utils.imports import NotAPackage
+from celery.utils.mail import SendmailWarning
 
 from celery.tests.utils import AppCase, Case
 from celery.tests.compat import catch_warnings
@@ -36,33 +38,11 @@ class Object(object):
             setattr(self, k, v)
 
 
-class MockMail(object):
-
-    class SendmailWarning(UserWarning):
-        pass
-
-    class Message(Object):
-        pass
-
-    class Mailer(Object):
-        sent = []
-        raise_on_send = False
-
-        def send(self, message):
-            if self.__class__.raise_on_send:
-                raise KeyError("foo")
-            self.sent.append(message)
-
-
 class DummyLoader(base.BaseLoader):
 
     def read_configuration(self):
         return {"foo": "bar", "CELERY_IMPORTS": ("os", "sys")}
 
-    @property
-    def mail(self):
-        return MockMail()
-
 
 class TestLoaders(AppCase):
 
@@ -123,23 +103,27 @@ class TestLoaderBase(Case):
         self.loader.import_from_cwd("foo", imp=imp)
         self.assertTrue(imp.called)
 
-    def test_mail_admins_errors(self):
-        MockMail.Mailer.raise_on_send = True
+    @patch("celery.utils.mail.Mailer._send")
+    def test_mail_admins_errors(self, send):
+        send.side_effect = KeyError()
         opts = dict(self.message_options, **self.server_options)
 
-        with self.assertWarnsRegex(MockMail.SendmailWarning, r'KeyError'):
+        with self.assertWarnsRegex(SendmailWarning, r'KeyError'):
             self.loader.mail_admins(fail_silently=True, **opts)
 
         with self.assertRaises(KeyError):
             self.loader.mail_admins(fail_silently=False, **opts)
 
-    def test_mail_admins(self):
-        MockMail.Mailer.raise_on_send = False
+    @patch("celery.utils.mail.Mailer._send")
+    def test_mail_admins(self, send):
         opts = dict(self.message_options, **self.server_options)
-
         self.loader.mail_admins(**opts)
-        message = MockMail.Mailer.sent.pop()
-        self.assertDictContainsSubset(vars(message), self.message_options)
+        self.assertTrue(send.call_args)
+        message = send.call_args[0][0]
+        self.assertEqual(message.to, [self.message_options["to"]])
+        self.assertEqual(message.subject, self.message_options["subject"])
+        self.assertEqual(message.sender, self.message_options["sender"])
+        self.assertEqual(message.body, self.message_options["body"])
 
     def test_mail_attribute(self):
         from celery.utils import mail
@@ -184,7 +168,8 @@ class TestDefaultLoader(Case):
     def test_read_configuration_importerror(self, find_module):
         find_module.side_effect = ImportError()
         l = default.Loader()
-        l.read_configuration()
+        with self.assertWarnsRegex(NotConfigured, r'make sure it exists'):
+            l.read_configuration()
 
     def test_read_configuration(self):
         from types import ModuleType

+ 10 - 0
celery/tests/test_backends/__init__.py

@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
+from celery import current_app
 from celery import backends
 from celery.backends.amqp import AMQPBackend
 from celery.backends.cache import CacheBackend
@@ -28,3 +29,12 @@ class TestBackends(Case):
     def test_unknown_backend(self):
         with self.assertRaises(ValueError):
             backends.get_backend_cls("fasodaopjeqijwqe")
+
+    def test_default_backend(self):
+        self.assertEqual(backends.default_backend, current_app.backend)
+
+    def test_backend_by_url(self, url="redis://localhost/1"):
+        from celery.backends.redis import RedisBackend
+        backend, url_ = backends.get_backend_by_url(url)
+        self.assertIs(backend, RedisBackend)
+        self.assertEqual(url_, url)

+ 152 - 0
celery/tests/test_bin/test_camqadm.py

@@ -0,0 +1,152 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+from mock import Mock, patch
+
+from celery import Celery
+from celery.bin.camqadm import (
+    AMQPAdmin,
+    AMQShell,
+    dump_message,
+    AMQPAdminCommand,
+    camqadm,
+    main,
+)
+
+from celery.tests.utils import AppCase, WhateverIO
+
+
+class test_AMQShell(AppCase):
+
+    def setup(self):
+        self.fh = WhateverIO()
+        self.app = Celery(broker="memory://", set_as_current=False)
+        self.adm = self.create_adm()
+        self.shell = AMQShell(connect=self.adm.connect, out=self.fh)
+
+    def create_adm(self, *args, **kwargs):
+        return AMQPAdmin(app=self.app, out=self.fh, *args, **kwargs)
+
+    def test_queue_declare(self):
+        self.shell.onecmd("queue.declare foo")
+        self.assertIn("ok", self.fh.getvalue())
+
+    def test_missing_command(self):
+        self.shell.onecmd("foo foo")
+        self.assertIn("unknown syntax", self.fh.getvalue())
+
+    def RV(self):
+        raise Exception(self.fh.getvalue())
+
+    def test_missing_namespace(self):
+        self.shell.onecmd("ns.cmd arg")
+        self.assertIn("unknown syntax", self.fh.getvalue())
+
+    def test_help(self):
+        self.shell.onecmd("help")
+        self.assertIn("Example:", self.fh.getvalue())
+
+    def test_help_command(self):
+        self.shell.onecmd("help queue.declare")
+        self.assertIn("passive:no", self.fh.getvalue())
+
+    def test_help_unknown_command(self):
+        self.shell.onecmd("help foo.baz")
+        self.assertIn("unknown syntax", self.fh.getvalue())
+
+    def test_exit(self):
+        with self.assertRaises(SystemExit):
+            self.shell.onecmd("exit")
+        self.assertIn("don't leave!", self.fh.getvalue())
+
+    def test_note_silent(self):
+        self.shell.silent = True
+        self.shell.note("foo bar")
+        self.assertNotIn("foo bar", self.fh.getvalue())
+
+    def test_reconnect(self):
+        self.shell.onecmd("queue.declare foo")
+        self.shell.needs_reconnect = True
+        self.shell.onecmd("queue.delete foo")
+
+    def test_completenames(self):
+        self.assertEqual(self.shell.completenames("queue.dec"),
+                ["queue.declare"])
+        self.assertEqual(self.shell.completenames("declare"),
+                ["queue.declare", "exchange.declare"])
+
+    def test_empty_line(self):
+        self.shell.emptyline = Mock()
+        self.shell.default = Mock()
+        self.shell.onecmd("")
+        self.shell.emptyline.assert_called_with()
+        self.shell.onecmd("foo")
+        self.shell.default.assert_called_with("foo")
+
+    def test_respond(self):
+        self.shell.respond({"foo": "bar"})
+        self.assertIn("foo", self.fh.getvalue())
+
+    def test_prompt(self):
+        self.assertTrue(self.shell.prompt)
+
+    def test_no_returns(self):
+        self.shell.onecmd("queue.declare foo")
+        self.shell.onecmd("exchange.declare bar direct yes")
+        self.shell.onecmd("queue.bind foo bar baz")
+        self.shell.onecmd("basic.ack 1")
+
+    def test_dump_message(self):
+        m = Mock()
+        m.body = "the quick brown fox"
+        m.properties = {"a": 1}
+        m.delivery_info = {"exchange": "bar"}
+        self.assertTrue(dump_message(m))
+
+    def test_dump_message_no_message(self):
+        self.assertIn("No messages in queue", dump_message(None))
+
+    def test_note(self):
+        self.adm.silent = True
+        self.adm.note("FOO")
+        self.assertNotIn("FOO", self.fh.getvalue())
+
+    def test_run(self):
+        a = self.create_adm("queue.declare foo")
+        a.run()
+        self.assertIn("ok", self.fh.getvalue())
+
+    def test_run_loop(self):
+        a = self.create_adm()
+        a.Shell = Mock()
+        shell = a.Shell.return_value = Mock()
+        shell.cmdloop = Mock()
+        a.run()
+        shell.cmdloop.assert_called_with()
+
+        shell.cmdloop.side_effect = KeyboardInterrupt()
+        a.run()
+        self.assertIn("bibi", self.fh.getvalue())
+
+    @patch("celery.bin.camqadm.AMQPAdminCommand")
+    def test_main(self, Command):
+        c = Command.return_value = Mock()
+        main()
+        c.execute_from_commandline.assert_called_with()
+
+    @patch("celery.bin.camqadm.AMQPAdmin")
+    def test_camqadm(self, cls):
+        c = cls.return_value = Mock()
+        camqadm()
+        c.run.assert_called_with()
+
+    @patch("celery.bin.camqadm.AMQPAdmin")
+    def test_AMQPAdminCommand(self, cls):
+        c = cls.return_value = Mock()
+        camqadm()
+        c.run.assert_called_with()
+
+        x = AMQPAdminCommand(app=self.app)
+        x.run()
+        self.assertIs(cls.call_args[1]["app"], self.app)
+        c.run.assert_called_with()

+ 240 - 0
celery/tests/test_bin/test_celery.py

@@ -0,0 +1,240 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+from anyjson import dumps
+from datetime import datetime
+from mock import Mock, patch
+
+from celery import task
+from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
+from celery.bin.celery import (
+    Command,
+    Error,
+    worker,
+    list_,
+    apply,
+    purge,
+    result,
+    inspect,
+    status,
+    migrate,
+    shell,
+    help,
+    report,
+    CeleryCommand,
+    determine_exit_status,
+    main,
+)
+
+from celery.tests.utils import AppCase, WhateverIO
+
+
+@task
+def add(x, y):
+    return x + y
+
+
+class test_Command(AppCase):
+
+    def test_Error_repr(self):
+        x = Error("something happened")
+        self.assertIsNotNone(x.status)
+        self.assertTrue(x.reason)
+        self.assertTrue(str(x))
+
+    def setup(self):
+        self.out = WhateverIO()
+        self.err = WhateverIO()
+        self.cmd = Command(self.app, stdout=self.out, stderr=self.err)
+
+    def test_show_help(self):
+        self.cmd.run_from_argv = Mock()
+        self.assertEqual(self.cmd.show_help("foo"), EX_USAGE)
+        self.cmd.run_from_argv.assert_called_with(
+                self.cmd.prog_name, ["foo", "--help"]
+        )
+
+    def test_error(self):
+        self.cmd.out = Mock()
+        self.cmd.error("FOO")
+        self.assertTrue(self.cmd.out.called)
+
+    def test_out(self):
+        f = Mock()
+        self.cmd.out("foo", f)
+        f.write.assert_called_with("foo\n")
+        self.cmd.out("foo\n", f)
+
+    def test_call(self):
+        self.cmd.run = Mock()
+        self.cmd.run.return_value = None
+        self.assertEqual(self.cmd(), EX_OK)
+
+        self.cmd.run.side_effect = Error("error", EX_FAILURE)
+        self.assertEqual(self.cmd(), EX_FAILURE)
+
+    def test_run_from_argv(self):
+        with self.assertRaises(NotImplementedError):
+            self.cmd.run_from_argv("prog", ["foo", "bar"])
+        self.assertEqual(self.cmd.prog_name, "prog")
+
+    def test_prettify_list(self):
+        self.assertEqual(self.cmd.prettify([])[1], "- empty -")
+        self.assertIn("bar", self.cmd.prettify(["foo", "bar"])[1])
+
+    def test_prettify_dict(self):
+        self.assertIn("OK",
+            str(self.cmd.prettify({"ok": "the quick brown fox"})[0]))
+        self.assertIn("ERROR",
+            str(self.cmd.prettify({"error": "the quick brown fox"})[0]))
+
+    def test_prettify(self):
+        self.assertIn("OK", str(self.cmd.prettify("the quick brown")))
+        self.assertIn("OK", str(self.cmd.prettify(object())))
+        self.assertIn("OK", str(self.cmd.prettify({"foo": "bar"})))
+
+
+class test_Delegate(AppCase):
+
+    def test_get_options(self):
+        self.assertTrue(worker().get_options())
+
+    def test_run(self):
+        w = worker()
+        w.target.run = Mock()
+        w.run()
+        w.target.run.assert_called_with()
+
+
+class test_list(AppCase):
+
+    def test_list_bindings_no_support(self):
+        l = list_(app=self.app, stderr=WhateverIO())
+        management = Mock()
+        management.get_bindings.side_effect = NotImplementedError()
+        with self.assertRaises(Error):
+            l.list_bindings(management)
+
+    def test_run(self):
+        l = list_(app=self.app, stderr=WhateverIO())
+        l.run("bindings")
+
+        with self.assertRaises(Error):
+            l.run(None)
+
+        with self.assertRaises(Error):
+            l.run("foo")
+
+
+class test_apply(AppCase):
+
+    @patch("celery.app.base.Celery.send_task")
+    def test_run(self, send_task):
+        a = apply(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
+        a.run("tasks.add")
+        self.assertTrue(send_task.called)
+
+        a.run("tasks.add",
+              args=dumps([4, 4]),
+              kwargs=dumps({"x": 2, "y": 2}))
+        self.assertEqual(send_task.call_args[1]["args"], [4, 4])
+        self.assertEqual(send_task.call_args[1]["kwargs"], {"x": 2, "y": 2})
+
+        a.run("tasks.add", expires=10, countdown=10)
+        self.assertEqual(send_task.call_args[1]["expires"], 10)
+        self.assertEqual(send_task.call_args[1]["countdown"], 10)
+
+        now = datetime.now()
+        iso = now.isoformat()
+        a.run("tasks.add", expires=iso)
+        self.assertEqual(send_task.call_args[1]["expires"], now)
+        with self.assertRaises(ValueError):
+            a.run("tasks.add", expires="foobaribazibar")
+
+
+class test_purge(AppCase):
+
+    @patch("celery.app.control.Control.discard_all")
+    def test_run(self, discard_all):
+        out = WhateverIO()
+        a = purge(app=self.app, stdout=out)
+        discard_all.return_value = 0
+        a.run()
+        self.assertIn("No messages purged", out.getvalue())
+
+        discard_all.return_value = 100
+        a.run()
+        self.assertIn("100 messages", out.getvalue())
+
+
+class test_result(AppCase):
+
+    @patch("celery.result.AsyncResult.get")
+    def test_run(self, get):
+        out = WhateverIO()
+        r = result(app=self.app, stdout=out)
+        get.return_value = "Jerry"
+        r.run("id")
+        self.assertIn("Jerry", out.getvalue())
+
+        get.return_value = "Elaine"
+        r.run("id", task=add.name)
+        self.assertIn("Elaine", out.getvalue())
+
+
+class test_status(AppCase):
+
+    @patch("celery.bin.celery.inspect")
+    def test_run(self, inspect_):
+        out, err = WhateverIO(), WhateverIO()
+        ins = inspect_.return_value = Mock()
+        ins.run.return_value = []
+        s = status(self.app, stdout=out, stderr=err)
+        with self.assertRaises(Error):
+            s.run()
+
+        ins.run.return_value = ["a", "b", "c"]
+        s.run()
+        self.assertIn("3 nodes online", out.getvalue())
+        s.run(quiet=True)
+
+
+class test_migrate(AppCase):
+
+    @patch("celery.contrib.migrate.migrate_tasks")
+    def test_run(self, migrate_tasks):
+        out = WhateverIO()
+        m = migrate(app=self.app, stdout=out, stderr=WhateverIO())
+        with self.assertRaises(SystemExit):
+            m.run()
+        self.assertFalse(migrate_tasks.called)
+
+        m.run("memory://foo", "memory://bar")
+        self.assertTrue(migrate_tasks.called)
+
+        state = Mock()
+        state.count = 10
+        state.strtotal = 30
+        m.on_migrate_task(state, {"task": "tasks.add", "id": "ID"}, None)
+        self.assertIn("10/30", out.getvalue())
+
+
+class test_report(AppCase):
+
+    def test_run(self):
+        out = WhateverIO()
+        r = report(app=self.app, stdout=out)
+        self.assertEqual(r.run(), EX_OK)
+        self.assertTrue(out.getvalue())
+
+
+class test_help(AppCase):
+
+    def test_run(self):
+        out = WhateverIO()
+        h = help(app=self.app, stdout=out)
+        h.parser = Mock()
+        self.assertEqual(h.run(), EX_USAGE)
+        self.assertTrue(out.getvalue())
+        self.assertTrue(h.usage("help"))
+        h.parser.print_help.assert_called_with()

+ 100 - 0
celery/tests/test_bin/test_celeryd_detach.py

@@ -0,0 +1,100 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+from mock import Mock, patch
+
+from celery import current_app
+from celery.bin.celeryd_detach import (
+    detach,
+    detached_celeryd,
+    main,
+)
+
+from celery.tests.utils import Case, override_stdouts
+
+
+if not current_app.IS_WINDOWS:
+    class test_detached(Case):
+
+        @patch("celery.bin.celeryd_detach.detached")
+        @patch("os.execv")
+        @patch("celery.bin.celeryd_detach.logger")
+        @patch("celery.app.log.Logging.setup_logging_subsystem")
+        def test_execs(self, setup_logs, logger, execv, detached):
+            context = detached.return_value = Mock()
+            context.__enter__ = Mock()
+            context.__exit__ = Mock()
+
+            detach("/bin/boo", ["a", "b", "c"], logfile="/var/log",
+                    pidfile="/var/pid")
+            detached.assert_called_with("/var/log", "/var/pid", None, None, 0,
+                                        None, False)
+            execv.assert_called_with("/bin/boo", ["/bin/boo", "a", "b", "c"])
+
+            execv.side_effect = Exception("foo")
+            r = detach("/bin/boo", ["a", "b", "c"], logfile="/var/log",
+                    pidfile="/var/pid")
+            context.__enter__.assert_called_with()
+            self.assertTrue(logger.critical.called)
+            setup_logs.assert_called_with("ERROR", "/var/log")
+            self.assertEqual(r, 1)
+
+
+class test_PartialOptionParser(Case):
+
+    def test_parser(self):
+        x = detached_celeryd()
+        p = x.Parser("celeryd_detach")
+        options, values = p.parse_args(['--logfile=foo', "--fake", "--enable",
+                                        "a", "b", "-c1", "-d", "2"])
+        self.assertEqual(options.logfile, "foo")
+        self.assertEqual(values, ["a", "b"])
+        self.assertEqual(p.leftovers, ["--enable", "-c1", "-d", "2"])
+
+        with override_stdouts():
+            with self.assertRaises(SystemExit):
+                p.parse_args(["--logfile"])
+            p.get_option("--logfile").nargs = 2
+            with self.assertRaises(SystemExit):
+                p.parse_args(["--logfile=a"])
+            with self.assertRaises(SystemExit):
+                p.parse_args(["--fake=abc"])
+
+        assert p.get_option("--logfile").nargs == 2
+        p.parse_args(["--logfile=a", "b"])
+        p.get_option("--logfile").nargs = 1
+
+
+class test_Command(Case):
+    argv = ["--autoscale=10,2", "-c", "1",
+            "--logfile=/var/log", "-lDEBUG",
+            "--", ".disable_rate_limits=1"]
+
+    def test_parse_options(self):
+        x = detached_celeryd()
+        o, v, l = x.parse_options("cd", self.argv)
+        self.assertEqual(o.logfile, "/var/log")
+        self.assertEqual(l, ["--autoscale=10,2", "-c", "1",
+                             "-lDEBUG", "--logfile=/var/log",
+                             "--pidfile=celeryd.pid"])
+        x.parse_options("cd", [])  # no args
+
+    @patch("sys.exit")
+    @patch("celery.bin.celeryd_detach.detach")
+    def test_execute_from_commandline(self, detach, exit):
+        x = detached_celeryd()
+        x.execute_from_commandline(self.argv)
+        self.assertTrue(exit.called)
+        detach.assert_called_with(path=x.execv_path, uid=None, gid=None,
+            umask=0, working_directory=None, fake=False,
+            logfile='/var/log', pidfile='celeryd.pid',
+            argv=['-m', 'celery.bin.celeryd', '-c', '1', '-lDEBUG',
+                  '--logfile=/var/log', '--pidfile=celeryd.pid',
+                  '--', '.disable_rate_limits=1'],
+        )
+
+    @patch("celery.bin.celeryd_detach.detached_celeryd")
+    def test_main(self, command):
+        c = command.return_value = Mock()
+        main()
+        c.execute_from_commandline.assert_called_with()

+ 433 - 0
celery/tests/test_bin/test_celeryd_multi.py

@@ -0,0 +1,433 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import errno
+import signal
+import sys
+
+from mock import Mock, patch
+
+from celery.bin.celeryd_multi import (
+    main,
+    MultiTool,
+    findsig,
+    abbreviations,
+    parse_ns_range,
+    format_opt,
+    quote,
+    NamespacedOptionParser,
+    multi_args,
+    __doc__ as doc,
+)
+
+from celery.tests.utils import Case, WhateverIO
+
+
+class test_functions(Case):
+
+    def test_findsig(self):
+        self.assertEqual(findsig(["a", "b", "c", "-1"]), 1)
+        self.assertEqual(findsig(["--foo=1", "-9"]), 9)
+        self.assertEqual(findsig(["-INT"]), signal.SIGINT)
+        self.assertEqual(findsig([]), signal.SIGTERM)
+        self.assertEqual(findsig(["-s"]), signal.SIGTERM)
+        self.assertEqual(findsig(["-log"]), signal.SIGTERM)
+
+    def test_abbreviations(self):
+        expander = abbreviations({"%s": "START",
+                                  "%x": "STOP"})
+        self.assertEqual(expander("foo%s"), "fooSTART")
+        self.assertEqual(expander("foo%x"), "fooSTOP")
+        self.assertEqual(expander("foo%y"), "foo%y")
+        self.assertIsNone(expander(None))
+
+    def test_parse_ns_range(self):
+        self.assertEqual(parse_ns_range("1-3", True), ['1', '2', '3'])
+        self.assertEqual(parse_ns_range("1-3", False), ['1-3'])
+        self.assertEqual(parse_ns_range("1-3,10,11,20", True),
+                ['1', '2', '3', '10', '11', '20'])
+
+    def test_format_opt(self):
+        self.assertEqual(format_opt("--foo", None), "--foo")
+        self.assertEqual(format_opt("-c", 1), "-c 1")
+        self.assertEqual(format_opt("--log", "foo"), "--log=foo")
+
+    def test_quote(self):
+        self.assertEqual(quote("the 'quick"), "'the '\\''quick'")
+
+
+class test_NamespacedOptionParser(Case):
+
+    def test_parse(self):
+        x = NamespacedOptionParser(["-c:1,3", "4"])
+        self.assertEqual(x.namespaces.get("1,3"), {"-c": "4"})
+        x = NamespacedOptionParser(["-c:jerry,elaine", "5",
+                                    "--loglevel:kramer=DEBUG",
+                                    "--flag",
+                                    "--logfile=foo", "-Q", "bar", "a", "b",
+                                    "--", ".disable_rate_limits=1"])
+        self.assertEqual(x.options, {"--logfile": "foo",
+                                     "-Q": "bar",
+                                     "--flag": None})
+        self.assertEqual(x.values, ["a", "b"])
+        self.assertEqual(x.namespaces.get("jerry,elaine"), {"-c": "5"})
+        self.assertEqual(x.namespaces.get("kramer"), {"--loglevel": "DEBUG"})
+        self.assertEqual(x.passthrough, "-- .disable_rate_limits=1")
+
+
+class test_multi_args(Case):
+
+    @patch("socket.gethostname")
+    def test_parse(self, gethostname):
+        p = NamespacedOptionParser(["-c:jerry,elaine", "5",
+                                    "--loglevel:kramer=DEBUG",
+                                    "--flag",
+                                    "--logfile=foo", "-Q", "bar", "jerry",
+                                    "elaine", "kramer",
+                                    "--", ".disable_rate_limits=1"])
+        it = multi_args(p, cmd="COMMAND", append="*AP*",
+                prefix="*P*", suffix="*S*")
+        names = list(it)
+        self.assertEqual(names[0][0:2], ('*P*jerry*S*',
+            [
+                'COMMAND', '-n *P*jerry*S*', '-Q bar',
+                '-c 5', '--flag', '--logfile=foo',
+                '-- .disable_rate_limits=1', '*AP*',
+            ]
+        ))
+        self.assertEqual(names[1][0:2], ('*P*elaine*S*',
+            [
+                'COMMAND', '-n *P*elaine*S*', '-Q bar',
+                '-c 5', '--flag', '--logfile=foo',
+                '-- .disable_rate_limits=1', '*AP*',
+            ]
+        ))
+        self.assertEqual(names[2][0:2], ('*P*kramer*S*',
+            [
+                'COMMAND', '--loglevel=DEBUG', '-n *P*kramer*S*',
+                '-Q bar', '--flag', '--logfile=foo',
+                '-- .disable_rate_limits=1', '*AP*',
+            ]
+        ))
+        expand = names[0][2]
+        self.assertEqual(expand("%h"), "*P*jerry*S*")
+        self.assertEqual(expand("%n"), "jerry")
+        names2 = list(multi_args(p, cmd="COMMAND", append="",
+                prefix="*P*", suffix="*S*"))
+        self.assertEqual(names2[0][1][-1], '-- .disable_rate_limits=1')
+
+        gethostname.return_value = 'example.com'
+        p2 = NamespacedOptionParser(["10", "-c:1", "5"])
+        names3 = list(multi_args(p2, cmd="COMMAND"))
+        self.assertEqual(len(names3), 10)
+        self.assertEqual(names3[0][0:2], ('celery1.example.com',
+            ['COMMAND', '-n celery1.example.com', '-c 5', '']))
+        for i, worker in enumerate(names3[1:], 2):
+            self.assertEqual(worker[0:2], ('celery%s.example.com' % i,
+                ['COMMAND', '-n celery%s.example.com' % i, '']))
+
+        names4 = list(multi_args(p2, cmd="COMMAND", suffix='""'))
+        self.assertEqual(len(names4), 10)
+        self.assertEqual(names4[0][0:2], ('celery1',
+            ['COMMAND', '-n celery1', '-c 5', '']))
+
+        p3 = NamespacedOptionParser(["foo", "-c:foo", "5"])
+        names5 = list(multi_args(p3, cmd="COMMAND", suffix='""'))
+        self.assertEqual(names5[0][0:2], ('foo',
+            ['COMMAND', '-n foo', '-c 5', '']))
+
+
+class test_MultiTool(Case):
+
+    def setUp(self):
+        self.fh = WhateverIO()
+        self.env = {}
+        self.t = MultiTool(env=self.env, fh=self.fh)
+
+    def test_note(self):
+        self.t.note("hello world")
+        self.assertEqual(self.fh.getvalue(), "hello world\n")
+
+    def test_note_quiet(self):
+        self.t.quiet = True
+        self.t.note("hello world")
+        self.assertFalse(self.fh.getvalue())
+
+    def test_info(self):
+        self.t.verbose = True
+        self.t.info("hello info")
+        self.assertEqual(self.fh.getvalue(), 'hello info\n')
+
+    def test_info_not_verbose(self):
+        self.t.verbose = False
+        self.t.info("hello info")
+        self.assertFalse(self.fh.getvalue())
+
+    def test_error(self):
+        self.t.say = Mock()
+        self.t.usage = Mock()
+        self.assertEqual(self.t.error("foo"), 1)
+        self.t.say.assert_called_with("foo")
+        self.t.usage.assert_called_with()
+
+        self.t.say = Mock()
+        self.assertEqual(self.t.error(), 1)
+        self.assertFalse(self.t.say.called)
+
+        self.assertEqual(self.t.retcode, 1)
+
+    @patch("celery.bin.celeryd_multi.Popen")
+    def test_waitexec(self, Popen):
+        self.t.note = Mock()
+        pipe = Popen.return_value = Mock()
+        pipe.wait.return_value = -10
+        self.assertEqual(self.t.waitexec(["-m", "foo"], "path"), 10)
+        Popen.assert_called_with(['path', '-m', 'foo'], env=self.t.env)
+        self.t.note.assert_called_with("* Child was terminated by signal 10")
+
+        pipe.wait.return_value = 2
+        self.assertEqual(self.t.waitexec(["-m", "foo"], "path"), 2)
+        self.t.note.assert_called_with(
+                "* Child terminated with failure code 2")
+
+        pipe.wait.return_value = 0
+        self.assertFalse(self.t.waitexec(["-m", "foo", "path"]))
+
+    def test_nosplash(self):
+        self.t.nosplash = True
+        self.t.splash()
+        self.assertFalse(self.fh.getvalue())
+
+    def test_splash(self):
+        self.t.nosplash = False
+        self.t.splash()
+        self.assertIn("celeryd-multi", self.fh.getvalue())
+
+    def test_usage(self):
+        self.t.usage()
+        self.assertTrue(self.fh.getvalue())
+
+    def test_help(self):
+        self.t.help([])
+        self.assertIn(doc, self.fh.getvalue())
+
+    def test_expand(self):
+        self.t.expand(['foo%n', 'ask', 'klask', 'dask'])
+        self.assertEqual(self.fh.getvalue(),
+                "fooask\nfooklask\nfoodask\n")
+
+    def test_restart(self):
+        stop = self.t._stop_nodes = Mock()
+        self.t.restart(['jerry', 'george'], "celeryd")
+        waitexec = self.t.waitexec = Mock()
+        self.assertTrue(stop.called)
+        callback = stop.call_args[1]["callback"]
+        self.assertTrue(callback)
+
+        waitexec.return_value = 0
+        callback("jerry", ["arg"], 13)
+        waitexec.assert_called_with(["arg"])
+        self.assertIn("OK", self.fh.getvalue())
+        self.fh.seek(0)
+        self.fh.truncate()
+
+        waitexec.return_value = 1
+        callback("jerry", ["arg"], 13)
+        self.assertIn("FAILED", self.fh.getvalue())
+
+    def test_stop(self):
+        self.t.getpids = Mock()
+        self.t.getpids.return_value = [2, 3, 4]
+        self.t.shutdown_nodes = Mock()
+        self.t.stop(["a", "b", "-INT"], "celeryd")
+        self.t.shutdown_nodes.assert_called_with(
+            [2, 3, 4], sig=signal.SIGINT, retry=None, callback=None,
+
+        )
+
+    def test_kill(self):
+        self.t.getpids = Mock()
+        self.t.getpids.return_value = [
+            ("a", None, 10),
+            ("b", None, 11),
+            ("c", None, 12)
+        ]
+        sig = self.t.signal_node = Mock()
+
+        self.t.kill(["a", "b", "c"], "celeryd")
+
+        sigs = sig.call_args_list
+        self.assertEqual(len(sigs), 3)
+        self.assertEqual(sigs[0][0], ("a", 10, signal.SIGKILL))
+        self.assertEqual(sigs[1][0], ("b", 11, signal.SIGKILL))
+        self.assertEqual(sigs[2][0], ("c", 12, signal.SIGKILL))
+
+    def prepare_pidfile_for_getpids(self, PIDFile):
+        class pids(object):
+
+            def __init__(self, path):
+                self.path = path
+
+            def read_pid(self):
+                try:
+                    return {"celeryd@foo.pid": 10,
+                            "celeryd@bar.pid": 11}[self.path]
+                except KeyError:
+                    raise ValueError()
+        PIDFile.side_effect = pids
+
+    @patch("celery.bin.celeryd_multi.PIDFile")
+    @patch("socket.gethostname")
+    def test_getpids(self, gethostname, PIDFile):
+        gethostname.return_value = "e.com"
+        self.prepare_pidfile_for_getpids(PIDFile)
+        callback = Mock()
+
+        p = NamespacedOptionParser(["foo", "bar", "baz"])
+        nodes = self.t.getpids(p, "celeryd", callback=callback)
+        self.assertEqual(nodes, [
+            ('foo.e.com',
+              ('celeryd', '--pidfile=celeryd@foo.pid', '-n foo.e.com', ''),
+             10),
+            ('bar.e.com',
+              ('celeryd', '--pidfile=celeryd@bar.pid', '-n bar.e.com', ''),
+             11),
+        ])
+        self.assertTrue(callback.called)
+        callback.assert_called_with(
+            "baz.e.com",
+            ['celeryd', '--pidfile=celeryd@baz.pid', '-n baz.e.com', ''],
+            None,
+        )
+        self.assertIn("DOWN", self.fh.getvalue())
+
+        # without callback, should work
+        nodes = self.t.getpids(p, "celeryd", callback=None)
+
+    @patch("celery.bin.celeryd_multi.PIDFile")
+    @patch("socket.gethostname")
+    @patch("celery.bin.celeryd_multi.sleep")
+    def test_shutdown_nodes(self, slepp, gethostname, PIDFile):
+        gethostname.return_value = "e.com"
+        self.prepare_pidfile_for_getpids(PIDFile)
+        self.assertIsNone(self.t.shutdown_nodes([]))
+        self.t.signal_node = Mock()
+        self.t.node_alive = Mock()
+        self.t.node_alive.return_value = False
+
+        callback = Mock()
+        self.t.stop(["foo", "bar", "baz"], "celeryd", callback=callback)
+        sigs = self.t.signal_node.call_args_list
+        self.assertEqual(len(sigs), 2)
+        self.assertEqual(sigs[0][0], ("foo.e.com", 10, signal.SIGTERM))
+        self.assertEqual(sigs[1][0], ("bar.e.com", 11, signal.SIGTERM))
+        self.t.signal_node.return_value = False
+        self.assertTrue(callback.called)
+        self.t.stop(["foo", "bar", "baz"], "celeryd", callback=None)
+        calls = [0]
+
+        def on_node_alive(pid):
+            calls[0] += 1
+            if calls[0] > 3:
+                return True
+            return False
+        self.t.signal_node.return_value = True
+        self.t.node_alive.side_effect = on_node_alive
+        self.t.stop(["foo", "bar", "baz"], "celeryd", retry=True)
+
+    @patch("os.kill")
+    def test_node_alive(self, kill):
+        kill.return_value = True
+        self.assertTrue(self.t.node_alive(13))
+        esrch = OSError()
+        esrch.errno = errno.ESRCH
+        kill.side_effect = esrch
+        self.assertFalse(self.t.node_alive(13))
+        kill.assert_called_with(13, 0)
+
+        enoent = OSError()
+        enoent.errno = errno.ENOENT
+        kill.side_effect = enoent
+        with self.assertRaises(OSError):
+            self.t.node_alive(13)
+
+    @patch("os.kill")
+    def test_signal_node(self, kill):
+        kill.return_value = True
+        self.assertTrue(self.t.signal_node("foo", 13, 9))
+        esrch = OSError()
+        esrch.errno = errno.ESRCH
+        kill.side_effect = esrch
+        self.assertFalse(self.t.signal_node("foo", 13, 9))
+        kill.assert_called_with(13, 9)
+        self.assertIn("Could not signal foo", self.fh.getvalue())
+
+        enoent = OSError()
+        enoent.errno = errno.ENOENT
+        kill.side_effect = enoent
+        with self.assertRaises(OSError):
+            self.t.signal_node("foo", 13, 9)
+
+    def test_start(self):
+        self.t.waitexec = Mock()
+        self.t.waitexec.return_value = 0
+        self.assertFalse(self.t.start(["foo", "bar", "baz"], "celeryd"))
+
+        self.t.waitexec.return_value = 1
+        self.assertFalse(self.t.start(["foo", "bar", "baz"], "celeryd"))
+
+    def test_show(self):
+        self.t.show(["foo", "bar", "baz"], "celeryd")
+        self.assertTrue(self.fh.getvalue())
+
+    @patch("socket.gethostname")
+    def test_get(self, gethostname):
+        gethostname.return_value = "e.com"
+        self.t.get(["xuzzy.e.com", "foo", "bar", "baz"], "celeryd")
+        self.assertFalse(self.fh.getvalue())
+        self.t.get(["foo.e.com", "foo", "bar", "baz"], "celeryd")
+        self.assertTrue(self.fh.getvalue())
+
+    @patch("socket.gethostname")
+    def test_names(self, gethostname):
+        gethostname.return_value = "e.com"
+        self.t.names(["foo", "bar", "baz"], "celeryd")
+        self.assertIn("foo.e.com\nbar.e.com\nbaz.e.com", self.fh.getvalue())
+
+    def test_execute_from_commandline(self):
+        start = self.t.commands["start"] = Mock()
+        self.t.error = Mock()
+        self.t.execute_from_commandline(["multi", "start", "foo", "bar"])
+        self.assertFalse(self.t.error.called)
+        start.assert_called_with(["foo", "bar"], "celeryd")
+
+        self.t.error = Mock()
+        self.t.execute_from_commandline(["multi", "frob", "foo", "bar"])
+        self.t.error.assert_called_with("Invalid command: frob")
+
+        self.t.error = Mock()
+        self.t.execute_from_commandline(["multi"])
+        self.t.error.assert_called_with()
+
+        self.t.error = Mock()
+        self.t.execute_from_commandline(["multi", "-foo"])
+        self.t.error.assert_called_with()
+
+        self.t.execute_from_commandline(["multi", "start", "foo",
+                "--nosplash", "--quiet", "-q", "--verbose", "--no-color"])
+        self.assertTrue(self.t.nosplash)
+        self.assertTrue(self.t.quiet)
+        self.assertTrue(self.t.verbose)
+        self.assertTrue(self.t.no_color)
+
+    def test_stop_verify(self):
+        self.t._stop_nodes = Mock()
+        self.t.stop_verify(["foo", "bar", "baz"], "celeryd")
+        self.assertEqual(self.t._stop_nodes.call_args[1]["retry"], 2)
+
+    @patch("celery.bin.celeryd_multi.MultiTool")
+    def test_main(self, MultiTool):
+        m = MultiTool.return_value = Mock()
+        with self.assertRaises(SystemExit):
+            main()
+        m.execute_from_commandline.assert_called_with(sys.argv)

+ 48 - 0
celery/tests/test_bin/test_celeryevdump.py

@@ -0,0 +1,48 @@
+from __future__ import absolute_import
+
+from mock import patch
+from time import time
+
+from celery.events.dumper import (
+    humanize_type,
+    Dumper,
+    evdump,
+)
+
+from celery.tests.utils import Case, WhateverIO
+
+
+class test_Dumper(Case):
+
+    def setUp(self):
+        self.out = WhateverIO()
+        self.dumper = Dumper(out=self.out)
+
+    def test_humanize_type(self):
+        self.assertEqual(humanize_type("worker-offline"), "shutdown")
+        self.assertEqual(humanize_type("task-started"), "task started")
+
+    def test_format_task_event(self):
+        self.dumper.format_task_event(
+            "worker.example.com", time(), "task-started", "tasks.add", {})
+        self.assertTrue(self.out.getvalue())
+
+    def test_on_event(self):
+        event = {
+            "hostname": "worker.example.com",
+            "timestamp": time(),
+            "uuid": "1ef",
+            "name": "tasks.add",
+            "args": "(2, 2)",
+            "kwargs": "{}",
+        }
+        self.dumper.on_event(dict(event, type="task-received"))
+        self.assertTrue(self.out.getvalue())
+        self.dumper.on_event(dict(event, type="task-revoked"))
+        self.dumper.on_event(dict(event, type="worker-online"))
+
+    @patch("celery.events.EventReceiver.capture")
+    def test_evdump(self, capture):
+        evdump()
+        capture.side_effect = KeyboardInterrupt()
+        evdump()

+ 66 - 0
celery/tests/test_utils/test_mail.py

@@ -0,0 +1,66 @@
+from __future__ import absolute_import
+
+from mock import Mock, patch
+
+from celery.utils.mail import Message, Mailer
+
+from celery.tests.utils import Case
+
+
+msg = Message(to="george@vandelay.com", sender="elaine@pendant.com",
+              subject="What's up with Jerry?", body="???!")
+
+
+class test_Message(Case):
+
+    def test_repr(self):
+        self.assertTrue(repr(msg))
+
+    def test_str(self):
+        self.assertTrue(str(msg))
+
+
+class test_Mailer(Case):
+
+    def test_send_supports_timeout(self):
+        mailer = Mailer()
+        mailer.supports_timeout = True
+        mailer._send = Mock()
+        mailer.send(msg)
+        mailer._send.assert_called_with(msg, timeout=2)
+
+    @patch("socket.setdefaulttimeout")
+    @patch("socket.getdefaulttimeout")
+    def test_send_no_timeout(self, get, set):
+        mailer = Mailer()
+        mailer.supports_timeout = False
+        mailer._send = Mock()
+        get.return_value = 10
+        mailer.send(msg)
+        get.assert_called_with()
+        sets = set.call_args_list
+        self.assertEqual(sets[0][0], (2, ))
+        self.assertEqual(sets[1][0], (10, ))
+        mailer._send.assert_called_with(msg)
+
+    @patch("smtplib.SMTP_SSL")
+    def test_send_ssl_tls(self, SMTP_SSL):
+        mailer = Mailer(use_ssl=True, use_tls=True)
+        client = SMTP_SSL.return_value = Mock()
+        mailer._send(msg)
+        self.assertTrue(client.starttls.called)
+        self.assertEqual(client.ehlo.call_count, 2)
+        client.quit.assert_called_with()
+        client.sendmail.assert_called_with(msg.sender, msg.to, str(msg))
+        mailer = Mailer(use_ssl=True, use_tls=True, user="foo",
+                        password="bar")
+        mailer._send(msg)
+        client.login.assert_called_with("foo", "bar")
+
+    @patch("smtplib.SMTP")
+    def test_send(self, SMTP):
+        client = SMTP.return_value = Mock()
+        mailer = Mailer(use_ssl=False, use_tls=False)
+        mailer._send(msg)
+
+        client.sendmail.assert_called_With(msg.sender, msg.to, str(msg))

+ 661 - 0
celery/tests/test_utils/test_platforms.py

@@ -0,0 +1,661 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import errno
+import os
+import resource
+import signal
+
+from mock import Mock, patch
+
+from celery import current_app
+from celery import platforms
+from celery.platforms import (
+    get_fdmax,
+    shellsplit,
+    ignore_EBADF,
+    set_process_title,
+    signals,
+    maybe_drop_privileges,
+    setuid,
+    setgid,
+    seteuid,
+    setegid,
+    initgroups,
+    parse_uid,
+    parse_gid,
+    detached,
+    DaemonContext,
+    create_pidlock,
+    PIDFile,
+    LockFailed,
+    setgroups,
+    _setgroups_hack
+)
+
+from celery.tests.utils import Case, WhateverIO, override_stdouts
+
+
+class test_ignore_EBADF(Case):
+
+    def test_raises_EBADF(self):
+        with ignore_EBADF():
+            exc = OSError()
+            exc.errno = errno.EBADF
+            raise exc
+
+    def test_otherwise(self):
+        with self.assertRaises(OSError):
+            with ignore_EBADF():
+                exc = OSError()
+                exc.errno = errno.ENOENT
+                raise exc
+
+
+class test_shellsplit(Case):
+
+    def test_split(self):
+        self.assertEqual(shellsplit("the 'quick' brown fox"),
+                ["the", "quick", "brown", "fox"])
+
+
+class test_set_process_title(Case):
+
+    def when_no_setps(self):
+        prev = platforms._setproctitle = platforms._setproctitle, None
+        try:
+            set_process_title("foo")
+        finally:
+            platforms._setproctitle = prev
+
+
+class test_Signals(Case):
+
+    @patch("signal.getsignal")
+    def test_getitem(self, getsignal):
+        signals["SIGINT"]
+        getsignal.assert_called_with(signal.SIGINT)
+
+    def test_supported(self):
+        self.assertTrue(signals.supported("INT"))
+        self.assertFalse(signals.supported("SIGIMAGINARY"))
+
+    def test_signum(self):
+        self.assertEqual(signals.signum(13), 13)
+        self.assertEqual(signals.signum("INT"), signal.SIGINT)
+        self.assertEqual(signals.signum("SIGINT"), signal.SIGINT)
+        with self.assertRaises(TypeError):
+            signals.signum("int")
+            signals.signum(object())
+
+    @patch("signal.signal")
+    def test_ignore(self, set):
+        signals.ignore("SIGINT")
+        set.assert_called_with(signals.signum("INT"), signals.ignored)
+        signals.ignore("SIGTERM")
+        set.assert_called_with(signals.signum("TERM"), signals.ignored)
+
+    @patch("signal.signal")
+    def test_setitem(self, set):
+        handle = lambda *a: a
+        signals["INT"] = handle
+        set.assert_called_with(signal.SIGINT, handle)
+
+    @patch("signal.signal")
+    def test_setitem_raises(self, set):
+        set.side_effect = ValueError()
+        signals["INT"] = lambda *a: a
+
+
+if not current_app.IS_WINDOWS:
+
+    class test_get_fdmax(Case):
+
+        @patch("resource.getrlimit")
+        def test_when_infinity(self, getrlimit):
+            getrlimit.return_value = [None, resource.RLIM_INFINITY]
+            default = object()
+            self.assertIs(get_fdmax(default), default)
+
+        @patch("resource.getrlimit")
+        def test_when_actual(self, getrlimit):
+            getrlimit.return_value = [None, 13]
+            self.assertEqual(get_fdmax(None), 13)
+
+    class test_maybe_drop_privileges(Case):
+
+        @patch("celery.platforms.parse_uid")
+        @patch("pwd.getpwuid")
+        @patch("celery.platforms.setgid")
+        @patch("celery.platforms.setuid")
+        @patch("celery.platforms.initgroups")
+        def test_with_uid(self, initgroups, setuid, setgid,
+                getpwuid, parse_uid):
+
+            class pw_struct(object):
+                pw_gid = 50001
+            getpwuid.return_value = pw_struct()
+            parse_uid.return_value = 5001
+            maybe_drop_privileges(uid="user")
+            parse_uid.assert_called_with("user")
+            getpwuid.assert_called_with(5001)
+            setgid.assert_called_with(50001)
+            initgroups.assert_called_with(5001, 50001)
+            setuid.assert_called_with(5001)
+
+        @patch("celery.platforms.parse_uid")
+        @patch("celery.platforms.parse_gid")
+        @patch("celery.platforms.setgid")
+        @patch("celery.platforms.setuid")
+        @patch("celery.platforms.initgroups")
+        def test_with_guid(self, initgroups, setuid, setgid,
+                parse_gid, parse_uid):
+            parse_uid.return_value = 5001
+            parse_gid.return_value = 50001
+            maybe_drop_privileges(uid="user", gid="group")
+            parse_uid.assert_called_with("user")
+            parse_gid.assert_called_with("group")
+            setgid.assert_called_with(50001)
+            initgroups.assert_called_with(5001, 50001)
+            setuid.assert_called_with(5001)
+
+        @patch("celery.platforms.setuid")
+        @patch("celery.platforms.setgid")
+        @patch("celery.platforms.parse_gid")
+        def test_only_gid(self, parse_gid, setgid, setuid):
+            parse_gid.return_value = 50001
+            maybe_drop_privileges(gid="group")
+            parse_gid.assert_called_with("group")
+            setgid.assert_called_with(50001)
+            self.assertFalse(setuid.called)
+
+    class test_setget_uid_gid(Case):
+
+        @patch("celery.platforms.parse_uid")
+        @patch("os.setuid")
+        def test_setuid(self, _setuid, parse_uid):
+            parse_uid.return_value = 5001
+            setuid("user")
+            parse_uid.assert_called_with("user")
+            _setuid.assert_called_with(5001)
+
+        @patch("celery.platforms.parse_uid")
+        @patch("os.geteuid")
+        @patch("os.seteuid")
+        def test_seteuid(self, _seteuid, _geteuid, parse_uid):
+            parse_uid.return_value = 5001
+            _geteuid.return_value = 5001
+            seteuid("user")
+            parse_uid.assert_called_with("user")
+            self.assertFalse(_seteuid.called)
+
+            _geteuid.return_value = 1
+            seteuid("user")
+            _seteuid.assert_called_with(5001)
+
+        @patch("celery.platforms.parse_gid")
+        @patch("os.setgid")
+        def test_setgid(self, _setgid, parse_gid):
+            parse_gid.return_value = 50001
+            setgid("group")
+            parse_gid.assert_called_with("group")
+            _setgid.assert_called_with(50001)
+
+        @patch("celery.platforms.parse_gid")
+        @patch("os.getegid")
+        @patch("os.setegid")
+        def test_setegid(self, _setegid, _getegid, parse_gid):
+            parse_gid.return_value = 50001
+            _getegid.return_value = 50001
+            setegid("group")
+            parse_gid.assert_called_with("group")
+            self.assertFalse(_setegid.called)
+
+            _getegid.return_value = 1
+            setegid("group")
+            _setegid.assert_called_with(50001)
+
+        def test_parse_uid_when_int(self):
+            self.assertEqual(parse_uid(5001), 5001)
+
+        @patch("pwd.getpwnam")
+        def test_parse_uid_when_existing_name(self, getpwnam):
+
+            class pwent(object):
+                pw_uid = 5001
+
+            getpwnam.return_value = pwent()
+            self.assertEqual(parse_uid("user"), 5001)
+
+        @patch("pwd.getpwnam")
+        def test_parse_uid_when_nonexisting_name(self, getpwnam):
+            getpwnam.side_effect = KeyError("user")
+
+            with self.assertRaises(KeyError):
+                parse_uid("user")
+
+        def test_parse_gid_when_int(self):
+            self.assertEqual(parse_gid(50001), 50001)
+
+        @patch("grp.getgrnam")
+        def test_parse_gid_when_existing_name(self, getgrnam):
+
+            class grent(object):
+                gr_gid = 50001
+
+            getgrnam.return_value = grent()
+            self.assertEqual(parse_gid("group"), 50001)
+
+        @patch("grp.getgrnam")
+        def test_parse_gid_when_nonexisting_name(self, getgrnam):
+            getgrnam.side_effect = KeyError("group")
+
+            with self.assertRaises(KeyError):
+                parse_gid("group")
+
+    class test_initgroups(Case):
+
+        @patch("pwd.getpwuid")
+        def test_with_initgroups(self, getpwuid):
+            prev, os.initgroups = os.initgroups, Mock()
+            try:
+                getpwuid.return_value = ["user"]
+                initgroups(5001, 50001)
+                os.initgroups.assert_called_with("user", 50001)
+            finally:
+                os.initgroups = prev
+
+        @patch("celery.platforms.setgroups")
+        @patch("grp.getgrall")
+        @patch("pwd.getpwuid")
+        def test_without_initgroups(self, getpwuid, getgrall, setgroups):
+            prev = getattr(os, "initgroups", None)
+            try:
+                delattr(os, "initgroups")
+            except AttributeError:
+                pass
+            try:
+                getpwuid.return_value = ["user"]
+
+                class grent(object):
+                    gr_mem = ["user"]
+
+                    def __init__(self, gid):
+                        self.gr_gid = gid
+
+                getgrall.return_value = [grent(1), grent(2), grent(3)]
+                initgroups(5001, 50001)
+                setgroups.assert_called_with([1, 2, 3])
+            finally:
+                if prev:
+                    os.initgroups = prev
+
+    class test_detached(Case):
+
+        def test_without_resource(self):
+            prev, platforms.resource = platforms.resource, None
+            try:
+                with self.assertRaises(RuntimeError):
+                    detached()
+            finally:
+                platforms.resource = prev
+
+        @patch("celery.platforms.create_pidlock")
+        @patch("celery.platforms.signals")
+        @patch("celery.platforms.maybe_drop_privileges")
+        @patch("os.geteuid")
+        @patch("__builtin__.open")
+        def test_default(self, open, geteuid, maybe_drop, signals, pidlock):
+            geteuid.return_value = 0
+            context = detached(uid="user", gid="group")
+            self.assertIsInstance(context, DaemonContext)
+            signals.reset.assert_called_with("SIGCLD")
+            maybe_drop.assert_called_with(uid="user", gid="group")
+            open.return_value = Mock()
+
+            geteuid.return_value = 5001
+            context = detached(uid="user", gid="group", logfile="/foo/bar")
+            self.assertIsInstance(context, DaemonContext)
+            open.assert_called_with("/foo/bar", "a")
+            open.return_value.close.assert_called_with()
+
+            context = detached(pidfile="/foo/bar/pid")
+            self.assertIsInstance(context, DaemonContext)
+            pidlock.assert_called_with("/foo/bar/pid")
+
+    class test_DaemonContext(Case):
+
+        @patch("os.fork")
+        @patch("os.setsid")
+        @patch("os._exit")
+        @patch("os.chdir")
+        @patch("os.umask")
+        @patch("os.close")
+        @patch("os.open")
+        @patch("os.dup2")
+        def test_open(self, dup2, open, close, umask, chdir, _exit, setsid,
+                fork):
+            x = DaemonContext(workdir="/opt/workdir")
+
+            fork.return_value = 0
+            with x:
+                self.assertTrue(x._is_open)
+                with x:
+                    pass
+            self.assertEqual(fork.call_count, 2)
+            setsid.assert_called_with()
+            self.assertFalse(_exit.called)
+
+            chdir.assert_called_with(x.workdir)
+            umask.assert_called_with(x.umask)
+            open.assert_called_with(platforms.DAEMON_REDIRECT_TO, os.O_RDWR)
+            self.assertEqual(dup2.call_args_list[0], [(0, 1), {}])
+            self.assertEqual(dup2.call_args_list[1], [(0, 2), {}])
+
+            fork.reset_mock()
+            fork.return_value = 1
+            x = DaemonContext(workdir="/opt/workdir")
+            with x:
+                pass
+            self.assertEqual(fork.call_count, 1)
+            _exit.assert_called_with(0)
+
+            x = DaemonContext(workdir="/opt/workdir", fake=True)
+            x._detach = Mock()
+            with x:
+                pass
+            self.assertFalse(x._detach.called)
+
+    class test_PIDFile(Case):
+
+        @patch("celery.platforms.PIDFile")
+        def test_create_pidlock(self, PIDFile):
+            p = PIDFile.return_value = Mock()
+            p.is_locked.return_value = True
+            p.remove_if_stale.return_value = False
+            with self.assertRaises(SystemExit):
+                create_pidlock("/var/pid")
+
+            p.remove_if_stale.return_value = True
+            ret = create_pidlock("/var/pid")
+            self.assertIs(ret, p)
+
+        def test_context(self):
+            p = PIDFile("/var/pid")
+            p.write_pid = Mock()
+            p.remove = Mock()
+
+            with p as _p:
+                self.assertIs(_p, p)
+            p.write_pid.assert_called_with()
+            p.remove.assert_called_with()
+
+        def test_acquire_raises_LockFailed(self):
+            p = PIDFile("/var/pid")
+            p.write_pid = Mock()
+            p.write_pid.side_effect = OSError()
+
+            with self.assertRaises(LockFailed):
+                with p:
+                    pass
+
+        @patch("os.path.exists")
+        def test_is_locked(self, exists):
+            p = PIDFile("/var/pid")
+            exists.return_value = True
+            self.assertTrue(p.is_locked())
+            exists.return_value = False
+            self.assertFalse(p.is_locked())
+
+        @patch("__builtin__.open")
+        def test_read_pid(self, open_):
+            s = open_.return_value = WhateverIO()
+            s.write("1816\n")
+            s.seek(0)
+            p = PIDFile("/var/pid")
+            self.assertEqual(p.read_pid(), 1816)
+
+        @patch("__builtin__.open")
+        def test_read_pid_partially_written(self, open_):
+            s = open_.return_value = WhateverIO()
+            s.write("1816")
+            s.seek(0)
+            p = PIDFile("/var/pid")
+            with self.assertRaises(ValueError):
+                p.read_pid()
+
+        @patch("__builtin__.open")
+        def test_read_pid_raises_ENOENT(self, open_):
+            exc = IOError()
+            exc.errno = errno.ENOENT
+            open_.side_effect = exc
+            p = PIDFile("/var/pid")
+            self.assertIsNone(p.read_pid())
+
+        @patch("__builtin__.open")
+        def test_read_pid_raises_IOError(self, open_):
+            exc = IOError()
+            exc.errno = errno.EAGAIN
+            open_.side_effect = exc
+            p = PIDFile("/var/pid")
+            with self.assertRaises(IOError):
+                p.read_pid()
+
+        @patch("__builtin__.open")
+        def test_read_pid_bogus_pidfile(self, open_):
+            s = open_.return_value = WhateverIO()
+            s.write("eighteensixteen\n")
+            s.seek(0)
+            p = PIDFile("/var/pid")
+            with self.assertRaises(ValueError):
+                p.read_pid()
+
+        @patch("os.unlink")
+        def test_remove(self, unlink):
+            unlink.return_value = True
+            p = PIDFile("/var/pid")
+            p.remove()
+            unlink.assert_called_with(p.path)
+
+        @patch("os.unlink")
+        def test_remove_ENOENT(self, unlink):
+            exc = OSError()
+            exc.errno = errno.ENOENT
+            unlink.side_effect = exc
+            p = PIDFile("/var/pid")
+            p.remove()
+            unlink.assert_called_with(p.path)
+
+        @patch("os.unlink")
+        def test_remove_EACCES(self, unlink):
+            exc = OSError()
+            exc.errno = errno.EACCES
+            unlink.side_effect = exc
+            p = PIDFile("/var/pid")
+            p.remove()
+            unlink.assert_called_with(p.path)
+
+        @patch("os.unlink")
+        def test_remove_OSError(self, unlink):
+            exc = OSError()
+            exc.errno = errno.EAGAIN
+            unlink.side_effect = exc
+            p = PIDFile("/var/pid")
+            with self.assertRaises(OSError):
+                p.remove()
+            unlink.assert_called_with(p.path)
+
+        @patch("os.kill")
+        def test_remove_if_stale_process_alive(self, kill):
+            p = PIDFile("/var/pid")
+            p.read_pid = Mock()
+            p.read_pid.return_value = 1816
+            kill.return_value = 0
+            self.assertFalse(p.remove_if_stale())
+            kill.assert_called_with(1816, 0)
+            p.read_pid.assert_called_with()
+
+            kill.side_effect = OSError()
+            kill.side_effect.errno = errno.ENOENT
+            self.assertFalse(p.remove_if_stale())
+
+        @patch("os.kill")
+        def test_remove_if_stale_process_dead(self, kill):
+            with override_stdouts():
+                p = PIDFile("/var/pid")
+                p.read_pid = Mock()
+                p.read_pid.return_value = 1816
+                p.remove = Mock()
+                exc = OSError()
+                exc.errno = errno.ESRCH
+                kill.side_effect = exc
+                self.assertTrue(p.remove_if_stale())
+                kill.assert_called_with(1816, 0)
+                p.remove.assert_called_with()
+
+        def test_remove_if_stale_broken_pid(self):
+            with override_stdouts():
+                p = PIDFile("/var/pid")
+                p.read_pid = Mock()
+                p.read_pid.side_effect = ValueError()
+                p.remove = Mock()
+
+                self.assertTrue(p.remove_if_stale())
+                p.remove.assert_called_with()
+
+        def test_remove_if_stale_no_pidfile(self):
+            p = PIDFile("/var/pid")
+            p.read_pid = Mock()
+            p.read_pid.return_value = None
+            p.remove = Mock()
+
+            self.assertTrue(p.remove_if_stale())
+            p.remove.assert_called_with()
+
+        @patch("os.fsync")
+        @patch("os.getpid")
+        @patch("os.open")
+        @patch("os.fdopen")
+        @patch("__builtin__.open")
+        def test_write_pid(self, open_, fdopen, osopen, getpid, fsync):
+            getpid.return_value = 1816
+            osopen.return_value = 13
+            w = fdopen.return_value = WhateverIO()
+            w.close = Mock()
+            r = open_.return_value = WhateverIO()
+            r.write("1816\n")
+            r.seek(0)
+
+            p = PIDFile("/var/pid")
+            p.write_pid()
+            w.seek(0)
+            self.assertEqual(w.readline(), "1816\n")
+            self.assertTrue(w.close.called)
+            getpid.assert_called_with()
+            osopen.assert_called_with(p.path, platforms.PIDFILE_FLAGS,
+                                      platforms.PIDFILE_MODE)
+            fdopen.assert_called_with(13, "w")
+            fsync.assert_called_with(13)
+            open_.assert_called_with(p.path)
+
+        @patch("os.fsync")
+        @patch("os.getpid")
+        @patch("os.open")
+        @patch("os.fdopen")
+        @patch("__builtin__.open")
+        def test_write_reread_fails(self, open_, fdopen,
+                osopen, getpid, fsync):
+            getpid.return_value = 1816
+            osopen.return_value = 13
+            w = fdopen.return_value = WhateverIO()
+            w.close = Mock()
+            r = open_.return_value = WhateverIO()
+            r.write("11816\n")
+            r.seek(0)
+
+            p = PIDFile("/var/pid")
+            with self.assertRaises(LockFailed):
+                p.write_pid()
+
+    class test_setgroups(Case):
+
+        @patch("os.setgroups")
+        def test_setgroups_hack_ValueError(self, setgroups):
+
+            def on_setgroups(groups):
+                if len(groups) <= 200:
+                    setgroups.return_value = True
+                    return
+                raise ValueError()
+            setgroups.side_effect = on_setgroups
+            _setgroups_hack(range(400))
+
+            setgroups.side_effect = ValueError()
+            with self.assertRaises(ValueError):
+                _setgroups_hack(range(400))
+
+        @patch("os.setgroups")
+        def test_setgroups_hack_OSError(self, setgroups):
+            exc = OSError()
+            exc.errno = errno.EINVAL
+
+            def on_setgroups(groups):
+                if len(groups) <= 200:
+                    setgroups.return_value = True
+                    return
+                raise exc
+            setgroups.side_effect = on_setgroups
+
+            _setgroups_hack(range(400))
+
+            setgroups.side_effect = exc
+            with self.assertRaises(OSError):
+                _setgroups_hack(range(400))
+
+            exc2 = OSError()
+            exc.errno = errno.ESRCH
+            setgroups.side_effect = exc2
+            with self.assertRaises(OSError):
+                _setgroups_hack(range(400))
+
+        @patch("os.sysconf")
+        @patch("celery.platforms._setgroups_hack")
+        def test_setgroups(self, hack, sysconf):
+            sysconf.return_value = 100
+            setgroups(range(400))
+            hack.assert_called_with(range(100))
+
+        @patch("os.sysconf")
+        @patch("celery.platforms._setgroups_hack")
+        def test_setgroups_sysconf_raises(self, hack, sysconf):
+            sysconf.side_effect = ValueError()
+            setgroups(range(400))
+            hack.assert_called_with(range(400))
+
+        @patch("os.getgroups")
+        @patch("os.sysconf")
+        @patch("celery.platforms._setgroups_hack")
+        def test_setgroups_raises_ESRCH(self, hack, sysconf, getgroups):
+            sysconf.side_effect = ValueError()
+            esrch = OSError()
+            esrch.errno = errno.ESRCH
+            hack.side_effect = esrch
+            with self.assertRaises(OSError):
+                setgroups(range(400))
+
+        @patch("os.getgroups")
+        @patch("os.sysconf")
+        @patch("celery.platforms._setgroups_hack")
+        def test_setgroups_raises_EPERM(self, hack, sysconf, getgroups):
+            sysconf.side_effect = ValueError()
+            eperm = OSError()
+            eperm.errno = errno.EPERM
+            hack.side_effect = eperm
+            getgroups.return_value = range(400)
+            setgroups(range(400))
+            getgroups.assert_called_with()
+
+            getgroups.return_value = [1000]
+            with self.assertRaises(OSError):
+                setgroups(range(400))
+            getgroups.assert_called_with()

+ 68 - 0
celery/tests/test_utils/test_term.py

@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+from celery.utils import term
+from celery.utils.term import colored, fg
+
+from celery.tests.utils import Case
+
+
+class test_colored(Case):
+
+    def test_colors(self):
+        colors = (
+            ("black", term.BLACK),
+            ("red", term.RED),
+            ("green", term.GREEN),
+            ("yellow", term.YELLOW),
+            ("blue", term.BLUE),
+            ("magenta", term.MAGENTA),
+            ("cyan", term.CYAN),
+            ("white", term.WHITE),
+        )
+
+        for name, key in colors:
+            self.assertIn(fg(30 + key), str(colored().names[name]("foo")))
+
+        self.assertTrue(str(colored().bold("f")))
+        self.assertTrue(str(colored().underline("f")))
+        self.assertTrue(str(colored().blink("f")))
+        self.assertTrue(str(colored().reverse("f")))
+        self.assertTrue(str(colored().bright("f")))
+        self.assertTrue(str(colored().ired("f")))
+        self.assertTrue(str(colored().igreen("f")))
+        self.assertTrue(str(colored().iyellow("f")))
+        self.assertTrue(str(colored().iblue("f")))
+        self.assertTrue(str(colored().imagenta("f")))
+        self.assertTrue(str(colored().icyan("f")))
+        self.assertTrue(str(colored().iwhite("f")))
+        self.assertTrue(str(colored().reset("f")))
+
+        self.assertTrue(str(colored().green(u"∂bar")))
+
+        self.assertTrue(
+            colored().red(u"éefoo") + colored().green(u"∂bar"))
+
+        self.assertEqual(
+            colored().red("foo").no_color(), "foo")
+
+        self.assertTrue(
+            repr(colored().blue(u"åfoo")))
+
+        self.assertEqual(repr(colored()), "''")
+
+        c = colored()
+        s = c.red("foo", c.blue("bar"), c.green("baz"))
+        self.assertTrue(s.no_color())
+
+        c._fold_no_color(s, u"øfoo")
+        c._fold_no_color(u"fooå", s)
+
+        c = colored().red(u"åfoo")
+        self.assertEqual(c._add(c, u"baræ"),
+            u'\x1b[1;31m\xe5foo\x1b[0mbar\xe6')
+
+        c2 = colored().blue(u"ƒƒz")
+        c3 = c._add(c, c2)
+        self.assertEqual(c3,
+            u'\x1b[1;31m\xe5foo\x1b[0m\x1b[1;34m\u0192\u0192z\x1b[0m')

+ 30 - 1
celery/tests/test_worker/test_worker_job.py

@@ -21,7 +21,7 @@ from celery.concurrency.base import BasePool
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import (RetryTaskError,
                                WorkerLostError, InvalidTaskError)
-from celery.task.trace import eager_trace_task, TraceInfo
+from celery.task.trace import eager_trace_task, TraceInfo, mro_lookup
 from celery.result import AsyncResult
 from celery.task import task as task_dec
 from celery.task.base import Task
@@ -36,6 +36,35 @@ scratch = {"ACK": False}
 some_kwargs_scratchpad = {}
 
 
+class test_mro_lookup(Case):
+
+    def test_order(self):
+
+        class A(object):
+            pass
+
+        class B(A):
+            pass
+
+        class C(B):
+            pass
+
+        class D(C):
+
+            @classmethod
+            def mro(cls):
+                return ()
+
+        A.x = 10
+        self.assertEqual(mro_lookup(C, "x"), A)
+        self.assertIsNone(mro_lookup(C, "x", stop=(A, )))
+        B.x = 10
+        self.assertEqual(mro_lookup(C, "x"), B)
+        C.x = 10
+        self.assertEqual(mro_lookup(C, "x"), C)
+        self.assertIsNone(mro_lookup(D, "x"))
+
+
 def jail(task_id, name, args, kwargs):
     return eager_trace_task(current_app.tasks[name],
                             task_id, args, kwargs, eager=False)[0]

+ 1 - 4
celery/utils/compat.py

@@ -40,10 +40,7 @@ if is_py3k:
         def write(self, data):
             StringIO.write(self, bytes_to_str(data))
 else:
-    try:
-        from cStringIO import StringIO  # noqa
-    except ImportError:
-        from StringIO import StringIO   # noqa
+    from StringIO import StringIO       # noqa
     BytesIO = WhateverIO = StringIO     # noqa
 
 

+ 26 - 19
celery/utils/mail.py

@@ -13,11 +13,10 @@ from __future__ import absolute_import
 
 import sys
 import smtplib
+import traceback
+import warnings
 
-try:
-    from email.mime.text import MIMEText
-except ImportError:
-    from email.MIMEText import MIMEText  # noqa
+from email.mime.text import MIMEText
 
 from .imports import symbol_by_name
 
@@ -53,6 +52,7 @@ class Message(object):
 
 
 class Mailer(object):
+    supports_timeout = supports_timeout
 
     def __init__(self, host="localhost", port=0, user=None, password=None,
             timeout=2, use_ssl=False, use_tls=False):
@@ -64,23 +64,30 @@ class Mailer(object):
         self.use_ssl = use_ssl
         self.use_tls = use_tls
 
-    def send(self, message):
-        if supports_timeout:
-            self._send(message, timeout=self.timeout)
-        else:
-            import socket
-            old_timeout = socket.getdefaulttimeout()
-            socket.setdefaulttimeout(self.timeout)
-            try:
-                self._send(message)
-            finally:
-                socket.setdefaulttimeout(old_timeout)
+    def send(self, message, fail_silently=False):
+        try:
+            if self.supports_timeout:
+                self._send(message, timeout=self.timeout)
+            else:
+                import socket
+                old_timeout = socket.getdefaulttimeout()
+                socket.setdefaulttimeout(self.timeout)
+                try:
+                    self._send(message)
+                finally:
+                    socket.setdefaulttimeout(old_timeout)
+        except Exception, exc:
+            if not fail_silently:
+                raise
+            warnings.warn(SendmailWarning(
+                "Mail could not be sent: %r %r\n%r" % (
+                    exc, {"To": ", ".join(message.to),
+                          "Subject": message.subject},
+                    traceback.format_stack())))
 
     def _send(self, message, **kwargs):
-        if (self.use_ssl):
-            client = smtplib.SMTP_SSL(self.host, self.port, **kwargs)
-        else:
-            client = smtplib.SMTP(self.host, self.port, **kwargs)
+        Client = smtplib.SMTP_SSL if self.use_ssl else smtplib.SMTP
+        client = Client(self.host, self.port, **kwargs)
 
         if self.use_tls:
             client.ehlo()

+ 8 - 14
celery/utils/term.py

@@ -39,9 +39,7 @@ class colored(object):
 
     def __init__(self, *s, **kwargs):
         self.s = s
-        self.enabled = kwargs.get("enabled", True)
-        if IS_WINDOWS:
-            self.enabled = False
+        self.enabled = not IS_WINDOWS and kwargs.get("enabled", True)
         self.op = kwargs.get("op", "")
         self.names = {"black": self.black,
                       "red": self.red,
@@ -53,22 +51,18 @@ class colored(object):
                       "white": self.white}
 
     def _add(self, a, b):
-        if isinstance(a, unicode):
-            a = safe_str(a)
-        if isinstance(b, unicode):
-            b = safe_str(b)
-        return str(a) + str(b)
+        return unicode(a) + unicode(b)
 
     def _fold_no_color(self, a, b):
         try:
             A = a.no_color()
         except AttributeError:
-            A = safe_str(a)
+            A = unicode(a)
         try:
             B = b.no_color()
         except AttributeError:
-            B = safe_str(b)
-        return A + B
+            B = unicode(b)
+        return safe_str(A) + safe_str(B)
 
     def no_color(self):
         if self.s:
@@ -85,7 +79,7 @@ class colored(object):
         suffix = ""
         if self.enabled:
             suffix = RESET_SEQ
-        return self.embed() + suffix
+        return safe_str(self.embed() + suffix)
 
     def __str__(self):
         return safe_str(self.__unicode__())
@@ -145,7 +139,7 @@ class colored(object):
         return self.node(s, fg(40 + YELLOW))
 
     def iblue(self, *s):
-        return self.node(s, fg(40, BLUE))
+        return self.node(s, fg(40 + BLUE))
 
     def imagenta(self, *s):
         return self.node(s, fg(40 + MAGENTA))
@@ -160,4 +154,4 @@ class colored(object):
         return self.node(s or [""], RESET_SEQ)
 
     def __add__(self, other):
-        return str(self) + str(other)
+        return unicode(self) + unicode(other)

+ 0 - 10
contrib/release/py3k-run-tests

@@ -11,17 +11,8 @@ nosetests -vd celery.tests                                      \
             --cover3-exclude="                                  \
               celery                                            \
               celery.tests.*                                    \
-              celery.bin.celeryd_multi                          \
-              celery.bin.celeryd_detach                         \
-              celery.bin.celeryctl                              \
-              celery.bin.camqadm                                \
-              celery.local                                      \
-              celery.platforms                                  \
               celery.utils.compat                               \
-              celery.utils.mail                                 \
-              celery.utils.functional                           \
               celery.utils.dispatch*                            \
-              celery.utils.term                                 \
               celery.db.a805d4bd                                \
               celery.db.dfd042c7                                \
               celery.contrib*                                   \
@@ -29,7 +20,6 @@ nosetests -vd celery.tests                                      \
               celery.concurrency.gevent                         \
               celery.backends.mongodb                           \
               celery.backends.cassandra                         \
-              celery.events.dumper                              \
               celery.events.cursesmon"                          \
             --with-xunit                                        \
               --xunit-file="$base/nosetests.xml"

+ 3 - 10
setup.cfg

@@ -5,24 +5,17 @@ cover3-html = 1
 cover3-package = celery
 cover3-exclude = celery
                  celery.tests.*
-                 celery.bin.celeryd_multi
-                 celery.bin.celeryd_detach
-                 celery.bin.celeryctl
-                 celery.bin.camqadm
-                 celery.platforms
                  celery.utils.compat
-                 celery.utils.mail
                  celery.utils.dispatch*
-                 celery.utils.term
-                 celery.db.a805d4bd
-                 celery.db.dfd042c7
+                 celery.backends.database.a805d4bd
+                 celery.backends.database.dfd042c7
                  celery.contrib*
                  celery.concurrency.threads
                  celery.concurrency.gevent
                  celery.backends.mongodb
                  celery.backends.cassandra
-                 celery.events.dumper
                  celery.events.cursesmon
+                 celery.worker.autoreload
 
 [build_sphinx]
 source-dir = docs/