Bryan Berg 14 years ago
parent
commit
76c499d369

+ 2 - 0
AUTHORS

@@ -45,3 +45,5 @@ Ordered by date of first contribution:
  Gert Van Gool <gertvangool@gmail.com>
  sdcooke
  David Cramer <dcramer@gmail.com>
+  Bryan Berg <bryan@mixedmedialabs.com>
+  Piotr Sikora <piotr.sikora@frickle.com>

+ 160 - 1
Changelog

@@ -11,7 +11,166 @@
 =====
 :release-date: TBA
 :status: in-progress
-:branch: app
+:branch: master
+
+
+.. _version-2.1.2:
+
+2.1.2
+=====
+:release-date: TBA
+
+.. _v212-fixes:
+
+Fixes
+-----
+
+* celeryd: Now sends the ``task-retried`` event for retried tasks.
+
+* celeryd: Now honors the ``ignore_result`` option for
+  :exc:`~celery.exceptions.WorkerLostError` and timeout errors.
+
+* celerybeat: Fixed :exc:`UnboundLocalError` in celerybeat logging
+  when using logging setup signals.
+
+* celeryd: All log messages now include ``exc_info``.
+
+.. _version-2.1.1:
+
+2.1.1
+=====
+:release-date: 2010-10-14 14:00 CEST
+
+.. _v211-fixes:
+
+Fixes
+-----
+
+* Now working on Windows again.
+
+    Removed the dependency on the pwd/grp modules.
+
+* snapshots: Fixed race condition leading to loss of events.
+
+* celeryd: Reject tasks with an eta that cannot be converted to a timestamp.
+
+    See issue #209
+
+* concurrency.processes.pool: The semaphore was released twice for each task
+  (both at ACK and result ready).
+
+    This has been fixed, and it is now released only once per task.
+
+* docs/configuration: Fixed typo ``CELERYD_SOFT_TASK_TIME_LIMIT`` ->
+  :setting:`CELERYD_TASK_SOFT_TIME_LIMIT`.
+
+    See issue #214
+
+* Control command ``dump_scheduled``: was using the old ``.info`` attribute.
+
+* :program:`celeryd-multi`: Fixed ``set changed size during iteration`` bug
+  occurring in the restart command.
+
+* celeryd: Accidentally tried to use additional command line arguments.
+
+    This would lead to an error like:
+
+    ``got multiple values for keyword argument 'concurrency'``.
+
+    Additional command line arguments are now ignored, and no longer
+    produce this error.  However, we do reserve the right to use
+    positional arguments in the future, so please do not depend on this
+    behavior.
+
+* celerybeat: Now respects routers and task execution options again.
+
+* celerybeat: Now reuses the publisher instead of the connection.
+
+* Cache result backend: Using :class:`float` as the expires argument
+  to ``cache.set`` is deprecated by the memcached libraries,
+  so we now automatically cast to :class:`int`.
+
+* unittests: No longer emit logging messages and warnings in test output.
+
+.. _v211-news:
+
+News
+----
+
+* Now depends on carrot version 0.10.7.
+
+* Added :setting:`CELERY_REDIRECT_STDOUTS`, and
+  :setting:`CELERY_REDIRECT_STDOUTS_LEVEL` settings.
+
+    :setting:`CELERY_REDIRECT_STDOUTS` is used by :program:`celeryd` and
+    :program:`celerybeat`.  All output to ``stdout`` and ``stderr`` will be
+    redirected to the current logger if enabled.
+
+    :setting:`CELERY_REDIRECT_STDOUTS_LEVEL` decides the loglevel used and is
+    :const:`WARNING` by default.
+
+* Added :setting:`CELERYBEAT_SCHEDULER` setting.
+
+    This setting is used to define the default for the -S option to
+    :program:`celerybeat`.
+
+    Example:
+
+    .. code-block:: python
+
+        CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
+
+* Added ``Task.expires``: used to set the default expiry time for tasks.
+
+* New remote control commands: ``add_consumer`` and ``cancel_consumer``.
+
+    .. method:: add_consumer(queue, exchange, exchange_type, routing_key,
+                             **options)
+        :module:
+
+        Tells the worker to declare, and start consuming from, the
+        specified queue.
+
+    .. method:: cancel_consumer(queue_name)
+        :module:
+
+        Tells the worker to stop consuming from the given queue (by name).
+
+
+    Commands also added to :program:`celeryctl` and
+    :class:`~celery.task.control.inspect`.
+
+
+    Example using celeryctl to start consuming from queue "queue", in 
+    exchange "exchange", of type "direct" using binding key "key"::
+
+        $ celeryctl inspect add_consumer queue exchange direct key
+        $ celeryctl inspect cancel_consumer queue
+
+    See :ref:`monitoring-celeryctl` for more information about the
+    :program:`celeryctl` program.
+
+
+    Another example using :class:`~celery.task.control.inspect`:
+
+    .. code-block:: python
+
+        >>> from celery.task.control import inspect
+        >>> inspect.add_consumer(queue="queue", exchange="exchange",
+        ...                      exchange_type="direct",
+        ...                      routing_key="key",
+        ...                      durable=False,
+        ...                      auto_delete=True)
+
+        >>> inspect.cancel_consumer("queue")
+
+* celerybeat: Now logs the traceback if a message can't be sent.
+
+* celerybeat: Now enables a default socket timeout of 30 seconds.
+
+* README/introduction/homepage: Added link to `Flask-Celery`_.
+
+.. _`Flask-Celery`: http://github.com/ask/flask-celery
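The stdout/stderr redirection added above can be sketched as a minimal file-like proxy (hypothetical names; the real implementation is celery's ``LoggingProxy``):

```python
import logging
import sys


class StdoutLogger(object):
    """Minimal file-like object that forwards writes to a logger."""

    def __init__(self, logger, loglevel=logging.WARNING):
        self.logger = logger
        self.loglevel = loglevel

    def write(self, data):
        data = data.strip()
        if data:
            self.logger.log(self.loglevel, data)

    def flush(self):
        pass  # not buffered, nothing to do


logger = logging.getLogger("celery.redirected")
logger.setLevel(logging.WARNING)
records = []
handler = logging.Handler()
handler.emit = records.append  # capture records instead of writing them
logger.addHandler(handler)

old_stdout, sys.stdout = sys.stdout, StdoutLogger(logger)
try:
    print("hello from stdout")
finally:
    sys.stdout = old_stdout
```

Anything printed while the proxy is installed ends up as a log record at the configured level, which is what :setting:`CELERY_REDIRECT_STDOUTS_LEVEL` controls.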


 .. _version-2.1.0:


+ 1 - 4
celery/app/amqp.py

@@ -68,14 +68,11 @@ class Queues(UserDict):
     @classmethod
     def with_defaults(cls, queues, default_exchange, default_exchange_type):
 
-        def _defaults(opts):
+        for opts in queues.values():
             opts.setdefault("exchange", default_exchange),
             opts.setdefault("exchange_type", default_exchange_type)
             opts.setdefault("binding_key", default_exchange)
             opts.setdefault("routing_key", opts.get("binding_key"))
-            return opts
-
-        map(_defaults, queues.values())
         return cls(queues)
 
 

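The rewritten ``with_defaults`` above fills in missing queue options in place with ``dict.setdefault``; a standalone sketch of the same semantics:

```python
def with_defaults(queues, default_exchange, default_exchange_type):
    """Fill in missing exchange/routing options for each queue, in place."""
    for opts in queues.values():
        opts.setdefault("exchange", default_exchange)
        opts.setdefault("exchange_type", default_exchange_type)
        opts.setdefault("binding_key", default_exchange)
        # routing_key falls back to whatever binding_key ended up as.
        opts.setdefault("routing_key", opts.get("binding_key"))
    return queues


queues = with_defaults({"video": {"binding_key": "video"}, "default": {}},
                       "celery", "direct")
```

Options that were set explicitly (like ``binding_key`` for ``video``) survive; everything else inherits the defaults.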
+ 1 - 0
celery/app/defaults.py

@@ -96,6 +96,7 @@ NAMESPACES = {
         "REDIRECT_STDOUTS_LEVEL": Option("WARNING"),
     },
     "CELERYD": {
+        "AUTOSCALER": Option("celery.worker.controllers.Autoscaler"),
         "CONCURRENCY": Option(0, type="int"),
         "ETA_SCHEDULER": Option("celery.utils.timer2.Timer"),
         "ETA_SCHEDULER_PRECISION": Option(1.0, type="float"),

+ 3 - 4
celery/apps/beat.py

@@ -58,10 +58,9 @@ class Beat(object):
         handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
                                                        logfile=self.logfile)
         logger = self.app.log.get_default_logger(name="celery.beat")
-        if not handled:
-            if self.redirect_stdouts:
-                self.app.log.redirect_stdouts_to_logger(logger,
-                        loglevel=self.redirect_stdouts_level)
+        if self.redirect_stdouts and not handled:
+            self.app.log.redirect_stdouts_to_logger(logger,
+                    loglevel=self.redirect_stdouts_level)
         return logger
 
     def start_scheduler(self, logger=None):

+ 12 - 3
celery/apps/worker.py

@@ -6,6 +6,8 @@ import socket
 import sys
 import warnings
 
+from carrot.utils import partition
+
 from celery import __version__
 from celery import platforms
 from celery import signals
@@ -40,7 +42,8 @@ class Worker(object):
             schedule=None, task_time_limit=None, task_soft_time_limit=None,
             max_tasks_per_child=None, queues=None, events=False, db=None,
             include=None, app=None, pidfile=None,
-            redirect_stdouts=None, redirect_stdouts_level=None, **kwargs):
+            redirect_stdouts=None, redirect_stdouts_level=None,
+            autoscale=None, **kwargs):
         self.app = app = app_or_default(app)
         self.concurrency = (concurrency or
                             app.conf.CELERYD_CONCURRENCY or
@@ -67,6 +70,10 @@ class Worker(object):
         self.queues = None
         self.include = include or []
         self.pidfile = pidfile
+        self.autoscale = None
+        if autoscale:
+            max_c, _, min_c = partition(autoscale, ",")
+            self.autoscale = [int(max_c), min_c and int(min_c) or 0]
         self._isatty = sys.stdout.isatty()
         self.colored = term.colored(enabled=app.conf.CELERYD_LOG_COLOR)
 
@@ -128,7 +135,8 @@ class Worker(object):
     def init_loader(self):
         self.loader = self.app.loader
         self.settings = self.app.conf
-        map(self.loader.import_module, self.include)
+        for module in self.include:
+            self.loader.import_module(module)
 
     def redirect_stdouts_to_logger(self):
         handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
@@ -193,7 +201,8 @@ class Worker(object):
                                 queues=self.queues,
                                 max_tasks_per_child=self.max_tasks_per_child,
                                 task_time_limit=self.task_time_limit,
-                                task_soft_time_limit=self.task_soft_time_limit)
+                                task_soft_time_limit=self.task_soft_time_limit,
+                                autoscale=self.autoscale)
         self.install_platform_tweaks(worker)
         worker.start()
 

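The ``--autoscale`` value is parsed by splitting on the first comma, with the minimum defaulting to 0 when omitted. A rough equivalent using the stdlib ``str.partition`` (carrot's ``partition`` behaves the same way here):

```python
def parse_autoscale(value):
    """Parse a "max,min" string into [max, min]; min defaults to 0."""
    max_c, _, min_c = value.partition(",")
    return [int(max_c), int(min_c) if min_c else 0]
```

So ``--autoscale=10,3`` keeps at least 3 processes but grows to 10 under load, and a bare ``--autoscale=10`` means "up to 10, shrink to 0 when idle".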
+ 6 - 0
celery/backends/amqp.py

@@ -123,6 +123,9 @@ class AMQPBackend(BaseDictBackend):
         return result
 
     def get_task_meta(self, task_id, cache=True):
+        if cache and task_id in self._cache:
+            return self._cache[task_id]
+
         return self.poll(task_id)
 
     def wait_for(self, task_id, timeout=None, cache=True):
@@ -148,6 +151,9 @@ class AMQPBackend(BaseDictBackend):
         result = consumer.fetch()
         try:
             if result:
+                consumer.backend.queue_delete(queue=consumer.queue,
+                                              if_unused=True,
+                                              if_empty=True)
                 payload = self._cache[task_id] = result.payload
                 return payload
             else:

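The ``get_task_meta`` change above consults the local cache before polling the broker; the pattern is plain memoization. A hypothetical, simplified backend showing the effect (the real backend fills ``_cache`` inside its poll path):

```python
class CachingBackend(object):
    """Sketch of a result backend that caches polled task metadata."""

    def __init__(self, poll):
        self._poll = poll        # the expensive broker round-trip
        self._cache = {}

    def get_task_meta(self, task_id, cache=True):
        if cache and task_id in self._cache:
            return self._cache[task_id]
        meta = self._cache[task_id] = self._poll(task_id)
        return meta


polls = []
backend = CachingBackend(lambda tid: polls.append(tid) or {"status": "SUCCESS"})
first = backend.get_task_meta("tid-1")
second = backend.get_task_meta("tid-1")   # served from the cache
```

Repeated lookups for the same finished task no longer hit the broker, which matters when ``wait_for`` polls in a loop.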
+ 2 - 1
celery/beat.py

@@ -5,6 +5,7 @@ Periodic Task Scheduler
 """
 import time
 import shelve
+import sys
 import threading
 import traceback
 import multiprocessing
@@ -155,7 +156,7 @@ class Scheduler(UserDict):
                 result = self.apply_async(entry, publisher=publisher)
             except Exception, exc:
                 self.logger.error("Message Error: %s\n%s" % (exc,
-                    traceback.format_stack()))
+                    traceback.format_stack()), exc_info=sys.exc_info())
             else:
                 self.logger.debug("%s sent. id->%s" % (entry.task,
                                                        result.task_id))

+ 5 - 0
celery/bin/celeryd.py

@@ -150,6 +150,11 @@ class WorkerCommand(Command):
                 help="Optional file used to store the workers pid. "
                      "The worker will not start if this file already exists "
                      "and the pid is still alive."),
+            Option('--autoscale', default=None,
+                help="Enable autoscaling by providing "
+                     "max_concurrency,min_concurrency. Example: "
+                     "--autoscale=10,3 (always keep 3 processes, "
+                     "but grow to 10 if necessary)."),
         )
 
 

+ 1 - 1
celery/bin/celeryev.py

@@ -37,7 +37,7 @@ class EvCommand(Command):
     def set_process_status(self, prog, info=""):
         prog = "%s:%s" % (self.prog_name, prog)
         info = "%s %s" % (info, platforms.strargv(sys.argv))
-        return platform.set_process_title(prog, info=info)
+        return platforms.set_process_title(prog, info=info)
 
     def get_options(self):
         return (

+ 6 - 0
celery/concurrency/processes/__init__.py

@@ -102,6 +102,12 @@ class TaskPool(object):
                                       error_callback=on_worker_error,
                                       waitforslot=self.putlocks)
 
+    def grow(self, n=1):
+        return self._pool.grow(n)
+
+    def shrink(self, n=1):
+        return self._pool.shrink(n)
+
     def on_worker_error(self, errbacks, exc):
         einfo = ExceptionInfo((exc.__class__, exc, None))
         [errback(einfo) for errback in errbacks]

+ 42 - 4
celery/concurrency/processes/pool.py

@@ -81,8 +81,8 @@ def soft_timeout_sighandler(signum, frame):
 
 
 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
-    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
     pid = os.getpid()
+    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
     put = outqueue.put
     get = inqueue.get
 
@@ -108,6 +108,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
     if SIG_SOFT_TIMEOUT is not None:
         signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)
 
+
     completed = 0
     while maxtasks is None or (maxtasks and completed < maxtasks):
         try:
@@ -137,7 +138,6 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
         completed += 1
     debug('worker exiting after %d tasks' % completed)
 
-
 #
 # Class representing a process pool
 #
@@ -527,6 +527,44 @@ class Pool(object):
             return True
         return False
 
+    def shrink(self, n=1):
+        for i, worker in enumerate(self._iterinactive()):
+            self._processes -= 1
+            if self._putlock:
+                self._putlock._initial_value -= 1
+                self._putlock.acquire()
+            worker.terminate()
+            if i == n - 1:
+                return
+        raise ValueError("Can't shrink pool. All processes busy!")
+
+    def grow(self, n=1):
+        for i in xrange(n):
+            #assert len(self._pool) == self._processes
+            self._processes += 1
+            if self._putlock:
+                cond = self._putlock._Semaphore__cond
+                cond.acquire()
+                try:
+                    self._putlock._initial_value += 1
+                    self._putlock._Semaphore__value += 1
+                    cond.notify()
+                finally:
+                    cond.release()
+
+    def _iterinactive(self):
+        for worker in self._pool:
+            if not self._worker_active(worker):
+                yield worker
+
+    def _worker_active(self, worker):
+        for job in self._cache.values():
+            if worker.pid in job.worker_pids():
+                return True
+        return False
+
+
     def _repopulate_pool(self):
         """Bring the number of pool processes up to the specified number,
         for use after reaping workers which have exited.
@@ -541,8 +579,8 @@ class Pool(object):
     def _maintain_pool(self):
         """Clean up any exited workers and start replacements for them.
         """
-        if self._join_exited_workers():
-            self._repopulate_pool()
+        self._join_exited_workers()
+        self._repopulate_pool()
 
     def _setup_queues(self):
         from multiprocessing.queues import SimpleQueue

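``grow``/``shrink`` above resize the pool by adjusting ``self._processes`` together with the put-lock semaphore's counter. The bookkeeping can be illustrated with a toy resizable semaphore (illustrative only; the real code reaches into ``threading.Semaphore`` internals rather than defining its own class):

```python
import threading


class ResizableSemaphore(object):
    """Toy counting semaphore whose capacity can be grown or shrunk."""

    def __init__(self, value):
        self._cond = threading.Condition()
        self._value = value
        self.capacity = value

    def acquire(self):
        with self._cond:
            while self._value <= 0:
                self._cond.wait()
            self._value -= 1

    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()

    def grow(self, n=1):
        # Add n slots: bump both the capacity and the free count.
        with self._cond:
            self.capacity += n
            self._value += n
            self._cond.notify(n)

    def shrink(self, n=1):
        # Permanently claim n slots, waiting until they are free.
        for _ in range(n):
            self.acquire()
        with self._cond:
            self.capacity -= n


sem = ResizableSemaphore(2)
sem.grow(1)     # room for one more concurrent task
sem.shrink(2)   # remove two slots again
```

Growing frees slots immediately; shrinking has to wait for slots to become free, which mirrors why the pool's ``shrink`` can only terminate inactive workers.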
+ 2 - 2
celery/datastructures.py

@@ -98,8 +98,8 @@ class PositionQueue(UserList):
     @property
     def filled(self):
         """Returns the filled slots as a list."""
-        return filter(lambda v: not isinstance(v, self.UnfilledPosition),
-                      self.data)
+        return [slot for slot in self.data
+                    if not isinstance(slot, self.UnfilledPosition)]
 
 
 class ExceptionInfo(object):

+ 5 - 4
celery/events/state.py

@@ -59,8 +59,8 @@ class Task(Element):
                     "result", "eta", "runtime", "expires",
                     "exception")
 
-    _merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
-                                      "retries", "eta", "expires")}
+    merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
+                                     "retries", "eta", "expires")}
 
     _defaults = dict(uuid=None,
                      name=None,
@@ -97,7 +97,8 @@ class Task(Element):
     def update(self, state, timestamp, fields):
         if self.worker:
             self.worker.on_heartbeat(timestamp=timestamp)
-        if states.state(state) < states.state(self.state):
+        if state != states.RETRY and self.state != states.RETRY and \
+                states.state(state) < states.state(self.state):
             self.merge(state, timestamp, fields)
         else:
             self.state = state
@@ -105,7 +106,7 @@ class Task(Element):
             super(Task, self).update(fields)
 
     def merge(self, state, timestamp, fields):
-        keep = self._merge_rules.get(state)
+        keep = self.merge_rules.get(state)
         if keep is not None:
             fields = dict((key, fields[key]) for key in keep)
             super(Task, self).update(fields)

+ 16 - 4
celery/log.py

@@ -3,6 +3,7 @@ import logging
 import threading
 import sys
 import traceback
+import types
 
 from multiprocessing import current_process
 from multiprocessing import util as mputil
@@ -31,18 +32,27 @@ class ColorFormatter(logging.Formatter):
         logging.Formatter.__init__(self, msg)
         self.use_color = use_color
 
+    def formatException(self, ei):
+        r = logging.Formatter.formatException(self, ei)
+        if type(r) in [types.StringType]:
+            r = r.decode('utf-8', 'replace') # Convert to unicode
+        return r
+
     def format(self, record):
         levelname = record.levelname
 
         if self.use_color and levelname in COLORS:
-            record.msg = str(colored().names[COLORS[levelname]](record.msg))
+            record.msg = unicode(colored().names[COLORS[levelname]](record.msg))
 
         # Very ugly, but have to make sure processName is supported
         # by foreign logger instances.
         # (processName is always supported by Python 2.7)
         if "processName" not in record.__dict__:
             record.__dict__["processName"] = current_process()._name
-        return logging.Formatter.format(self, record)
+        t = logging.Formatter.format(self, record)
+        if type(t) in [types.UnicodeType]:
+            t = t.encode('utf-8', 'replace')
+        return t
 
 
 class Logging(object):
@@ -251,7 +261,8 @@ class LoggingProxy(object):
         This is equivalent to calling :meth:`write` for each string.
 
         """
-        map(self.write, sequence)
+        for part in sequence:
+            self.write(part)
 
     def flush(self):
         """This object is not buffered so any :meth:`flush` requests
@@ -281,7 +292,8 @@ class SilenceRepeated(object):
 
     def __call__(self, *msgs):
         if self._iterations >= self.max_iterations:
-            map(self.action, msgs)
+            for msg in msgs:
+                self.action(msg)
             self._iterations = 0
         else:
             self._iterations += 1

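The ``formatException``/``format`` changes above decode traceback text as UTF-8 (with ``'replace'``) before mixing it into the log message, then encode the finished string back to bytes. The same defensive round-trip, sketched in Python 3 terms:

```python
def safe_text(value):
    """Coerce bytes to text, replacing undecodable sequences."""
    if isinstance(value, bytes):
        return value.decode("utf-8", "replace")
    return value


exc_text = safe_text("Ol\xe9".encode("utf-8"))   # bytes from a traceback
message = "%s: %s" % ("ERROR", exc_text)         # safe to interpolate now
encoded = message.encode("utf-8", "replace")     # back to bytes for the stream
```

Decoding early and encoding late keeps a non-ASCII task argument or exception message from raising ``UnicodeDecodeError`` inside the logging path itself.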
+ 8 - 2
celery/routes.py

@@ -4,6 +4,12 @@ from celery.utils import instantiate, firstmethod, mpromise
 _first_route = firstmethod("route_for_task")
 
 
+def merge(a, b):
+    """Like ``dict(a, **b)`` except it will keep values from ``a``,
+    if the value in ``b`` is :const:`None`."""
+    return dict(a, **dict((k, v) for k, v in b.iteritems() if v is not None))
+
+
 class MapRoute(object):
     """Makes a router out of a :class:`dict`."""
 
@@ -37,7 +43,7 @@ class Router(object):
             route = self.lookup_route(task, args, kwargs)
             if route:
                 # Also expand "queue" keys in route.
-                return dict(options, **self.expand_destination(route))
+                return merge(options, self.expand_destination(route))
         return options
 
     def expand_destination(self, route):
@@ -60,7 +66,7 @@ class Router(object):
                     raise QueueNotFound(
                         "Queue '%s' is not defined in CELERY_QUEUES" % queue)
             dest.setdefault("routing_key", dest.get("binding_key"))
-            return dict(dest, **route)
+            return merge(dest, route)
 
         return route
 

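The new ``merge`` helper keeps a value from the first mapping when the second carries :const:`None` for the same key, which is exactly the case ``apply_async`` creates by forwarding ``exchange=None`` etc. A Python 3 rendering of the same semantics:

```python
def merge(a, b):
    """Like dict(a, **b), except None values in b do not override a."""
    return dict(a, **{k: v for k, v in b.items() if v is not None})


options = merge({"exchange": "testq", "routing_key": "testq"},
                {"exchange": None, "routing_key": None, "immediate": False})
```

Explicit falsy-but-meaningful values like ``immediate=False`` still win; only :const:`None` is treated as "not specified".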
+ 1 - 0
celery/states.py

@@ -57,6 +57,7 @@ PRECEDENCE = ["SUCCESS",
               "REVOKED",
               "STARTED",
               "RECEIVED",
+              "RETRY",
               "PENDING"]
 
 

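Adding ``RETRY`` to ``PRECEDENCE`` gives retries a defined rank when monitor events arrive out of order; precedence is simply list position, with a lower index winning. A sketch (abridged to the states visible in the hunk above):

```python
PRECEDENCE = ["SUCCESS",
              "REVOKED",
              "STARTED",
              "RECEIVED",
              "RETRY",
              "PENDING"]


def state(name):
    """Rank of a state: a lower index wins when events race."""
    return PRECEDENCE.index(name)
```

For example, a late-arriving ``PENDING`` event ranks below ``RETRY``, so it cannot overwrite a task already marked as retried.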
+ 4 - 5
celery/task/base.py

@@ -46,8 +46,6 @@ def _unpickle_task(name):
     return tasks[name]
 
 
-
-
 class Context(threading.local):
 
     def update(self, d, **kwargs):
@@ -542,11 +540,12 @@ class BaseTask(object):
             kwargs = request.kwargs
 
         delivery_info = request.delivery_info
-        options.setdefault("exchange", delivery_info.get("exchange"))
-        options.setdefault("routing_key", delivery_info.get("routing_key"))
+        if delivery_info:
+            options.setdefault("exchange", delivery_info.get("exchange"))
+            options.setdefault("routing_key", delivery_info.get("routing_key"))
 
         options["retries"] = request.retries + 1
-        options["task_id"] = kwargs.pop("task_id", None)
+        options["task_id"] = request.id
         options["countdown"] = options.get("countdown",
                                         self.default_retry_delay)
         max_exc = exc or self.MaxRetriesExceededError(

+ 2 - 1
celery/tests/test_buckets.py

@@ -232,7 +232,8 @@ class test_TaskBucket(unittest.TestCase):
         ajobs = [cjob(i, TaskA) for i in xrange(10)]
         bjobs = [cjob(i, TaskB) for i in xrange(20)]
         jobs = list(chain(*izip(bjobs, ajobs)))
-        map(b.put, jobs)
+        for job in jobs:
+            b.put(job)
 
         got_ajobs = 0
         for job in (b.get() for i in xrange(20)):

+ 4 - 2
celery/tests/test_datastructures.py

@@ -118,7 +118,8 @@ class test_LimitedSet(unittest.TestCase):
     def test_iter(self):
         s = LimitedSet(maxlen=2)
         items = "foo", "bar"
-        map(s.add, items)
+        for item in items:
+            s.add(item)
         l = list(iter(s))
         for item in items:
             self.assertIn(item, l)
@@ -126,7 +127,8 @@ class test_LimitedSet(unittest.TestCase):
     def test_repr(self):
         s = LimitedSet(maxlen=2)
         items = "foo", "bar"
-        map(s.add, items)
+        for item in items:
+            s.add(item)
         self.assertIn("LimitedSet(", repr(s))
 
 

+ 20 - 0
celery/tests/test_routes.py

@@ -76,6 +76,26 @@ class test_lookup_route(unittest.TestCase):
                 router.route({}, "celery.ping",
                     args=[1, 2], kwargs={}))
 
+    @with_queues()
+    def test_expands_queue_in_options(self):
+        R = routes.prepare(())
+        router = routes.Router(R, app_or_default().conf.CELERY_QUEUES,
+                               create_missing=True)
+        # apply_async forwards all arguments, even exchange=None etc,
+        # so need to make sure it's merged correctly.
+        route = router.route({"queue": "testq",
+                              "exchange": None,
+                              "routing_key": None,
+                              "immediate": False},
+                             "celery.ping",
+                             args=[1, 2], kwargs={})
+        self.assertDictContainsSubset({"exchange": "testq",
+                                       "routing_key": "testq",
+                                       "immediate": False},
+                                       route)
+        self.assertNotIn("queue", route)
+
+
     @with_queues(foo=a_queue, bar=b_queue)
     def test_lookup_paths_traversed(self):
         R = routes.prepare(({"celery.xaza": {"queue": "bar"}},

+ 8 - 9
celery/tests/test_task.py

@@ -4,7 +4,6 @@ from datetime import datetime, timedelta
 
 from pyparsing import ParseException
 
-
 from celery import task
 from celery.app import app_or_default
 from celery.decorators import task as task_dec
@@ -147,9 +146,9 @@ class TestTaskRetries(unittest.TestCase):
         self.assertEqual(result.get(), 42)
         self.assertEqual(RetryTaskNoArgs.iterations, 4)
 
 
-    def test_retry_kwargs_can_not_be_empty(self):
-        self.assertRaises(TypeError, RetryTaskMockApply.retry,
-                            args=[4, 4], kwargs={})
+    def test_retry_kwargs_can_be_empty(self):
+        self.assertRaises(RetryTaskError, RetryTaskMockApply.retry,
+                            args=[4, 4], kwargs=None)
 
 
     def test_retry_not_eager(self):
         exc = Exception("baz")
@@ -322,20 +321,20 @@ class TestCeleryTasks(unittest.TestCase):
                name="Elaine M. Benes")
 
 
         # With eta.
-        presult2 = task.apply_async(t1, kwargs=dict(name="George Costanza"),
-                                    eta=datetime.now() + timedelta(days=1))
+        presult2 = t1.apply_async(kwargs=dict(name="George Costanza"),
+                                  eta=datetime.now() + timedelta(days=1))
         self.assertNextTaskDataEqual(consumer, presult2, t1.name,
                 name="George Costanza", test_eta=True)
 
 
         # With countdown.
-        presult2 = task.apply_async(t1, kwargs=dict(name="George Costanza"),
-                                    countdown=10)
+        presult2 = t1.apply_async(kwargs=dict(name="George Costanza"),
+                                  countdown=10)
         self.assertNextTaskDataEqual(consumer, presult2, t1.name,
                 name="George Costanza", test_eta=True)
 
 
         # Discarding all tasks.
         consumer.discard_all()
-        task.apply_async(t1)
+        t1.apply_async()
         self.assertEqual(consumer.discard_all(), 1)
         self.assertIsNone(consumer.fetch())
 
 

+ 10 - 5
celery/tests/test_worker_state.py

@@ -62,7 +62,8 @@ class test_Persistent(StateResetCase):
         self.assertTrue(self.p.db.closed)
 
 
     def add_revoked(self, *ids):
-        map(self.p.db.setdefault("revoked", LimitedSet()).add, ids)
+        for id in ids:
+            self.p.db.setdefault("revoked", LimitedSet()).add(id)
 
 
     def test_merge(self, data=["foo", "bar", "baz"]):
         self.add_revoked(*data)
@@ -73,7 +74,8 @@ class test_Persistent(StateResetCase):
     def test_sync(self, data1=["foo", "bar", "baz"],
                         data2=["baz", "ini", "koz"]):
         self.add_revoked(*data1)
-        map(state.revoked.add, data2)
+        for item in data2:
+            state.revoked.add(item)
         self.p.sync(self.p.db)
 
 
         for item in data2:
@@ -92,7 +94,8 @@ class test_state(StateResetCase):
                                       SimpleReq("bar"),
                                       SimpleReq("baz"),
                                       SimpleReq("baz")]):
-        map(state.task_accepted, requests)
+        for request in requests:
+            state.task_accepted(request)
         for req in requests:
             self.assertIn(req, state.active_requests)
         self.assertEqual(state.total_count["foo"], 1)
@@ -101,7 +104,9 @@ class test_state(StateResetCase):
 
 
     def test_ready(self, requests=[SimpleReq("foo"),
                                    SimpleReq("bar")]):
-        map(state.task_accepted, requests)
+        for request in requests:
+            state.task_accepted(request)
         self.assertEqual(len(state.active_requests), 2)
-        map(state.task_ready, requests)
+        for request in requests:
+            state.task_ready(request)
         self.assertEqual(len(state.active_requests), 0)

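The `map()` → explicit-loop conversions in these tests are not just stylistic: calling `map()` purely for its side effects relies on eager evaluation, and under Python 3 semantics `map()` returns a lazy iterator, so the side effects never run. A standalone illustration (the `record` helper is invented for the demo):

```python
acc = []

def record(x):
    # The side effect we care about; the return value is discarded.
    acc.append(x)

# Under Python 3, map() is lazy: nothing runs until the iterator
# is consumed, so a side-effect-only call silently does nothing.
map(record, [1, 2, 3])
assert acc == []

# The explicit loop used throughout this commit always executes.
for item in [1, 2, 3]:
    record(item)
assert acc == [1, 2, 3]
```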
+ 9 - 0
celery/utils/compat.py

@@ -298,6 +298,8 @@ import sys
 
 
 from logging import LogRecord
 
+log_takes_extra = "extra" in inspect.getargspec(logging.Logger._log)[0]
+
 # The func argument to LogRecord was added in 2.5
 if "func" not in inspect.getargspec(LogRecord.__init__)[0]:
     def LogRecord(name, level, fn, lno, msg, args, exc_info, func):
@@ -405,6 +407,12 @@ try:
 except ImportError:
     LoggerAdapter = _CompatLoggerAdapter
 
 
+
+def log_with_extra(logger, level, msg, *args, **kwargs):
+    if not log_takes_extra:
+        kwargs.pop("extra", None)
+    return logger.log(level, msg, *args, **kwargs)
+
 ############## itertools.izip_longest #######################################
 
 try:
@@ -427,6 +435,7 @@ except ImportError:
         except IndexError:
             pass
 
 
+
 ############## itertools.chain.from_iterable ################################
 from itertools import chain
 
 

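The `log_with_extra` shim above drops the `extra` keyword on Python versions whose `Logger._log` did not accept it. The same pattern can be reproduced standalone (the probe is re-created here for illustration; `getfullargspec` stands in for the removed `getargspec`, and the capture handler is only for the demo):

```python
import inspect
import logging

# Probe whether Logger._log accepts an "extra" argument, mirroring
# the compat module's check (it was missing before Python 2.5).
log_takes_extra = "extra" in inspect.getfullargspec(logging.Logger._log)[0]

def log_with_extra(logger, level, msg, *args, **kwargs):
    # Silently drop "extra" when the logger cannot accept it.
    if not log_takes_extra:
        kwargs.pop("extra", None)
    return logger.log(level, msg, *args, **kwargs)

records = []

class Capture(logging.Handler):
    def emit(self, record):
        records.append(record)

logger = logging.getLogger("log_with_extra_demo")
logger.addHandler(Capture())
log_with_extra(logger, logging.ERROR, "command failed",
               extra={"data": {"id": "abc"}})
```

On a modern interpreter the `extra` dict is merged into the record's attributes, so the handler sees `record.data`.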
+ 20 - 4
celery/worker/__init__.py

@@ -37,8 +37,8 @@ def process_initializer(app, hostname):
 
 
     """
     app = app_or_default(app)
-    map(platforms.reset_signal, WORKER_SIGRESET)
-    map(platforms.ignore_signal, WORKER_SIGIGNORE)
+    [platforms.reset_signal(signal) for signal in WORKER_SIGRESET]
+    [platforms.ignore_signal(signal) for signal in WORKER_SIGIGNORE]
     platforms.set_mp_process_title("celeryd", hostname=hostname)
 
     # This is for windows and other platforms not supporting
@@ -120,7 +120,8 @@ class WorkController(object):
             task_soft_time_limit=None, max_tasks_per_child=None,
             pool_putlocks=None, db=None, prefetch_multiplier=None,
             eta_scheduler_precision=None, queues=None,
-            disable_rate_limits=None, app=None):
+            disable_rate_limits=None, autoscale=None,
+            autoscaler_cls=None, app=None):
 
 
         self.app = app_or_default(app)
         conf = self.app.conf
@@ -138,6 +139,8 @@ class WorkController(object):
         self.mediator_cls = mediator_cls or conf.CELERYD_MEDIATOR
         self.eta_scheduler_cls = eta_scheduler_cls or \
                                     conf.CELERYD_ETA_SCHEDULER
+        self.autoscaler_cls = autoscaler_cls or \
+                                    conf.CELERYD_AUTOSCALER
         self.schedule_filename = schedule_filename or \
                                     conf.CELERYBEAT_SCHEDULE_FILENAME
         self.hostname = hostname or socket.gethostname()
@@ -178,7 +181,13 @@ class WorkController(object):
         self.logger.debug("Instantiating thread components...")
 
         # Threads + Pool + Consumer
-        self.pool = instantiate(self.pool_cls, self.concurrency,
+        self.autoscaler = None
+        max_concurrency = None
+        min_concurrency = concurrency
+        if autoscale:
+            max_concurrency, min_concurrency = autoscale
+
+        self.pool = instantiate(self.pool_cls, min_concurrency,
                                 logger=self.logger,
                                 initializer=process_initializer,
                                 initargs=(self.app, self.hostname),
@@ -187,6 +196,12 @@ class WorkController(object):
                                 soft_timeout=self.task_soft_time_limit,
                                 putlocks=self.pool_putlocks)
 
 
+        if autoscale:
+            self.autoscaler = instantiate(self.autoscaler_cls, self.pool,
+                                          max_concurrency=max_concurrency,
+                                          min_concurrency=min_concurrency,
+                                          logger=self.logger)
+
         self.mediator = None
         if not disable_rate_limits:
             self.mediator = instantiate(self.mediator_cls, self.ready_queue,
@@ -224,6 +239,7 @@ class WorkController(object):
                                         self.mediator,
                                         self.scheduler,
                                         self.beat,
+                                        self.autoscaler,
                                         self.listener))
 
 
     def start(self):

+ 4 - 2
celery/worker/buckets.py

@@ -138,11 +138,13 @@ class TaskBucket(object):
 
 
     def init_with_registry(self):
         """Initialize with buckets for all the task types in the registry."""
-        map(self.add_bucket_for_type, self.task_registry.keys())
+        for task in self.task_registry.keys():
+            self.add_bucket_for_type(task)
 
 
     def refresh(self):
         """Refresh rate limits for all task types in the registry."""
-        map(self.update_bucket_for_type, self.task_registry.keys())
+        for task in self.task_registry.keys():
+            self.update_bucket_for_type(task)
 
 
     def get_bucket_for_type(self, task_name):
         """Get the bucket for a particular task type."""

+ 3 - 1
celery/worker/control/__init__.py

@@ -1,3 +1,5 @@
+import sys
+
 from celery.app import app_or_default
 from celery.pidbox import ControlReplyPublisher
 from celery.utils import kwdict
@@ -69,7 +71,7 @@ class ControlDispatch(object):
             except Exception, exc:
                 self.logger.error(
                         "Error running control command %s kwargs=%s: %s" % (
-                            command, kwargs, exc))
+                            command, kwargs, exc), exc_info=sys.exc_info())
                 reply = {"error": str(exc)}
             if reply_to:
                 self.reply({self.hostname: reply},

+ 13 - 1
celery/worker/control/builtins.py

@@ -1,3 +1,4 @@
+import sys
 from datetime import datetime
 
 from celery.registry import tasks
@@ -68,7 +69,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
         tasks[task_name].rate_limit = rate_limit
     except KeyError:
         panel.logger.error("Rate limit attempt for unknown task %s" % (
-            task_name, ))
+            task_name, ), exc_info=sys.exc_info())
     return {"error": "unknown task"}
 
     if not hasattr(panel.listener.ready_queue, "refresh"):
@@ -166,6 +167,17 @@ def ping(panel, **kwargs):
     return "pong"
 
 
+@Panel.register
+def pool_grow(panel, n=1, **kwargs):
+    panel.listener.pool.grow(n)
+    return {"ok": "spawned worker processes"}
+
+@Panel.register
+def pool_shrink(panel, n=1, **kwargs):
+    panel.listener.pool.shrink(n)
+    return {"ok": "terminated worker processes"}
+
+
 @Panel.register
 def shutdown(panel, **kwargs):
     panel.logger.critical("Got shutdown from remote.")

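The two new control commands follow the existing `Panel.register` pattern: the decorator records the function in a registry keyed by name, and the dispatcher later looks it up and calls it with the panel. A reduced sketch of that mechanism (the `Fake*` classes are stand-ins for Celery's real panel/listener/pool objects):

```python
class Panel:
    # Registry shared by all panels, keyed by command name.
    handlers = {}

    @classmethod
    def register(cls, fn):
        cls.handlers[fn.__name__] = fn
        return fn

class FakePool:
    def __init__(self):
        self.size = 2
    def grow(self, n=1):
        self.size += n
    def shrink(self, n=1):
        self.size -= n

class FakeListener:
    def __init__(self):
        self.pool = FakePool()

class FakePanel(Panel):
    def __init__(self):
        self.listener = FakeListener()

@Panel.register
def pool_grow(panel, n=1, **kwargs):
    panel.listener.pool.grow(n)
    return {"ok": "spawned worker processes"}

@Panel.register
def pool_shrink(panel, n=1, **kwargs):
    panel.listener.pool.shrink(n)
    return {"ok": "terminated worker processes"}

panel = FakePanel()
reply = Panel.handlers["pool_grow"](panel, n=2)
assert panel.listener.pool.size == 4
```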
+ 75 - 2
celery/worker/controllers.py

@@ -3,12 +3,80 @@
 Worker Controller Threads
 
 """
+import logging
 import sys
 import threading
 import traceback
+
+from time import sleep, time
 from Queue import Empty as QueueEmpty
 
 from celery.app import app_or_default
+from celery import log
+from celery.utils.compat import log_with_extra
+from celery.worker import state
+
+
+class Autoscaler(threading.Thread):
+
+    def __init__(self, pool, max_concurrency, min_concurrency=0,
+            keepalive=30, logger=None):
+        threading.Thread.__init__(self)
+        self.pool = pool
+        self.max_concurrency = max_concurrency
+        self.min_concurrency = min_concurrency
+        self.keepalive = keepalive
+        self.logger = logger or log.get_default_logger()
+        self._last_action = None
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+        self.setDaemon(True)
+        self.setName(self.__class__.__name__)
+
+        assert self.keepalive, "can't scale down too fast."
+
+    def scale(self):
+        current = min(self.qty, self.max_concurrency)
+        if current > self.processes:
+            self.scale_up(current - self.processes)
+        elif current < self.processes:
+            self.scale_down((self.processes - current) - self.min_concurrency)
+        sleep(1.0)
+
+    def scale_up(self, n):
+        self.logger.info("Scaling up %s processes." % (n, ))
+        self._last_action = time()
+        return self.pool.grow(n)
+
+    def scale_down(self, n):
+        if not self._last_action or not n:
+            return
+        if time() - self._last_action > self.keepalive:
+            self.logger.info("Scaling down %s processes." % (n, ))
+            self._last_action = time()
+            try:
+                self.pool.shrink(n)
+            except Exception, exc:
+                self.logger.error("Autoscaler: scale_down: %r" % (exc, ))
+
+    def run(self):
+        while not self._shutdown.isSet():
+            self.scale()
+        self._stopped.set()                 # indicate that we are stopped
+
+    def stop(self):
+        self._shutdown.set()
+        self._stopped.wait()                # block until this thread is done
+        self.join(1e100)
+
+    @property
+    def qty(self):
+        return len(state.reserved_requests)
+
+    @property
+    def processes(self):
+        return self.pool._pool._processes
 
 
 
 
 class Mediator(threading.Thread):
@@ -54,8 +122,13 @@ class Mediator(threading.Thread):
         try:
             self.callback(task)
         except Exception, exc:
-            self.logger.error("Mediator callback raised exception %r\n%s" % (
-                exc, traceback.format_exc()), exc_info=sys.exc_info())
+            log_with_extra(self.logger, logging.ERROR,
+                           "Mediator callback raised exception %r\n%s" % (
+                               exc, traceback.format_exc()),
+                           exc_info=sys.exc_info(),
+                           extra={"data": {"hostname": task.hostname,
+                                           "id": task.task_id,
+                                           "name": task.task_name}})
 
 
     def run(self):
         while not self._shutdown.isSet():

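The `Autoscaler`'s core decision is a comparison between demand (reserved tasks) and the current pool size, capped at `max_concurrency`, with a keepalive window before any shrink. A thread-free sketch of that policy (the `FakePool`, the clamped shrink arithmetic, and the numbers are illustrative simplifications, not the class above verbatim):

```python
from time import time

class FakePool:
    def __init__(self, processes):
        self.processes = processes
    def grow(self, n):
        self.processes += n
    def shrink(self, n):
        self.processes -= n

class ScalePolicy:
    def __init__(self, pool, max_concurrency, min_concurrency=0,
                 keepalive=30):
        self.pool = pool
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        self._last_action = None

    def scale(self, reserved):
        # Demand is the number of reserved tasks, capped at the ceiling.
        target = min(reserved, self.max_concurrency)
        if target > self.pool.processes:
            self._last_action = time()
            self.pool.grow(target - self.pool.processes)
        elif target < self.pool.processes:
            # Only shrink after the keepalive window has passed,
            # and never below the configured floor.
            if self._last_action and \
                    time() - self._last_action > self.keepalive:
                floor = max(target, self.min_concurrency)
                self.pool.shrink(self.pool.processes - floor)

policy = ScalePolicy(FakePool(2), max_concurrency=10)
policy.scale(reserved=6)    # demand exceeds pool size: grow to 6
assert policy.pool.processes == 6
```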
+ 29 - 6
celery/worker/job.py

@@ -1,3 +1,4 @@
+import logging
 import os
 import sys
 import time
@@ -10,11 +11,12 @@ from celery import platforms
 from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
-from celery.exceptions import WorkerLostError
+from celery.exceptions import WorkerLostError, RetryTaskError
 from celery.execute.trace import TaskTrace
 from celery.registry import tasks
 from celery.utils import noop, kwdict, fun_takes_kwargs
 from celery.utils import truncate_text
+from celery.utils.compat import log_with_extra
 from celery.utils.timeutils import maybe_iso8601
 from celery.worker import state
 
 
@@ -121,7 +123,7 @@ class WorkerTaskTrace(TaskTrace):
         message, orig_exc = exc.args
         if self._store_errors:
             self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
-        self.super.handle_retry(exc, type_, tb, strtb)
+        return self.super.handle_retry(exc, type_, tb, strtb)
 
 
     def handle_failure(self, exc, type_, tb, strtb):
         """Handle exception."""
@@ -201,6 +203,9 @@ class TaskRequest(object):
     error_msg = """
         Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
     """
+    retry_msg = """
+        Task %(name)s[%(id)s] retry: %(exc)s
+    """
 
 
     # E-mails
     email_subject = """
@@ -447,6 +452,16 @@ class TaskRequest(object):
         #     "the quick brown fox jumps over the lazy dog" :)
         return truncate_text(repr(result), maxlen)
 
 
+    def on_retry(self, exc_info):
+        self.send_event("task-retried", uuid=self.task_id,
+                                        exception=repr(exc_info.exception.exc),
+                                        traceback=repr(exc_info.traceback))
+        msg = self.retry_msg.strip() % {
+                "id": self.task_id,
+                "name": self.task_name,
+                "exc": repr(exc_info.exception.exc)}
+        self.logger.info(msg)
+
     def on_failure(self, exc_info):
         """The handler used if the task raised an exception."""
         state.task_ready(self)
@@ -454,9 +469,8 @@ class TaskRequest(object):
         if self.task.acks_late:
             self.acknowledge()
 
 
-        self.send_event("task-failed", uuid=self.task_id,
-                                       exception=repr(exc_info.exception),
-                                       traceback=exc_info.traceback)
+        if isinstance(exc_info.exception, RetryTaskError):
+            return self.on_retry(exc_info)
 
 
         # This is a special case as the process would not have had
         # time to write the result.
@@ -465,6 +479,10 @@ class TaskRequest(object):
                 self.task.backend.mark_as_failure(self.task_id,
                                                   exc_info.exception)
 
 
+        self.send_event("task-failed", uuid=self.task_id,
+                                       exception=repr(exc_info.exception),
+                                       traceback=exc_info.traceback)
+
         context = {"hostname": self.hostname,
                    "id": self.task_id,
                    "name": self.task_name,
@@ -472,7 +490,12 @@ class TaskRequest(object):
                    "traceback": unicode(exc_info.traceback, 'utf-8'),
                    "args": self.args,
                    "kwargs": self.kwargs}
-        self.logger.error(self.error_msg.strip() % context, exc_info=exc_info)
+
+        log_with_extra(self.logger, logging.ERROR,
+                       self.error_msg.strip() % context,
+                       exc_info=exc_info,
+                       extra={"data": {"hostname": self.hostname,
+                                       "id": self.task_id,
+                                       "name": self.task_name}})
 
 
         task_obj = tasks.get(self.task_name, object)
         self.send_error_email(task_obj, context, exc_info.exception,

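`on_failure` now branches early: a `RetryTaskError` no longer emits `task-failed` but is routed to the new `on_retry` handler, which emits `task-retried` instead. The dispatch can be sketched like this (the exception class and the event sink are simplified stand-ins, not Celery's real `TaskRequest` API):

```python
class RetryTaskError(Exception):
    # Wraps the original exception that triggered the retry.
    def __init__(self, message, exc):
        super().__init__(message, exc)
        self.exc = exc

events = []

def send_event(kind, **fields):
    events.append((kind, fields))

def on_failure(task_id, exception):
    # Retries are not failures: emit task-retried and stop.
    if isinstance(exception, RetryTaskError):
        send_event("task-retried", uuid=task_id,
                   exception=repr(exception.exc))
        return
    send_event("task-failed", uuid=task_id, exception=repr(exception))

on_failure("id-1", RetryTaskError("retrying", ValueError("boom")))
on_failure("id-2", KeyError("missing"))
assert [kind for kind, _ in events] == ["task-retried", "task-failed"]
```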
+ 7 - 3
celery/worker/listener.py

@@ -76,6 +76,7 @@ up and running.
 from __future__ import generators
 
 import socket
+import sys
 import warnings
 
 from carrot.connection import AMQPConnectionException
@@ -87,6 +88,7 @@ from celery.exceptions import NotRegistered
 from celery.pidbox import BroadcastConsumer
 from celery.utils import noop, retry_over_time
 from celery.utils.timer2 import to_timestamp
+from celery.worker import state
 from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.control import ControlDispatch
 from celery.worker.heartbeat import Heart
@@ -277,16 +279,18 @@ class CarrotListener(object):
             except OverflowError, exc:
                 self.logger.error(
                     "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
-                        task.eta, exc, task.info(safe=True)))
+                        task.eta, exc, task.info(safe=True)), exc_info=sys.exc_info())
                 task.acknowledge()
             else:
                 self.qos.increment()
                 self.eta_schedule.apply_at(eta,
                                            self.apply_eta_task, (task, ))
         else:
+            state.task_reserved(task)
             self.ready_queue.put(task)
 
     def apply_eta_task(self, task):
+        state.task_reserved(task)
         self.ready_queue.put(task)
         self.qos.decrement_eventually()
 
 
@@ -307,11 +311,11 @@ class CarrotListener(object):
                                                 eventer=self.event_dispatcher)
             except NotRegistered, exc:
                 self.logger.error("Unknown task ignored: %s: %s" % (
-                        str(exc), message_data))
+                        str(exc), message_data), exc_info=sys.exc_info())
                 message.ack()
             except InvalidTaskError, exc:
                 self.logger.error("Invalid task ignored: %s: %s" % (
-                        str(exc), message_data))
+                        str(exc), message_data), exc_info=sys.exc_info())
                 message.ack()
             else:
                 self.on_task(task)

+ 6 - 0
celery/worker/state.py

@@ -24,11 +24,16 @@ Count of tasks executed by the worker, sorted by type.
 The list of currently revoked tasks. (PERSISTENT if statedb set).
 
 """
+reserved_requests = set()
 active_requests = set()
 total_count = defaultdict(lambda: 0)
 revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
 
 
 
 
+def task_reserved(request):
+    reserved_requests.add(request)
+
+
 def task_accepted(request):
     """Updates global state when a task has been accepted."""
     active_requests.add(request)
@@ -38,6 +43,7 @@ def task_accepted(request):
 def task_ready(request):
     """Updates global state when a task is ready."""
     active_requests.discard(request)
+    reserved_requests.discard(request)
 
 
 
 
 class Persistent(object):

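The new `reserved_requests` set tracks tasks the worker has taken off the broker but not necessarily started; `task_ready` clears a request from both sets, so the autoscaler can use the reserved count as its demand signal. A small lifecycle sketch re-creating the three functions standalone (here a plain string stands in for a request object, and `total_count` is keyed by it rather than by task name as in the real module):

```python
from collections import defaultdict

reserved_requests = set()
active_requests = set()
total_count = defaultdict(int)

def task_reserved(request):
    reserved_requests.add(request)

def task_accepted(request):
    active_requests.add(request)
    total_count[request] += 1

def task_ready(request):
    # A finished (or failed) task leaves both sets.
    active_requests.discard(request)
    reserved_requests.discard(request)

task_reserved("req1")    # listener puts it on the ready queue
task_accepted("req1")    # a pool worker starts executing it
assert "req1" in reserved_requests and "req1" in active_requests
task_ready("req1")       # execution finished
assert not reserved_requests and not active_requests
```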
+ 1 - 1
contrib/release/sphinx-to-rst.py

@@ -1,4 +1,4 @@
-#!/usr/bin/even/python
+#!/usr/bin/env python
 import os
 import re
 import sys

+ 16 - 1
docs/community.rst

@@ -56,6 +56,22 @@ http://botland.oebfare.com/logger/celery/
 News
 ====
 
 
+Celery, RabbitMQ and sending messages directly.
+-----------------------------------------------
+http://blog.timc3.com/2010/10/17/celery-rabbitmq-and-sending-messages-directly/
+
+Cron dentro do Django com Celery (Portuguese)
+---------------------------------------------
+http://blog.avelino.us/2010/10/cron-dentro-do-django-com-celery.html
+
+RabbitMQとCeleryを使ってDjangoでジョブキューしてみる (Japanese)
+---------------------------------------------------------------
+http://d.hatena.ne.jp/yuku_t/
+
+Celery - Eine asynchrone Task Queue (nicht nur) für Django (German)
+-------------------------------------------------------------------
+http://www.scribd.com/doc/39203296/Celery-Eine-asynchrone-Task-Queue-nicht-nur-fur-Django
+
 Asynchronous Processing Using Celery (historio.us)
 --------------------------------------------------
 http://blog.historio.us/asynchronous-processing-using-celery
@@ -68,7 +84,6 @@ http://www.slideshare.net/shawnrider/massaging-the-pony-message-queues-and-you
 ------------------------------------------------
 http://www.slideshare.net/ericholscher/large-problems
 
 
-
 Django and asynchronous jobs
 ----------------------------
 http://www.davidfischer.name/2010/09/django-and-asynchronous-jobs/

+ 2 - 3
docs/internals/events.rst

@@ -36,10 +36,9 @@ Task Events
     Sent if the task has been revoked (Note that this is likely
     to be sent by more than one worker)
 
 
-* task-retried(uuid, exception, traceback, hostname, delay, timestamp)
+* task-retried(uuid, exception, traceback, hostname, timestamp)
 
 
-    Sent if the task failed, but will be retried in the future.
-    (**NOT IMPLEMENTED**)
+    Sent if the task failed, but will be retried.
 
 
 Worker Events
 =============

+ 1 - 2
docs/userguide/monitoring.rst

@@ -507,10 +507,9 @@ Task Events
     Sent if the task has been revoked (Note that this is likely
     to be sent by more than one worker).
 
 
-* ``task-retried(uuid, exception, traceback, hostname, delay, timestamp)``
+* ``task-retried(uuid, exception, traceback, hostname, timestamp)``
 
 
     Sent if the task failed, but will be retried in the future.
-    (**NOT IMPLEMENTED**)
 
 
 .. _event-reference-worker: