
Merge branch 'master' into kombu2

Conflicts:
	celery/backends/amqp.py
	celery/task/base.py
	celery/tests/test_task.py
	celery/worker/control/__init__.py
Ask Solem, 14 years ago
parent commit c4d723dfcb

+ 1 - 0
AUTHORS

@@ -46,3 +46,4 @@ Ordered by date of first contribution:
   sdcooke
   David Cramer <dcramer@gmail.com>
   Bryan Berg <bryan@mixedmedialabs.com>
+  Piotr Sikora <piotr.sikora@frickle.com>

+ 160 - 1
Changelog

@@ -11,7 +11,166 @@
 =====
 :release-date: TBA
 :status: in-progress
-:branch: app
+:branch: master
+
+
+.. _version-2.1.2:
+
+2.1.2
+=====
+:release-date: TBA
+
+.. _v212-fixes:
+
+Fixes
+-----
+
+* celeryd: Now sends the ``task-retried`` event for retried tasks.
+
+* celeryd: Now honors the ignore result setting for
+  :exc:`~celery.exceptions.WorkerLostError` and timeout errors.
+
+* celerybeat: Fixed :exc:`UnboundLocalError` in celerybeat logging
+  when using logging setup signals.
+
+* celeryd: All log messages now include ``exc_info``.
+
+.. _version-2.1.1:
+
+2.1.1
+=====
+:release-date: 2010-10-14 14:00 CEST
+
+.. _v211-fixes:
+
+Fixes
+-----
+
+* Now working on Windows again.
+
+    Removed dependency on the pwd/grp modules.
+
+* snapshots: Fixed race condition leading to loss of events.
+
+* celeryd: Reject tasks with an eta that cannot be converted to a timestamp.
+
+    See issue #209
+
+* concurrency.processes.pool: The semaphore was released twice for each task
+  (both at ACK and result ready).
+
+    This has been fixed, and it is now released only once per task.
+
+* docs/configuration: Fixed typo ``CELERYD_SOFT_TASK_TIME_LIMIT`` ->
+  :setting:`CELERYD_TASK_SOFT_TIME_LIMIT`.
+
+    See issue #214
+
+* control command ``dump_scheduled``: was using the old ``.info`` attribute.
+
+* :program:`celeryd-multi`: Fixed ``set changed size during iteration`` bug
+  occurring in the restart command.
+
+* celeryd: Accidentally tried to use additional command line arguments.
+
+    This would lead to an error like:
+
+    ``got multiple values for keyword argument 'concurrency'``.
+
+    Additional command line arguments are now ignored, and no longer
+    produce this error.  However -- we do reserve the right to use
+    positional arguments in the future, so please do not depend on this
+    behavior.
+
+* celerybeat: Now respects routers and task execution options again.
+
+* celerybeat: Now reuses the publisher instead of the connection.
+
+* Cache result backend: Using :class:`float` as the expires argument
+  to ``cache.set`` is deprecated by the memcached libraries,
+  so we now automatically cast to :class:`int`.
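+
+    A minimal sketch of the effect (the cache client and values here are
+    illustrative, not the backend's actual code):
+
+    .. code-block:: python
+
+        expires = 30.0                       # float expiry, now deprecated
+        cache.set(key, value, int(expires))  # cast before memcached sees it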
+
+* unittests: No longer emit logging and warnings in test output.
+
+.. _v211-news:
+
+News
+----
+
+* Now depends on carrot version 0.10.7.
+
+* Added :setting:`CELERY_REDIRECT_STDOUTS` and
+  :setting:`CELERY_REDIRECT_STDOUTS_LEVEL` settings.
+
+    :setting:`CELERY_REDIRECT_STDOUTS` is used by :program:`celeryd` and
+    :program:`celerybeat`.  All output to ``stdout`` and ``stderr`` will be
+    redirected to the current logger if enabled.
+
+    :setting:`CELERY_REDIRECT_STDOUTS_LEVEL` decides the loglevel used and is
+    :const:`WARNING` by default.
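+
+    Example configuration (a sketch; the values shown are illustrative):
+
+    .. code-block:: python
+
+        CELERY_REDIRECT_STDOUTS = True
+        CELERY_REDIRECT_STDOUTS_LEVEL = "WARNING"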
+
+* Added :setting:`CELERYBEAT_SCHEDULER` setting.
+
+    This setting is used to define the default for the -S option to
+    :program:`celerybeat`.
+
+    Example:
+
+    .. code-block:: python
+
+        CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
+
+* Added ``Task.expires``: used to set a default expiry time for tasks.
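+
+    For example (a sketch; the task and expiry value are hypothetical):
+
+    .. code-block:: python
+
+        from celery.decorators import task
+
+        @task(expires=60)  # discard the task if not run within 60 seconds
+        def add(x, y):
+            return x + y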
+
+* New remote control commands: ``add_consumer`` and ``cancel_consumer``.
+
+    .. method:: add_consumer(queue, exchange, exchange_type, routing_key,
+                             **options)
+        :module:
+
+        Tells the worker to declare the specified queue, exchange and
+        binding, and to start consuming from the queue.
+
+    .. method:: cancel_consumer(queue_name)
+        :module:
+
+        Tells the worker to stop consuming from the queue (by queue name).
+
+
+    Commands also added to :program:`celeryctl` and
+    :class:`~celery.task.control.inspect`.
+
+
+    Example using celeryctl to start consuming from queue "queue", in 
+    exchange "exchange", of type "direct" using binding key "key"::
+
+        $ celeryctl inspect add_consumer queue exchange direct key
+        $ celeryctl inspect cancel_consumer queue
+
+    See :ref:`monitoring-celeryctl` for more information about the
+    :program:`celeryctl` program.
+
+
+    Another example using :class:`~celery.task.control.inspect`:
+
+    .. code-block:: python
+
+        >>> from celery.task.control import inspect
+        >>> inspect.add_consumer(queue="queue", exchange="exchange",
+        ...                      exchange_type="direct",
+        ...                      routing_key="key",
+        ...                      durable=False,
+        ...                      auto_delete=True)
+
+        >>> inspect.cancel_consumer("queue")
+
+* celerybeat: Now logs the traceback if a message can't be sent.
+
+* celerybeat: Now enables a default socket timeout of 30 seconds.
+
+* README/introduction/homepage: Added link to `Flask-Celery`_.
+
+.. _`Flask-Celery`: http://github.com/ask/flask-celery
 
 .. _version-2.1.0:
 

+ 1 - 4
celery/app/amqp.py

@@ -73,14 +73,11 @@ class Queues(UserDict):
     @classmethod
     def with_defaults(cls, queues, default_exchange, default_exchange_type):
 
-        def _defaults(opts):
+        for opts in queues.values():
             opts.setdefault("exchange", default_exchange),
             opts.setdefault("exchange_type", default_exchange_type)
             opts.setdefault("binding_key", default_exchange)
             opts.setdefault("routing_key", opts.get("binding_key"))
-            return opts
-
-        map(_defaults, queues.values())
         return cls(queues)
 
 

+ 2 - 1
celery/apps/worker.py

@@ -131,7 +131,8 @@ class Worker(object):
     def init_loader(self):
         self.loader = self.app.loader
         self.settings = self.app.conf
-        map(self.loader.import_module, self.include)
+        for module in self.include:
+            self.loader.import_module(module)
 
     def redirect_stdouts_to_logger(self):
         handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,

+ 4 - 0
celery/backends/amqp.py

@@ -109,6 +109,9 @@ class AMQPBackend(BaseDictBackend):
         return result
 
     def get_task_meta(self, task_id, cache=True):
+        if cache and task_id in self._cache:
+            return self._cache[task_id]
+
         return self.poll(task_id)
 
     def wait_for(self, task_id, timeout=None, cache=True):
@@ -133,6 +136,7 @@ class AMQPBackend(BaseDictBackend):
         binding = self._create_binding(task_id)(self.channel)
         result = binding.get()
         if result:
+            binding.delete(if_unused=True, if_empty=True, nowait=True)
             payload = self._cache[task_id] = result.payload
             return payload
         elif task_id in self._cache:

+ 2 - 1
celery/beat.py

@@ -5,6 +5,7 @@ Periodic Task Scheduler
 """
 """
 import time
 import time
 import shelve
 import shelve
+import sys
 import threading
 import threading
 import traceback
 import traceback
 import multiprocessing
 import multiprocessing
@@ -155,7 +156,7 @@ class Scheduler(UserDict):
                 result = self.apply_async(entry, publisher=publisher)
             except Exception, exc:
                 self.logger.error("Message Error: %s\n%s" % (exc,
-                    traceback.format_stack()))
+                    traceback.format_stack()), exc_info=sys.exc_info())
             else:
                 self.logger.debug("%s sent. id->%s" % (entry.task,
                                                        result.task_id))

+ 2 - 2
celery/datastructures.py

@@ -98,8 +98,8 @@ class PositionQueue(UserList):
     @property
     def filled(self):
         """Returns the filled slots as a list."""
-        return filter(lambda v: not isinstance(v, self.UnfilledPosition),
-                      self.data)
+        return [slot for slot in self.data
+                    if not isinstance(slot, self.UnfilledPosition)]
 
 
 class ExceptionInfo(object):

+ 5 - 4
celery/events/state.py

@@ -59,8 +59,8 @@ class Task(Element):
                     "result", "eta", "runtime", "expires",
                     "result", "eta", "runtime", "expires",
                     "exception")
                     "exception")
 
 
-    _merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
-                                      "retries", "eta", "expires")}
+    merge_rules = {states.RECEIVED: ("name", "args", "kwargs",
+                                     "retries", "eta", "expires")}
 
 
     _defaults = dict(uuid=None,
     _defaults = dict(uuid=None,
                      name=None,
                      name=None,
@@ -97,7 +97,8 @@ class Task(Element):
     def update(self, state, timestamp, fields):
         if self.worker:
             self.worker.on_heartbeat(timestamp=timestamp)
-        if states.state(state) < states.state(self.state):
+        if state != states.RETRY and self.state != states.RETRY and \
+                states.state(state) < states.state(self.state):
             self.merge(state, timestamp, fields)
         else:
             self.state = state
@@ -105,7 +106,7 @@ class Task(Element):
             super(Task, self).update(fields)
 
     def merge(self, state, timestamp, fields):
-        keep = self._merge_rules.get(state)
+        keep = self.merge_rules.get(state)
         if keep is not None:
             fields = dict((key, fields[key]) for key in keep)
             super(Task, self).update(fields)

+ 16 - 4
celery/log.py

@@ -3,6 +3,7 @@ import logging
 import threading
 import sys
 import traceback
+import types
 
 from multiprocessing import current_process
 from multiprocessing import util as mputil
@@ -31,18 +32,27 @@ class ColorFormatter(logging.Formatter):
         logging.Formatter.__init__(self, msg)
         self.use_color = use_color
 
+    def formatException(self, ei):
+        r = logging.Formatter.formatException(self, ei)
+        if type(r) in [types.StringType]:
+            r = r.decode('utf-8', 'replace') # Convert to unicode
+        return r
+
     def format(self, record):
         levelname = record.levelname
 
         if self.use_color and levelname in COLORS:
-            record.msg = str(colored().names[COLORS[levelname]](record.msg))
+            record.msg = unicode(colored().names[COLORS[levelname]](record.msg))
 
         # Very ugly, but have to make sure processName is supported
         # by foreign logger instances.
         # (processName is always supported by Python 2.7)
         if "processName" not in record.__dict__:
             record.__dict__["processName"] = current_process()._name
-        return logging.Formatter.format(self, record)
+        t = logging.Formatter.format(self, record)
+        if type(t) in [types.UnicodeType]:
+            t = t.encode('utf-8', 'replace')
+        return t
 
 
 class Logging(object):
@@ -251,7 +261,8 @@ class LoggingProxy(object):
         This is equivalent to calling :meth:`write` for each string.
 
         """
-        map(self.write, sequence)
+        for part in sequence:
+            self.write(part)
 
     def flush(self):
         """This object is not buffered so any :meth:`flush` requests
@@ -281,7 +292,8 @@ class SilenceRepeated(object):
 
     def __call__(self, *msgs):
         if self._iterations >= self.max_iterations:
-            map(self.action, msgs)
+            for msg in msgs:
+                self.action(msg)
             self._iterations = 0
         else:
             self._iterations += 1

+ 8 - 2
celery/routes.py

@@ -4,6 +4,12 @@ from celery.utils import instantiate, firstmethod, mpromise
 _first_route = firstmethod("route_for_task")
 
 
+def merge(a, b):
+    """Like ``dict(a, **b)`` except it will keep values from ``a``,
+    if the value in ``b`` is :const:`None`."""
+    return dict(a, **dict((k, v) for k, v in b.iteritems() if v is not None))
+
+
 class MapRoute(object):
     """Makes a router out of a :class:`dict`."""
 
@@ -37,7 +43,7 @@ class Router(object):
             route = self.lookup_route(task, args, kwargs)
             if route:
                 # Also expand "queue" keys in route.
-                return dict(options, **self.expand_destination(route))
+                return merge(options, self.expand_destination(route))
         return options
 
     def expand_destination(self, route):
@@ -60,7 +66,7 @@ class Router(object):
                     raise QueueNotFound(
                         "Queue '%s' is not defined in CELERY_QUEUES" % queue)
             dest.setdefault("routing_key", dest.get("binding_key"))
-            return dict(dest, **route)
+            return merge(dest, route)
 
         return route
 

+ 1 - 0
celery/states.py

@@ -57,6 +57,7 @@ PRECEDENCE = ["SUCCESS",
               "REVOKED",
               "REVOKED",
               "STARTED",
               "STARTED",
               "RECEIVED",
               "RECEIVED",
+              "RETRY",
               "PENDING"]
               "PENDING"]
 
 
 
 

+ 5 - 6
celery/task/base.py

@@ -47,8 +47,6 @@ def _unpickle_task(name):
     return tasks[name]
 
 
-
-
 class Context(threading.local):
 
     def update(self, d, **kwargs):
@@ -549,12 +547,13 @@ class BaseTask(object):
         if kwargs is None:
             kwargs = request.kwargs
 
-        delivery_info = request.delivery_info or {}
-        options.setdefault("exchange", delivery_info.get("exchange"))
-        options.setdefault("routing_key", delivery_info.get("routing_key"))
+        delivery_info = request.delivery_info
+        if delivery_info:
+            options.setdefault("exchange", delivery_info.get("exchange"))
+            options.setdefault("routing_key", delivery_info.get("routing_key"))
 
         options["retries"] = request.retries + 1
-        options["task_id"] = kwargs.pop("task_id", None)
+        options["task_id"] = request.id
         options["countdown"] = options.get("countdown",
                                         self.default_retry_delay)
         max_exc = exc or self.MaxRetriesExceededError(

+ 2 - 1
celery/tests/test_buckets.py

@@ -232,7 +232,8 @@ class test_TaskBucket(unittest.TestCase):
         ajobs = [cjob(i, TaskA) for i in xrange(10)]
         bjobs = [cjob(i, TaskB) for i in xrange(20)]
         jobs = list(chain(*izip(bjobs, ajobs)))
-        map(b.put, jobs)
+        for job in jobs:
+            b.put(job)
 
         got_ajobs = 0
         for job in (b.get() for i in xrange(20)):

+ 4 - 2
celery/tests/test_datastructures.py

@@ -118,7 +118,8 @@ class test_LimitedSet(unittest.TestCase):
     def test_iter(self):
         s = LimitedSet(maxlen=2)
         items = "foo", "bar"
-        map(s.add, items)
+        for item in items:
+            s.add(item)
         l = list(iter(s))
         for item in items:
             self.assertIn(item, l)
@@ -126,7 +127,8 @@ class test_LimitedSet(unittest.TestCase):
     def test_repr(self):
         s = LimitedSet(maxlen=2)
         items = "foo", "bar"
-        map(s.add, items)
+        for item in items:
+            s.add(item)
         self.assertIn("LimitedSet(", repr(s))
 
 

+ 20 - 0
celery/tests/test_routes.py

@@ -76,6 +76,26 @@ class test_lookup_route(unittest.TestCase):
                 router.route({}, "celery.ping",
                     args=[1, 2], kwargs={}))
 
+    @with_queues()
+    def test_expands_queue_in_options(self):
+        R = routes.prepare(())
+        router = routes.Router(R, app_or_default().conf.CELERY_QUEUES,
+                               create_missing=True)
+        # apply_async forwards all arguments, even exchange=None etc,
+        # so need to make sure it's merged correctly.
+        route = router.route({"queue": "testq",
+                              "exchange": None,
+                              "routing_key": None,
+                              "immediate": False},
+                             "celery.ping",
+                             args=[1, 2], kwargs={})
+        self.assertDictContainsSubset({"exchange": "testq",
+                                       "routing_key": "testq",
+                                       "immediate": False},
+                                       route)
+        self.assertNotIn("queue", route)
+
+
     @with_queues(foo=a_queue, bar=b_queue)
     def test_lookup_paths_traversed(self):
         R = routes.prepare(({"celery.xaza": {"queue": "bar"}},

+ 4 - 1
celery/tests/test_task.py

@@ -4,7 +4,6 @@ from datetime import datetime, timedelta
 
 from pyparsing import ParseException
 
-
 from celery import task
 from celery.app import app_or_default
 from celery.decorators import task as task_dec
@@ -147,6 +146,10 @@ class TestTaskRetries(unittest.TestCase):
         self.assertEqual(result.get(), 42)
         self.assertEqual(RetryTaskNoArgs.iterations, 4)
 
+    def test_retry_kwargs_can_be_empty(self):
+        self.assertRaises(RetryTaskError, RetryTaskMockApply.retry,
+                            args=[4, 4], kwargs=None)
+
     def test_retry_not_eager(self):
         exc = Exception("baz")
         try:

+ 10 - 5
celery/tests/test_worker_state.py

@@ -62,7 +62,8 @@ class test_Persistent(StateResetCase):
         self.assertTrue(self.p.db.closed)
 
     def add_revoked(self, *ids):
-        map(self.p.db.setdefault("revoked", LimitedSet()).add, ids)
+        for id in ids:
+            self.p.db.setdefault("revoked", LimitedSet()).add(id)
 
     def test_merge(self, data=["foo", "bar", "baz"]):
         self.add_revoked(*data)
@@ -73,7 +74,8 @@ class test_Persistent(StateResetCase):
     def test_sync(self, data1=["foo", "bar", "baz"],
                         data2=["baz", "ini", "koz"]):
         self.add_revoked(*data1)
-        map(state.revoked.add, data2)
+        for item in data2:
+            state.revoked.add(item)
         self.p.sync(self.p.db)
 
         for item in data2:
@@ -92,7 +94,8 @@ class test_state(StateResetCase):
                                       SimpleReq("bar"),
                                       SimpleReq("bar"),
                                       SimpleReq("baz"),
                                       SimpleReq("baz"),
                                       SimpleReq("baz")]):
                                       SimpleReq("baz")]):
-        map(state.task_accepted, requests)
+        for request in requests:
+            state.task_accepted(request)
         for req in requests:
         for req in requests:
             self.assertIn(req, state.active_requests)
             self.assertIn(req, state.active_requests)
         self.assertEqual(state.total_count["foo"], 1)
         self.assertEqual(state.total_count["foo"], 1)
@@ -101,7 +104,9 @@ class test_state(StateResetCase):
 
     def test_ready(self, requests=[SimpleReq("foo"),
                                    SimpleReq("bar")]):
-        map(state.task_accepted, requests)
+        for request in requests:
+            state.task_accepted(request)
         self.assertEqual(len(state.active_requests), 2)
-        map(state.task_ready, requests)
+        for request in requests:
+            state.task_ready(request)
         self.assertEqual(len(state.active_requests), 0)

+ 9 - 0
celery/utils/compat.py

@@ -298,6 +298,8 @@ import sys
 
 from logging import LogRecord
 
+log_takes_extra = "extra" in inspect.getargspec(logging.Logger._log)[0]
+
 # The func argument to LogRecord was added in 2.5
 if "func" not in inspect.getargspec(LogRecord.__init__)[0]:
     def LogRecord(name, level, fn, lno, msg, args, exc_info, func):
@@ -405,6 +407,12 @@ try:
 except ImportError:
     LoggerAdapter = _CompatLoggerAdapter
 
+
+def log_with_extra(logger, level, msg, *args, **kwargs):
+    if not log_takes_extra:
+        kwargs.pop("extra", None)
+    return logger.log(level, msg, *args, **kwargs)
+
 ############## itertools.izip_longest #######################################
 
 try:
@@ -427,6 +435,7 @@ except ImportError:
         except IndexError:
             pass
 
+
 ############## itertools.chain.from_iterable ################################
 from itertools import chain
 

+ 2 - 2
celery/worker/__init__.py

@@ -37,8 +37,8 @@ def process_initializer(app, hostname):
 
     """
    app = app_or_default(app)
-    map(platforms.reset_signal, WORKER_SIGRESET)
-    map(platforms.ignore_signal, WORKER_SIGIGNORE)
+    [platforms.reset_signal(signal) for signal in WORKER_SIGRESET]
+    [platforms.ignore_signal(signal) for signal in WORKER_SIGIGNORE]
     platforms.set_mp_process_title("celeryd", hostname=hostname)
 
     # This is for windows and other platforms not supporting

+ 4 - 2
celery/worker/buckets.py

@@ -138,11 +138,13 @@ class TaskBucket(object):
 
     def init_with_registry(self):
         """Initialize with buckets for all the task types in the registry."""
-        map(self.add_bucket_for_type, self.task_registry.keys())
+        for task in self.task_registry.keys():
+            self.add_bucket_for_type(task)
 
     def refresh(self):
         """Refresh rate limits for all task types in the registry."""
-        map(self.update_bucket_for_type, self.task_registry.keys())
+        for task in self.task_registry.keys():
+            self.update_bucket_for_type(task)
 
     def get_bucket_for_type(self, task_name):
         """Get the bucket for a particular task type."""

+ 4 - 3
celery/worker/consumer.py

@@ -71,6 +71,7 @@ up and running.
 from __future__ import generators
 
 import socket
+import sys
 import warnings
 
 from celery.app import app_or_default
@@ -277,7 +278,7 @@ class Consumer(object):
             except OverflowError, exc:
                 self.logger.error(
                     "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
-                        task.eta, exc, task.info(safe=True)))
+                        task.eta, exc, task.info(safe=True)), exc_info=sys.exc_info())
                 task.acknowledge()
             else:
                 self.qos.increment()
@@ -309,11 +310,11 @@ class Consumer(object):
                                                 eventer=self.event_dispatcher)
             except NotRegistered, exc:
                 self.logger.error("Unknown task ignored: %s: %s" % (
-                        str(exc), message_data))
+                        str(exc), message_data), exc_info=sys.exc_info())
                 message.ack()
             except InvalidTaskError, exc:
                 self.logger.error("Invalid task ignored: %s: %s" % (
-                        str(exc), message_data))
+                        str(exc), message_data), exc_info=sys.exc_info())
                 message.ack()
             else:
                 self.on_task(task)

+ 0 - 3
celery/worker/control/__init__.py

@@ -1,6 +1,3 @@
-import socket
-
-from celery.app import app_or_default
 from celery.worker.control.registry import Panel
 
 # Loads the built-in remote control commands

+ 2 - 1
celery/worker/control/builtins.py

@@ -1,3 +1,4 @@
+import sys
 from datetime import datetime
 
 from celery.registry import tasks
@@ -68,7 +69,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
         tasks[task_name].rate_limit = rate_limit
     except KeyError:
         panel.logger.error("Rate limit attempt for unknown task %s" % (
-            task_name, ))
+            task_name, ), exc_info=sys.exc_info())
         return {"error": "unknown task"}
 
     if not hasattr(panel.consumer.ready_queue, "refresh"):

+ 9 - 2
celery/worker/controllers.py

@@ -3,12 +3,14 @@
 Worker Controller Threads
 
 """
+import logging
 import sys
 import threading
 import traceback
 from Queue import Empty as QueueEmpty
 
 from celery.app import app_or_default
+from celery.utils.compat import log_with_extra
 
 
 class Mediator(threading.Thread):
@@ -54,8 +56,13 @@ class Mediator(threading.Thread):
         try:
             self.callback(task)
         except Exception, exc:
-            self.logger.error("Mediator callback raised exception %r\n%s" % (
-                exc, traceback.format_exc()), exc_info=sys.exc_info())
+            log_with_extra(self.logger, logging.ERROR,
+                           "Mediator callback raised exception %r\n%s" % (
+                               exc, traceback.format_exc()),
+                           exc_info=sys.exc_info(),
+                           extra={"data": {"hostname": task.hostname,
+                                           "id": task.task_id,
+                                           "name": task.task_name}})
 
     def run(self):
         while not self._shutdown.isSet():

+ 29 - 6
celery/worker/job.py

@@ -1,3 +1,4 @@
+import logging
 import os
 import sys
 import time
@@ -10,11 +11,12 @@ from celery import platforms
 from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
-from celery.exceptions import WorkerLostError
+from celery.exceptions import WorkerLostError, RetryTaskError
 from celery.execute.trace import TaskTrace
 from celery.registry import tasks
 from celery.utils import noop, kwdict, fun_takes_kwargs
 from celery.utils import truncate_text
+from celery.utils.compat import log_with_extra
 from celery.utils.timeutils import maybe_iso8601
 from celery.worker import state
 
@@ -121,7 +123,7 @@ class WorkerTaskTrace(TaskTrace):
         message, orig_exc = exc.args
         if self._store_errors:
             self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
-        self.super.handle_retry(exc, type_, tb, strtb)
+        return self.super.handle_retry(exc, type_, tb, strtb)
 
     def handle_failure(self, exc, type_, tb, strtb):
         """Handle exception."""
@@ -201,6 +203,9 @@ class TaskRequest(object):
     error_msg = """
     error_msg = """
         Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
         Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
     """
     """
+    retry_msg = """
+        Task %(name)s[%(id)s] retry: %(exc)s
+    """
 
 
     # E-mails
     # E-mails
     email_subject = """
     email_subject = """
@@ -447,6 +452,16 @@ class TaskRequest(object):
         #     "the quick brown fox jumps over the lazy dog" :)
         #     "the quick brown fox jumps over the lazy dog" :)
         return truncate_text(repr(result), maxlen)
         return truncate_text(repr(result), maxlen)
 
 
+    def on_retry(self, exc_info):
+        self.send_event("task-retried", uuid=self.task_id,
+                                        exception=repr(exc_info.exception.exc),
+                                        traceback=repr(exc_info.traceback))
+        msg = self.retry_msg.strip() % {
+                "id": self.task_id,
+                "name": self.task_name,
+                "exc": repr(exc_info.exception.exc)}
+        self.logger.info(msg)
+
     def on_failure(self, exc_info):
     def on_failure(self, exc_info):
         """The handler used if the task raised an exception."""
         """The handler used if the task raised an exception."""
         state.task_ready(self)
         state.task_ready(self)
@@ -454,9 +469,8 @@ class TaskRequest(object):
         if self.task.acks_late:
             self.acknowledge()
 
-        self.send_event("task-failed", uuid=self.task_id,
-                                       exception=repr(exc_info.exception),
-                                       traceback=exc_info.traceback)
+        if isinstance(exc_info.exception, RetryTaskError):
+            return self.on_retry(exc_info)
 
         # This is a special case as the process would not have had
         # time to write the result.
@@ -465,6 +479,10 @@ class TaskRequest(object):
                 self.task.backend.mark_as_failure(self.task_id,
                                                   exc_info.exception)
 
+        self.send_event("task-failed", uuid=self.task_id,
+                                       exception=repr(exc_info.exception),
+                                       traceback=exc_info.traceback)
+
         context = {"hostname": self.hostname,
                    "id": self.task_id,
                    "name": self.task_name,
@@ -472,7 +490,12 @@ class TaskRequest(object):
                    "traceback": unicode(exc_info.traceback, 'utf-8'),
                    "traceback": unicode(exc_info.traceback, 'utf-8'),
                    "args": self.args,
                    "args": self.args,
                    "kwargs": self.kwargs}
                    "kwargs": self.kwargs}
-        self.logger.error(self.error_msg.strip() % context, exc_info=exc_info)
+
+        log_with_extra(self.logger, logging.ERROR,
+                       self.error_msg.strip() % context,
+                       extra={"data": {"hostname": self.hostname,
+                                       "id": self.task_id,
+                                       "name": self.task_name}})
 
 
         task_obj = tasks.get(self.task_name, object)
         task_obj = tasks.get(self.task_name, object)
         self.send_error_email(task_obj, context, exc_info.exception,
         self.send_error_email(task_obj, context, exc_info.exception,

+ 1 - 1
contrib/release/sphinx-to-rst.py

@@ -1,4 +1,4 @@
-#!/usr/bin/even/python
+#!/usr/bin/env python
 import os
 import re
 import sys

+ 12 - 0
docs/community.rst

@@ -60,6 +60,18 @@ Celery, RabbitMQ and sending messages directly.
 -----------------------------------------------
 http://blog.timc3.com/2010/10/17/celery-rabbitmq-and-sending-messages-directly/
 
+Cron dentro do Django com Celery (Portuguese)
+----------------------------------------------
+http://blog.avelino.us/2010/10/cron-dentro-do-django-com-celery.html
+
+RabbitMQとCeleryを使ってDjangoでジョブキューしてみる (Japanese)
+---------------------------------------------------------------
+http://d.hatena.ne.jp/yuku_t/
+
+Celery - Eine asynchrone Task Queue (nicht nur) für Django (German)
+-------------------------------------------------------------------
+http://www.scribd.com/doc/39203296/Celery-Eine-asynchrone-Task-Queue-nicht-nur-fur-Django
+
 Asynchronous Processing Using Celery (historio.us)
 --------------------------------------------------
 http://blog.historio.us/asynchronous-processing-using-celery

+ 2 - 3
docs/internals/events.rst

@@ -36,10 +36,9 @@ Task Events
     Sent if the task has been revoked (Note that this is likely
     to be sent by more than one worker)
 
-* task-retried(uuid, exception, traceback, hostname, delay, timestamp)
+* task-retried(uuid, exception, traceback, hostname, timestamp)
 
-    Sent if the task failed, but will be retried in the future.
-    (**NOT IMPLEMENTED**)
+    Sent if the task failed, but will be retried.
 
 Worker Events
 =============

+ 1 - 2
docs/userguide/monitoring.rst

@@ -507,10 +507,9 @@ Task Events
     Sent if the task has been revoked (Note that this is likely
     to be sent by more than one worker).
 
-* ``task-retried(uuid, exception, traceback, hostname, delay, timestamp)``
+* ``task-retried(uuid, exception, traceback, hostname, timestamp)``
 
     Sent if the task failed, but will be retried in the future.
-    (**NOT IMPLEMENTED**)
 
 .. _event-reference-worker: