Ask Solem 13 سال پیش
والد
کامیت
7aa55c5f62

+ 1 - 1
README.rst

@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/ask/celery/celery_128.png
 
-:Version: 2.5.0
+:Version: 2.5.1
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/ask/celery/

+ 1 - 2
celery/abstract.py

@@ -68,8 +68,7 @@ class Namespace(object):
         self._debug("Building boot step graph.")
         self.boot_steps = [self.bind_component(name, parent, **kwargs)
                                 for name in self._finalize_boot_steps()]
-        self._debug("New boot order: %r" % (
-            [c.name for c in self.boot_steps], ))
+        self._debug("New boot order: %r", [c.name for c in self.boot_steps])
 
         for component in self.boot_steps:
             component.include(parent)

+ 1 - 1
celery/app/defaults.py

@@ -44,7 +44,7 @@ def str_to_bool(term, table={"false": False, "no": False, "0": False,
     try:
         return table[term.lower()]
     except KeyError:
-        raise TypeError("%r can not be converted to type bool" % (term, ))
+        raise TypeError("Can't coerce %r to type bool" % (term, ))
 
 
 class Option(object):

+ 3 - 3
celery/apps/worker.py

@@ -16,7 +16,7 @@ from .. import __version__, platforms, signals
 from ..app import app_or_default
 from ..app.abstract import configurated, from_config
 from ..exceptions import ImproperlyConfigured, SystemTerminate
-from ..utils import isatty, LOG_LEVELS, cry, qualname
+from ..utils import cry, isatty, LOG_LEVELS, pluralize, qualname
 from ..worker import WorkController
 
 try:
@@ -165,8 +165,8 @@ class Worker(configurated):
 
     def purge_messages(self):
         count = self.app.control.discard_all()
-        what = (not count or count > 1) and "messages" or "message"
-        print("discard: Erased %d %s from the queue.\n" % (count, what))
+        print("discard: Erased %d %s from the queue.\n" % (
+                count, pluralize(count, "message")))
 
     def worker_init(self):
         # Run the worker init handler.

+ 7 - 4
celery/beat.py

@@ -23,6 +23,8 @@ try:
 except ImportError:
     multiprocessing = None  # noqa
 
+from kombu.utils import reprcall
+
 from . import __version__
 from . import platforms
 from . import registry
@@ -116,8 +118,9 @@ class ScheduleEntry(object):
         return vars(self).iteritems()
 
     def __repr__(self):
-        return ("<Entry: %(name)s %(task)s(*%(args)s, **%(kwargs)s) "
-                "{%(schedule)s}>" % vars(self))
+        return ("<Entry: %s %s {%s}" % (self.name,
+                    reprcall(self.task, self.args, self.kwargs),
+                    self.schedule))
 
 
 class Scheduler(object):
@@ -401,8 +404,8 @@ class Service(object):
         try:
             while not self._is_shutdown.isSet():
                 interval = self.scheduler.tick()
-                self.debug("Celerybeat: Waking up %s." % (
-                        humanize_seconds(interval, prefix="in ")))
+                self.debug("Celerybeat: Waking up %s.",
+                           humanize_seconds(interval, prefix="in "))
                 time.sleep(interval)
         except (KeyboardInterrupt, SystemExit):
             self._is_shutdown.set()

+ 2 - 8
celery/bin/celeryctl.py

@@ -17,7 +17,7 @@ from anyjson import deserialize
 from .. import __version__
 from ..app import app_or_default, current_app
 from ..platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
-from ..utils import term
+from ..utils import pluralize, term
 from ..utils.timeutils import maybe_iso8601
 
 from ..bin.base import Command as CeleryCommand
@@ -206,12 +206,6 @@ class apply(Command):
 apply = command(apply)
 
 
-def pluralize(n, text, suffix='s'):
-    if n > 1:
-        return text + suffix
-    return text
-
-
 class purge(Command):
 
     def run(self, *args, **kwargs):
@@ -334,7 +328,7 @@ class status(Command):
         nodecount = len(replies)
         if not kwargs.get("quiet", False):
             self.out("\n%s %s online." % (nodecount,
-                                          nodecount > 1 and "nodes" or "node"))
+                                          pluralize(nodecount, "node")))
 status = command(status)
 
 

+ 8 - 11
celery/bin/celeryd_multi.py

@@ -104,6 +104,7 @@ from time import sleep
 from .. import __version__
 from ..platforms import shellsplit
 from ..utils import term
+from ..utils import pluralize
 from ..utils.encoding import from_utf8
 
 SIGNAMES = set(sig for sig in dir(signal)
@@ -188,8 +189,8 @@ class MultiTool(object):
 
         return self.retcode
 
-    def say(self, msg):
-        self.fh.write("%s\n" % (msg, ))
+    def say(self, m, newline=True):
+        self.fh.write("%s\n" % m if m else m)
 
     def names(self, argv, cmd):
         p = NamespacedOptionParser(argv)
@@ -273,7 +274,7 @@ class MultiTool(object):
             left = len(P)
             if left:
                 self.note(self.colored.blue("> Waiting for %s %s..." % (
-                    left, left > 1 and "nodes" or "node")), newline=False)
+                    left, pluralize(left, "node"))), newline=False)
 
         if retry:
             note_waiting()
@@ -360,11 +361,11 @@ class MultiTool(object):
             self.say(expander(template))
 
     def help(self, argv, cmd=None):
-        say(__doc__)
+        self.say(__doc__)
 
     def usage(self):
         self.splash()
-        say(USAGE % {"prog_name": self.prog_name})
+        self.say(USAGE % {"prog_name": self.prog_name})
 
     def splash(self):
         if not self.nosplash:
@@ -386,7 +387,7 @@ class MultiTool(object):
 
     def error(self, msg=None):
         if msg:
-            say(msg)
+            self.say(msg)
         self.usage()
         self.retcode = 1
         return 1
@@ -397,7 +398,7 @@ class MultiTool(object):
 
     def note(self, msg, newline=True):
         if not self.quiet:
-            say(str(msg), newline=newline)
+            self.say(str(msg), newline=newline)
 
 
 def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
@@ -531,10 +532,6 @@ def abbreviations(map):
     return expand
 
 
-def say(m, newline=True):
-    sys.stderr.write(newline and "%s\n" % (m, ) or m)
-
-
 def findsig(args, default=signal.SIGTERM):
     for arg in reversed(args):
         if len(arg) == 2 and arg[0] == "-" and arg[1].isdigit():

+ 1 - 1
celery/contrib/batches.py

@@ -166,7 +166,7 @@ class Batches(Task):
         if self._buffer.qsize():
             requests = list(consume_queue(self._buffer))
             if requests:
-                self.debug("Buffer complete: %s" % (len(requests), ))
+                self.debug("Buffer complete: %s", len(requests))
                 self.flush(requests)
         if not requests:
             self.debug("Cancelling timer: Nothing in buffer.")

+ 3 - 2
celery/events/__init__.py

@@ -158,14 +158,15 @@ class EventReceiver(object):
     handlers = {}
 
     def __init__(self, connection, handlers=None, routing_key="#",
-            node_id=None, app=None):
+            node_id=None, app=None, queue_prefix="celeryev"):
         self.app = app_or_default(app)
         self.connection = connection
         if handlers is not None:
             self.handlers = handlers
         self.routing_key = routing_key
         self.node_id = node_id or uuid()
-        self.queue = Queue("%s.%s" % ("celeryev", self.node_id),
+        self.queue_prefix = queue_prefix
+        self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
                            exchange=event_exchange,
                            routing_key=self.routing_key,
                            auto_delete=True,

+ 2 - 3
celery/log.py

@@ -358,10 +358,9 @@ class SilenceRepeated(object):
         self.max_iterations = max_iterations
         self._iterations = 0
 
-    def __call__(self, *msgs):
+    def __call__(self, *args, **kwargs):
         if not self._iterations or self._iterations >= self.max_iterations:
-            for msg in msgs:
-                self.action(msg)
+            self.action(*args, **kwargs)
             self._iterations = 0
         else:
             self._iterations += 1

+ 2 - 2
celery/platforms.py

@@ -53,12 +53,12 @@ def pyimplementation():
     if hasattr(_platform, "python_implementation"):
         return _platform.python_implementation()
     elif sys.platform.startswith("java"):
-        return "Jython %s" % (sys.platform, )
+        return "Jython " + sys.platform
     elif hasattr(sys, "pypy_version_info"):
         v = ".".join(map(str, sys.pypy_version_info[:3]))
         if sys.pypy_version_info[3:]:
             v += "-" + "".join(map(str, sys.pypy_version_info[3:]))
-        return "PyPy %s" % (v, )
+        return "PyPy " + v
     else:
         return "CPython"
 

+ 4 - 6
celery/task/http.py

@@ -109,12 +109,10 @@ class MutableURL(object):
     def __str__(self):
         scheme, netloc, path, params, query, fragment = self.parts
         query = urlencode(utf8dict(self.query.items()))
-        components = ["%s://" % scheme,
-                      "%s" % netloc,
-                      path and "%s" % path or "/",
-                      params and ";%s" % params or None,
-                      query and "?%s" % query or None,
-                      fragment and "#%s" % fragment or None]
+        components = [scheme + "://", netloc, path or "/",
+                      ";%s" % params   if params   else "",
+                      "?%s" % query    if query    else "",
+                      "#%s" % fragment if fragment else ""]
         return "".join(filter(None, components))
 
     def __repr__(self):

+ 4 - 2
celery/tests/test_worker/__init__.py

@@ -827,8 +827,10 @@ class test_WorkController(AppCase):
         worker.timer_debug = worker.logger.debug
 
         worker.on_timer_tick(30.0)
-        logged = worker.logger.debug.call_args[0][0]
-        self.assertIn("30.0", logged)
+        xargs = worker.logger.debug.call_args[0]
+        fmt, arg = xargs[0], xargs[1]
+        self.assertEqual(30.0, arg)
+        self.assertIn("Next eta %s secs", fmt)
 
     def test_process_task(self):
         worker = self.worker

+ 6 - 0
celery/utils/__init__.py

@@ -311,6 +311,12 @@ def truncate_text(text, maxlen=128, suffix="..."):
     return text
 
 
+def pluralize(n, text, suffix='s'):
+    if n > 1:
+        return text + suffix
+    return text
+
+
 def abbr(S, max, ellipsis="..."):
     if S is None:
         return "???"

+ 4 - 2
celery/utils/timeutils.py

@@ -17,6 +17,8 @@ from datetime import datetime, timedelta
 from dateutil import tz
 from dateutil.parser import parse as parse_iso8601
 
+from . import pluralize
+
 try:
     import pytz
 except ImportError:
@@ -189,8 +191,8 @@ def humanize_seconds(secs, prefix=""):
     for unit, divider, formatter in TIME_UNITS:
         if secs >= divider:
             w = secs / divider
-            punit = w > 1 and (unit + "s") or unit
-            return "%s%s %s" % (prefix, formatter(w), punit)
+            return "%s%s %s" % (prefix, formatter(w),
+                                pluralize(w, unit))
     return "now"
 
 

+ 4 - 4
celery/worker/__init__.py

@@ -232,7 +232,7 @@ class WorkController(configurated):
         except SystemTerminate:
             self.terminate()
         except Exception, exc:
-            self.logger.error("Unrecoverable error: %r" % (exc, ),
+            self.logger.error("Unrecoverable error: %r", exc,
                               exc_info=sys.exc_info())
             self.stop()
         except (KeyboardInterrupt, SystemExit):
@@ -301,10 +301,10 @@ class WorkController(configurated):
 
         for module in set(modules or ()):
             if module not in sys.modules:
-                self.logger.debug("importing module %s" % (module, ))
+                self.logger.debug("importing module %s", module)
                 imp(module)
             elif reload:
-                self.logger.debug("reloading module %s" % (module, ))
+                self.logger.debug("reloading module %s", module)
                 reload_from_cwd(sys.modules[module], reloader)
         self.pool.restart()
 
@@ -312,7 +312,7 @@ class WorkController(configurated):
         self.logger.error("Timer error: %r", einfo[1], exc_info=einfo)
 
     def on_timer_tick(self, delay):
-        self.timer_debug("Scheduler wake-up! Next eta %s secs." % delay)
+        self.timer_debug("Scheduler wake-up! Next eta %s secs.", delay)
 
     @property
     def state(self):

+ 3 - 3
celery/worker/autoreload.py

@@ -224,9 +224,9 @@ class Autoreloader(bgThread):
     def on_change(self, files):
         modified = [f for f in files if self._maybe_modified(f)]
         if modified:
-            self.logger.info("Detected modified modules: %s" % (
-                    map(self._module_name, modified), ))
-            self._reload(map(self._module_name, modified))
+            names = [self._module_name(module) for module in modified]
+            self.logger.info("Detected modified modules: %r", names)
+            self._reload(names)
 
     def _reload(self, modules):
         self.controller.reload(modules, reload=True)