소스 검색

Introducing Boot Steps

Ask Solem 13 년 전
부모
커밋
a134881c19

+ 209 - 0
celery/abstract.py

@@ -0,0 +1,209 @@
+# -*- coding: utf-8 -*-
+"""
+    celery.abstract
+    ~~~~~~~~~~~~~~~
+
+    Implements components and boot-steps.
+
+    :copyright: (c) 2009 - 2011 by Ask Solem.
+    :license: BSD, see LICENSE for more details.
+
+"""
+from __future__ import absolute_import
+
+from collections import defaultdict
+from importlib import import_module
+
+from .datastructures import DependencyGraph
+from .utils import instantiate
+
+
+class Namespace(object):
+    """A namespace containing components.
+
+    Every component must belong to a namespace.
+
+    When component classes are created they are added to the
+    mapping of unclaimed components.  The components will be
+    claimed when the namespace they belong to is created.
+
+    :keyword name:  Set the name of this namespace.
+    :keyword app:  Set the Celery app for this namespace.
+
+    """
+    name = None
+    _unclaimed = defaultdict(dict)
+    _started_count = 0
+
+    def __init__(self, name=None, app=None, logger=None):
+        self.app = app
+        self.name = name or self.name
+        self.logger = logger or self.app.log.get_default_logger()
+        self.services = []
+
+    def modules(self):
+        """Subclasses can override this to return a
+        list of modules to import before components are claimed."""
+        pass
+
+    def load_modules(self):
+        """Will load the component modules this namespace depends on."""
+        for m in self.modules():
+            self.import_module(m)
+
+    def apply(self, parent, **kwargs):
+        """Apply the components in this namespace to an object.
+
+        This will apply the ``__init__`` and ``include`` methods
+        of each component with the object as argument.
+
+        For ``StartStopComponents`` the services created
+        will also be added to the object's ``components`` attribute.
+
+        """
+        self._debug("Loading modules.")
+        self.load_modules()
+        self._debug("Claiming components.")
+        self.components = self._claim()
+        self._debug("Building boot step graph.")
+        self.boot_steps = [self.bind_component(name, parent, **kwargs)
+                            for name in self._finalize_boot_steps()]
+        self._debug("New boot order: %r" % (
+            [c.name for c in self.boot_steps], ))
+
+        for component in self.boot_steps:
+            component.include(parent)
+        return self
+
+    def import_module(self, module):
+        return import_module(module)
+
+    def bind_component(self, name, parent, **kwargs):
+        """Bind component to parent object and this namespace."""
+        comp = self[name](parent, **kwargs)
+        comp.namespace = self
+        return comp
+
+    def __getitem__(self, name):
+        return self.components[name]
+
+    def _find_last(self):
+        for C in self.components.itervalues():
+            if C.last:
+                return C
+
+    def _finalize_boot_steps(self):
+        G = self.graph = DependencyGraph((C.name, C.requires)
+                            for C in self.components.itervalues())
+        last = self._find_last()
+        if last:
+            for obj in G:
+                if obj != last.name:
+                    G.add_edge(last.name, obj)
+        return G.topsort()
+
+    def _claim(self):
+        return self._unclaimed[self.name]
+
+    def _debug(self, msg, *args):
+        return self.logger.debug("[%s] " + msg,
+                                *(self.name.capitalize(), ) + args)
+
+
+class ComponentType(type):
+    """Metaclass for components."""
+
+    def __new__(cls, name, bases, attrs):
+        abstract = attrs.pop("abstract", False)
+        if not abstract:
+            try:
+                cname = attrs["name"]
+            except KeyError:
+                raise NotImplementedError("Components must be named")
+            namespace = attrs.get("namespace", None)
+            if not namespace:
+                attrs["namespace"], _, attrs["name"] = cname.partition('.')
+        cls = super(ComponentType, cls).__new__(cls, name, bases, attrs)
+        if not abstract:
+            Namespace._unclaimed[cls.namespace][cls.name] = cls
+        return cls
+
+
+class Component(object):
+    """A component.
+
+    The :meth:`__init__` method is called when the component
+    is bound to a parent object, and can as such be used
+    to initialize attributes in the parent object at
+    parent-instantiation time.
+
+    """
+    __metaclass__ = ComponentType
+
+    #: The name of the component, or the namespace
+    #: and the name of the component separated by dot.
+    name = None
+
+    #: List of component names this component depends on.
+    #: Note that the dependencies must be in the same namespace.
+    requires = ()
+
+    #: can be used to specify the namespace,
+    #: if the name does not include it.
+    namespace = None
+
+    #: if set the component will not be registered,
+    #: but can be used as a component base class.
+    abstract = True
+
+    #: Optional obj created by the :meth:`create` method.
+    #: This is used by StartStopComponents to keep the
+    #: original service object.
+    obj = None
+
+    #: This flag is reserved for the workers Consumer,
+    #: since it is required to always be started last.
+    #: There can only be one object marked with last
+    #: in every namespace.
+    last = False
+
+    #: This provides the default for :meth:`include_if`.
+    enabled = True
+
+    def __init__(self, parent, **kwargs):
+        pass
+
+    def create(self, parent):
+        """Create the component."""
+        pass
+
+    def include_if(self, parent):
+        """An optional predicate that decides whether this
+        component should be created."""
+        return self.enabled
+
+    def instantiate(self, qualname, *args, **kwargs):
+        return instantiate(qualname, *args, **kwargs)
+
+    def include(self, parent):
+        if self.include_if(parent):
+            self.obj = self.create(parent)
+            return True
+
+
+class StartStopComponent(Component):
+    abstract = True
+    terminable = False
+
+    def start(self):
+        return self.obj.start()
+
+    def stop(self):
+        return self.obj.stop()
+
+    def terminate(self):
+        return self.obj.terminate()
+
+    def include(self, parent):
+        if super(StartStopComponent, self).include(parent):
+            parent.components.append(self.obj)

+ 1 - 3
celery/app/abstract.py

@@ -10,11 +10,10 @@ class from_config(object):
         return attr if self.key is None else self.key
 
 
-
 class _configurated(type):
 
     def __new__(cls, name, bases, attrs):
-        C = attrs["__confopts__"] = dict((attr, spec.get_key(attr))
+        attrs["__confopts__"] = dict((attr, spec.get_key(attr))
                                           for attr, spec in attrs.iteritems()
                                               if isinstance(spec, from_config))
         inherit_from = attrs.get("inherit_confopts", ())
@@ -28,7 +27,6 @@ class _configurated(type):
 class configurated(object):
     __metaclass__ = _configurated
 
-
     def setup_defaults(self, kwargs, namespace="celery"):
         confopts = self.__confopts__
         app, find = self.app, self.app.conf.find_value_for_key

+ 1 - 0
celery/app/defaults.py

@@ -151,6 +151,7 @@ NAMESPACES = {
     },
     "CELERYD": {
         "AUTOSCALER": Option("celery.worker.autoscale.Autoscaler"),
+        "BOOT_STEPS": Option((), type="tuple"),
         "CONCURRENCY": Option(0, type="int"),
         "ETA_SCHEDULER": Option(None, type="string"),
         "ETA_SCHEDULER_PRECISION": Option(1.0, type="float"),

+ 0 - 3
celery/concurrency/base.py

@@ -3,12 +3,9 @@ from __future__ import absolute_import
 
 import logging
 import os
-import sys
 import time
-import traceback
 
 from .. import log
-from ..datastructures import ExceptionInfo
 from ..utils import timer2
 from ..utils.encoding import safe_repr
 

+ 2 - 0
celery/concurrency/processes/__init__.py

@@ -29,6 +29,7 @@ WORKER_SIGRESET = frozenset(["SIGTERM",
 #: List of signals to ignore when a child process starts.
 WORKER_SIGIGNORE = frozenset(["SIGINT"])
 
+
 def process_initializer(app, hostname):
     """Initializes the process so it can be used to process tasks."""
     app = app_or_default(app)
@@ -43,6 +44,7 @@ def process_initializer(app, hostname):
     app.loader.init_worker_process()
     signals.worker_process_init.send(sender=None)
 
+
 class TaskPool(BasePool):
     """Process Pool for processing tasks in parallel.
 

+ 0 - 1
celery/concurrency/processes/pool.py

@@ -21,7 +21,6 @@ import signal
 import sys
 import threading
 import time
-import traceback
 import Queue
 import warnings
 

+ 174 - 0
celery/datastructures.py

@@ -16,6 +16,7 @@ import sys
 import time
 import traceback
 
+from collections import defaultdict
 from itertools import chain
 from threading import RLock
 
@@ -24,6 +25,179 @@ from kombu.utils.limits import TokenBucket  # noqa
 from .utils.compat import UserDict, OrderedDict
 
 
+class CycleError(Exception):
+    """A cycle was detected in an acyclic graph."""
+
+
+class DependencyGraph(object):
+    """A directed acyclic graph of objects and their dependencies.
+
+    Supports a robust topological sort
+    to detect the order in which they must be handled.
+
+    Takes an optional iterator of ``(obj, dependencies)``
+    tuples to build the graph from.
+
+    .. warning::
+
+        Does not support cycle detection.
+
+    """
+
+    def __init__(self, it=None):
+        self.adjacent = {}
+        if it is not None:
+            self.update(it)
+
+    def add_arc(self, obj):
+        """Add an object to the graph."""
+        self.adjacent[obj] = []
+
+    def add_edge(self, A, B):
+        """Add an edge from object ``A`` to object ``B``
+        (``A`` depends on ``B``)."""
+        self[A].append(B)
+
+    def topsort(self):
+        """Sort the graph topologically.
+
+        :returns: a list of objects in the order
+            in which they must be handled.
+
+        """
+        graph = DependencyGraph()
+        components = self._tarjan72()
+
+        NC = dict((node, component)
+                    for component in components
+                        for node in component)
+        for component in components:
+            graph.add_arc(component)
+        for node in self:
+            node_c = NC[node]
+            for successor in self[node]:
+                successor_c = NC[successor]
+                if node_c != successor_c:
+                    graph.add_edge(node_c, successor_c)
+        return [t[0] for t in graph._khan62()]
+
+    def valency_of(self, obj):
+        """Returns the valency (degree) of a vertex in the graph."""
+        l = [len(self[obj])]
+        for node in self[obj]:
+            l.append(self.valency_of(node))
+        return sum(l)
+
+    def update(self, it):
+        """Update the graph with data from a list
+        of ``(obj, dependencies)`` tuples."""
+        tups = list(it)
+        for obj, _ in tups:
+            self.add_arc(obj)
+        for obj, deps in tups:
+            for dep in deps:
+                self.add_edge(obj, dep)
+
+    def edges(self):
+        """Returns a generator yielding all vertices with outgoing edges."""
+        return (obj for obj, adj in self.iteritems() if adj)
+
+    def _khan62(self):
+        """Kahn's simple topological sort algorithm from '62
+
+        See http://en.wikipedia.org/wiki/Topological_sorting
+
+        """
+        count = defaultdict(lambda: 0)
+        result = []
+
+        for node in self:
+            for successor in self[node]:
+                count[successor] += 1
+        ready = [node for node in self if not count[node]]
+
+        while ready:
+            node = ready.pop()
+            result.append(node)
+
+            for successor in self[node]:
+                count[successor] -= 1
+                if count[successor] == 0:
+                    ready.append(successor)
+        result.reverse()
+        return result
+
+    def _tarjan72(self):
+        """Tarjan's algorithm to find strongly connected components.
+
+        See http://bit.ly/vIMv3h.
+
+        """
+        result, stack, low = [], [], {}
+
+        def visit(node):
+            if node in low:
+                return
+            num = len(low)
+            low[node] = num
+            stack_pos = len(stack)
+            stack.append(node)
+
+            for successor in self[node]:
+                visit(successor)
+                low[node] = min(low[node], low[successor])
+
+            if num == low[node]:
+                component = tuple(stack[stack_pos:])
+                stack[stack_pos:] = []
+                result.append(component)
+                for item in component:
+                    low[item] = len(self)
+
+        for node in self:
+            visit(node)
+
+        return result
+
+    def to_dot(self, fh, ws=" " * 4):
+        """Convert the graph to DOT format.
+
+        :param fh: A file, or a file-like object to write the graph to.
+
+        """
+        fh.write("digraph dependencies {\n")
+        for obj, adjacent in self.iteritems():
+            if not adjacent:
+                fh.write(ws + '"%s"\n' % (obj, ))
+            for req in adjacent:
+                fh.write(ws + '"%s" -> "%s"\n' % (obj, req))
+        fh.write("}\n")
+
+    def __iter__(self):
+        return self.adjacent.iterkeys()
+
+    def __getitem__(self, node):
+        return self.adjacent[node]
+
+    def __len__(self):
+        return len(self.adjacent)
+
+    def _iterate_items(self):
+        return self.adjacent.iteritems()
+    items = iteritems = _iterate_items
+
+    def __repr__(self):
+        return '\n'.join(self.repr_node(N) for N in self)
+
+    def repr_node(self, obj, level=1):
+        output = ["%s(%s)" % (obj, self.valency_of(obj))]
+        for other in self[obj]:
+            d = "%s(%s)" % (other, self.valency_of(other))
+            output.append('     ' * level + d)
+            output.extend(self.repr_node(other, level + 1).split('\n')[1:])
+        return '\n'.join(output)
+
+
 class AttributeDictMixin(object):
     """Adds attribute access to mappings.
 

+ 2 - 1
celery/tests/test_concurrency/test_concurrency_processes.py

@@ -33,7 +33,7 @@ except ImportError:
             def apply_async(self, *args, **kwargs):
                 pass
     mp = _mp()  # noqa
-    safe_apply_callback = None
+    safe_apply_callback = None  # noqa
 
 from celery.datastructures import ExceptionInfo
 from celery.utils import noop
@@ -199,6 +199,7 @@ class test_TaskPool(unittest.TestCase):
 
     def test_restart(self):
         raise SkipTest("functional test")
+
         def get_pids(pool):
             return set([p.pid for p in pool._pool._pool])
 

+ 102 - 98
celery/worker/__init__.py

@@ -20,14 +20,14 @@ import traceback
 
 from kombu.utils.finalize import Finalize
 
-from .. import beat
+from .. import abstract
 from .. import concurrency as _concurrency
-from .. import registry, signals
+from .. import registry
 from ..app import app_or_default
 from ..app.abstract import configurated, from_config
 from ..exceptions import SystemTerminate
 from ..log import SilenceRepeated
-from ..utils import noop, instantiate
+from ..utils import noop, qualname
 
 from . import state
 from .buckets import TaskBucket, FastQueue
@@ -37,6 +37,97 @@ CLOSE = 0x2
 TERMINATE = 0x3
 
 
+class Namespace(abstract.Namespace):
+    name = "worker"
+    builtin_boot_steps = ("celery.worker.autoscale",
+                          "celery.worker.consumer",
+                          "celery.worker.mediator")
+
+    def modules(self):
+        return (self.builtin_boot_steps
+              + self.app.conf.CELERYD_BOOT_STEPS)
+
+
+class Pool(abstract.StartStopComponent):
+    name = "worker.pool"
+    requires = ("queues", )
+
+    def __init__(self, w, autoscale=None, **kwargs):
+        w.autoscale = autoscale
+        w.pool = None
+        w.max_concurrency = None
+        w.min_concurrency = w.concurrency
+        if w.autoscale:
+            w.max_concurrency, w.min_concurrency = w.autoscale
+
+    def create(self, w):
+        pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
+                                logger=w.logger,
+                                initargs=(w.app, w.hostname),
+                                maxtasksperchild=w.max_tasks_per_child,
+                                timeout=w.task_time_limit,
+                                soft_timeout=w.task_soft_time_limit,
+                                putlocks=w.pool_putlocks)
+        return pool
+
+
+class Beat(abstract.StartStopComponent):
+    name = "worker.beat"
+
+    def __init__(self, w, embed_clockservice=False, **kwargs):
+        self.enabled = w.embed_clockservice = embed_clockservice
+        w.beat = None
+
+    def create(self, w):
+        from celery.beat import EmbeddedService
+        b = w.beat = EmbeddedService(app=w.app,
+                                     logger=w.logger,
+                                     schedule_filename=w.schedule_filename,
+                                     scheduler_cls=w.scheduler_cls)
+        return b
+
+
+class Queues(abstract.Component):
+    name = "worker.queues"
+
+    def create(self, w):
+        if not w.pool_cls.rlimit_safe:
+            w.disable_rate_limits = True
+        if w.disable_rate_limits:
+            w.ready_queue = FastQueue()
+            w.ready_queue.put = w.process_task
+        else:
+            w.ready_queue = TaskBucket(task_registry=registry.tasks)
+
+
+class Timers(abstract.Component):
+    name = "worker.timers"
+    requires = ("pool", )
+
+    def create(self, w):
+        w.priority_timer = self.instantiate(w.pool.Timer)
+        if not w.eta_scheduler_cls:
+            # Default Timer is set by the pool, as e.g. eventlet
+            # needs a custom implementation.
+            w.eta_scheduler_cls = w.pool.Timer
+            w.scheduler = self.instantiate(w.eta_scheduler_cls,
+                                    precision=w.eta_scheduler_precision,
+                                    on_error=w.on_timer_error,
+                                    on_tick=w.on_timer_tick)
+
+
+class StateDB(abstract.Component):
+    name = "worker.state-db"
+
+    def __init__(self, w, **kwargs):
+        self.enabled = w.state_db
+        w._persistence = None
+
+    def create(self, w):
+        w._persistence = state.Persistent(w.state_db)
+        atexit.register(w._persistence.save)
+
+
 class WorkController(configurated):
     """Unmanaged worker instance."""
     RUN = RUN
@@ -65,114 +156,30 @@ class WorkController(configurated):
 
     _state = None
     _running = 0
-    _persistence = None
 
     def __init__(self, loglevel=None, hostname=None, logger=None,
-            ready_callback=noop, embed_clockservice=False, autoscale=None,
+            ready_callback=noop,
             queues=None, app=None, **kwargs):
         self.app = app_or_default(app)
-        conf = self.app.conf
         self._shutdown_complete = threading.Event()
         self.setup_defaults(kwargs, namespace="celeryd")
         self.app.select_queues(queues)  # select queues subset.
 
         # Options
-        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
-        self.autoscale = autoscale
         self.loglevel = loglevel or self.loglevel
         self.logger = self.app.log.get_default_logger()
         self.hostname = hostname or socket.gethostname()
-        self.embed_clockservice = embed_clockservice
         self.ready_callback = ready_callback
         self.timer_debug = SilenceRepeated(self.logger.debug,
                                            max_iterations=10)
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self._finalize_db = None
 
-        if self.state_db:
-            self._persistence = state.Persistent(self.state_db)
-            atexit.register(self._persistence.save)
-
-        # Queues
-        if not self.pool_cls.rlimit_safe:
-            self.disable_rate_limits = True
-        if self.disable_rate_limits:
-            self.ready_queue = FastQueue()
-            self.ready_queue.put = self.process_task
-        else:
-            self.ready_queue = TaskBucket(task_registry=registry.tasks)
-
-        self.logger.debug("Instantiating thread components...")
-
-        # Threads + Pool + Consumer
-        self.autoscaler = None
-        max_concurrency = None
-        min_concurrency = self.concurrency
-        if autoscale:
-            max_concurrency, min_concurrency = autoscale
-
-        self.pool = instantiate(self.pool_cls, min_concurrency,
-                                logger=self.logger,
-                                initargs=(self.app, self.hostname),
-                                maxtasksperchild=self.max_tasks_per_child,
-                                timeout=self.task_time_limit,
-                                soft_timeout=self.task_soft_time_limit,
-                                putlocks=self.pool_putlocks)
-        self.priority_timer = instantiate(self.pool.Timer)
-
-        if not self.eta_scheduler_cls:
-            # Default Timer is set by the pool, as e.g. eventlet
-            # needs a custom implementation.
-            self.eta_scheduler_cls = self.pool.Timer
-
-        self.autoscaler = None
-        if autoscale:
-            self.autoscaler = instantiate(self.autoscaler_cls, self.pool,
-                                          max_concurrency=max_concurrency,
-                                          min_concurrency=min_concurrency,
-                                          logger=self.logger)
-
-        self.mediator = None
-        if not self.disable_rate_limits:
-            self.mediator = instantiate(self.mediator_cls, self.ready_queue,
-                                        app=self.app,
-                                        callback=self.process_task,
-                                        logger=self.logger)
-
-        self.scheduler = instantiate(self.eta_scheduler_cls,
-                                precision=self.eta_scheduler_precision,
-                                on_error=self.on_timer_error,
-                                on_tick=self.on_timer_tick)
-
-        self.beat = None
-        if self.embed_clockservice:
-            self.beat = beat.EmbeddedService(app=self.app,
-                                logger=self.logger,
-                                schedule_filename=self.schedule_filename,
-                                scheduler_cls=self.scheduler_cls)
-
-        prefetch_count = self.concurrency * self.prefetch_multiplier
-        self.consumer = instantiate(self.consumer_cls,
-                                    self.ready_queue,
-                                    self.scheduler,
-                                    logger=self.logger,
-                                    hostname=self.hostname,
-                                    send_events=self.send_events,
-                                    init_callback=self.ready_callback,
-                                    initial_prefetch_count=prefetch_count,
-                                    pool=self.pool,
-                                    priority_timer=self.priority_timer,
-                                    app=self.app,
-                                    controller=self)
-
-        # The order is important here;
-        #   the first in the list is the first to start,
-        # and they must be stopped in reverse order.
-        self.components = filter(None, (self.pool,
-                                        self.mediator,
-                                        self.beat,
-                                        self.autoscaler,
-                                        self.consumer))
+        # Initialize boot steps
+        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
+        self.components = []
+        self.namespace = Namespace(app=self.app,
+                                   logger=self.logger).apply(self, **kwargs)
 
     def start(self):
         """Starts the workers main loop."""
@@ -180,8 +187,7 @@ class WorkController(configurated):
 
         try:
             for i, component in enumerate(self.components):
-                self.logger.debug("Starting thread %s...",
-                                  component.__class__.__name__)
+                self.logger.debug("Starting %s...", qualname(component))
                 self._running = i + 1
                 component.start()
         except SystemTerminate:
@@ -236,11 +242,9 @@ class WorkController(configurated):
             return
 
         self._state = self.CLOSE
-        signals.worker_shutdown.send(sender=self)
 
         for component in reversed(self.components):
-            self.logger.debug("%s thread %s...", what,
-                              component.__class__.__name__)
+            self.logger.debug("%s %s...", what, qualname(component))
             stop = component.stop
             if not warm:
                 stop = getattr(component, "terminate", None) or stop

+ 17 - 0
celery/worker/autoscale.py

@@ -24,9 +24,26 @@ import traceback
 from time import sleep, time
 
 from . import state
+from ..abstract import StartStopComponent
 from ..utils.threads import bgThread
 
 
+class WorkerComponent(StartStopComponent):
+    name = "worker.autoscaler"
+    requires = ("pool", )
+
+    def __init__(self, w, **kwargs):
+        self.enabled = w.autoscale
+        w.autoscaler = None
+
+    def create(self, w):
+        scaler = w.autoscaler = self.instantiate(w.autoscaler_cls, w.pool,
+                                    max_concurrency=w.max_concurrency,
+                                    min_concurrency=w.min_concurrency,
+                                    logger=w.logger)
+        return scaler
+
+
 class Autoscaler(bgThread):
 
     def __init__(self, pool, max_concurrency, min_concurrency=0,

+ 20 - 0
celery/worker/consumer.py

@@ -83,6 +83,7 @@ import threading
 import traceback
 import warnings
 
+from ..abstract import StartStopComponent
 from ..app import app_or_default
 from ..datastructures import AttributeDict
 from ..exceptions import InvalidTaskError
@@ -130,6 +131,25 @@ body: %s {content_type:%s content_encoding:%s delivery_info:%s}\
 """
 
 
+class Component(StartStopComponent):
+    name = "worker.consumer"
+    last = True
+
+    def create(self, w):
+        prefetch_count = w.concurrency * w.prefetch_multiplier
+        c = w.consumer = self.instantiate(
+                w.consumer_cls, w.ready_queue, w.scheduler,
+                logger=w.logger, hostname=w.hostname,
+                send_events=w.send_events,
+                init_callback=w.ready_callback,
+                initial_prefetch_count=prefetch_count,
+                pool=w.pool,
+                priority_timer=w.priority_timer,
+                app=w.app,
+                controller=w)
+        return c
+
+
 class QoS(object):
     """Quality of Service for Channel.
 

+ 2 - 2
celery/worker/job.py

@@ -85,7 +85,6 @@ class Request(object):
     #: Format string used to log task retry.
     retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
 
-
     def __init__(self, body, on_ack=noop,
             hostname=None, logger=None, eventer=None, app=None,
             connection_errors=None, request_dict=None,
@@ -95,6 +94,8 @@ class Request(object):
         self.id = body["id"]
         self.args = body.get("args", [])
         self.kwargs = body.get("kwargs", {})
+        if NEEDS_KWDICT:
+            self.kwargs = kwdict(self.kwargs)
         eta = body.get("eta")
         expires = body.get("expires")
         utc = body.get("utc", False)
@@ -447,4 +448,3 @@ class TaskRequest(Request):
             "task": name, "id": id, "args": args,
             "kwargs": kwargs, "eta": eta,
             "expires": expires}, **options)
-

+ 16 - 0
celery/worker/mediator.py

@@ -24,10 +24,26 @@ import traceback
 
 from Queue import Empty
 
+from ..abstract import StartStopComponent
 from ..app import app_or_default
 from ..utils.threads import bgThread
 
 
+class WorkerComponent(StartStopComponent):
+    name = "worker.mediator"
+    requires = ("pool", "queues", )
+
+    def __init__(self, w, **kwargs):
+        w.mediator = None
+        self.enabled = not w.disable_rate_limits
+
+    def create(self, w):
+        m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,
+                                          app=w.app, callback=w.process_task,
+                                          logger=w.logger)
+        return m
+
+
 class Mediator(bgThread):
 
     #: The task queue, a :class:`~Queue.Queue` instance.

+ 0 - 2
celery/worker/strategy.py

@@ -2,8 +2,6 @@ from __future__ import absolute_import
 
 from .job import Request
 
-from celery.execute.trace import trace_task
-
 
 def default(task, app, consumer):
     logger = consumer.logger

+ 1 - 1
docs/.templates/page.html

@@ -7,7 +7,7 @@
         This document is for Celery's development version, which can be
         significantly different from previous releases. Get old docs here:
 
-        <a href="http://docs.celeryproject.org/en/latest/{{ pagename }}{{ file_suffix }}">2.1</a>.
+        <a href="http://docs.celeryproject.org/en/latest/{{ pagename }}{{ file_suffix }}">2.4</a>.
         </p>
         {% else %}
         <p>