
Merge branch '3.0'

Conflicts:
	celery/app/builtins.py
	celery/app/control.py
	celery/app/log.py
	celery/app/task.py
	celery/backends/amqp.py
	celery/backends/base.py
	celery/events/__init__.py
	celery/task/trace.py
	celery/utils/__init__.py
	celery/worker/__init__.py
	celery/worker/consumer.py
	celery/worker/control.py
Ask Solem committed 12 years ago
commit f564efe4ce

+ 1 - 0
CONTRIBUTORS.txt

@@ -126,3 +126,4 @@ Thomas Grainger, 2012/11/29
 Marius Gedminas, 2012/11/29
 Christoph Krybus, 2013/01/07
 Jun Sakai, 2013/01/16
+Vlad Frolov, 2013/01/23

+ 28 - 12
celery/app/amqp.py

@@ -164,11 +164,15 @@ class TaskProducer(Producer):
     retry = False
     retry_policy = None
     utc = True
+    event_dispatcher = None
+    send_sent_event = False
 
     def __init__(self, channel=None, exchange=None, *args, **kwargs):
         self.retry = kwargs.pop('retry', self.retry)
         self.retry_policy = kwargs.pop('retry_policy',
                                        self.retry_policy or {})
+        self.send_sent_event = kwargs.pop('send_sent_event',
+                                          self.send_sent_event)
         exchange = exchange or self.exchange
         self.queues = self.app.amqp.queues  # shortcut
         self.default_queue = self.app.amqp.default_queue
@@ -247,25 +251,36 @@ class TaskProducer(Producer):
         )
 
         signals.task_sent.send(sender=task_name, **body)
-        if event_dispatcher:
+        if self.send_sent_event:
+            evd = event_dispatcher or self.event_dispatcher
             exname = exchange or self.exchange
             if isinstance(exname, Exchange):
                 exname = exname.name
-            event_dispatcher.send(
-                'task-sent', uuid=task_id,
-                name=task_name,
-                args=safe_repr(task_args),
-                kwargs=safe_repr(task_kwargs),
-                retries=retries,
-                eta=eta,
-                expires=expires,
-                queue=qname,
-                exchange=exname,
-                routing_key=routing_key,
+            evd.publish(
+                'task-sent',
+                {
+                    'uuid': task_id,
+                    'name': task_name,
+                    'args': safe_repr(task_args),
+                    'kwargs': safe_repr(task_kwargs),
+                    'retries': retries,
+                    'eta': eta,
+                    'expires': expires,
+                    'queue': qname,
+                    'exchange': exname,
+                    'routing_key': routing_key,
+                },
+                self, retry=retry, retry_policy=retry_policy,
             )
         return task_id
     delay_task = publish_task   # XXX Compat
 
+    @cached_property
+    def event_dispatcher(self):
+        # We call Dispatcher.publish with a custom producer
+        # so don't need the dispatcher to be "enabled".
+        return self.app.events.Dispatcher(enabled=False)
+
 
 class TaskPublisher(TaskProducer):
     """Deprecated version of :class:`TaskProducer`."""
@@ -359,6 +374,7 @@ class AMQP(object):
             compression=conf.CELERY_MESSAGE_COMPRESSION,
             retry=conf.CELERY_TASK_PUBLISH_RETRY,
             retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
+            send_sent_event=conf.CELERY_SEND_TASK_SENT_EVENT,
             utc=conf.CELERY_ENABLE_UTC,
         )
     TaskPublisher = TaskProducer  # compat
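
The producer reads its send_sent_event default from the setting wired up
above; a minimal sketch of enabling it (assuming a configured app instance
named `app`):

    from celery import Celery

    app = Celery('proj', broker='amqp://')
    # Picked up by AMQP.TaskProducer via CELERY_SEND_TASK_SENT_EVENT, so
    # every publish_task() call also emits a 'task-sent' monitoring event.
    app.conf.CELERY_SEND_TASK_SENT_EVENT = True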

+ 20 - 8
celery/app/builtins.py

@@ -66,18 +66,30 @@ def add_unlock_chord_task(app):
 
     """
     from celery.canvas import subtask
-    from celery import result as _res
+    from celery.exceptions import ChordError
 
     @app.task(name='celery.chord_unlock', max_retries=None,
               default_retry_delay=1, ignore_result=True, _force_evaluate=True)
-    def unlock_chord(group_id, callback, interval=None, propagate=False,
-                     max_retries=None, result=None, Result=_res.AsyncResult):
+    def unlock_chord(group_id, callback, interval=None, propagate=True,
+                     max_retries=None, result=None, Result=app.AsyncResult,
+                     GroupResult=app.GroupResult):
         if interval is None:
             interval = unlock_chord.default_retry_delay
-        result = _res.GroupResult(group_id, [Result(r) for r in result])
-        j = result.join_native if result.supports_native_join else result.join
-        if result.ready():
-            subtask(callback).delay(j(propagate=propagate))
+        deps = GroupResult(group_id, [Result(r) for r in result])
+        j = deps.join_native if deps.supports_native_join else deps.join
+        if deps.ready():
+            callback = subtask(callback)
+            try:
+                ret = j(propagate=propagate)
+            except Exception as exc:
+                culprit = next(deps._failed_join_report())
+
+                app._tasks[callback.task].backend.fail_from_current_stack(
+                    callback.id, exc=ChordError('Dependency %s raised %r' % (
+                        culprit.id, exc)),
+                )
+            else:
+                callback.delay(ret)
         else:
             return unlock_chord.retry(countdown=interval,
                                       max_retries=max_retries)
@@ -281,7 +293,7 @@ def add_chord_task(app):
         ignore_result = False
 
         def run(self, header, body, partial_args=(), interval=1, countdown=1,
-                max_retries=None, propagate=False, eager=False, **kwargs):
+                max_retries=None, propagate=True, eager=False, **kwargs):
             group_id = uuid()
             AsyncResult = self.app.AsyncResult
             prepare_member = self._prepare_member

+ 2 - 2
celery/app/defaults.py

@@ -161,7 +161,7 @@ NAMESPACES = {
         'CONCURRENCY': Option(0, type='int'),
         'TIMER': Option(type='string'),
         'TIMER_PRECISION': Option(1.0, type='float'),
-        'FORCE_EXECV': Option(True, type='bool'),
+        'FORCE_EXECV': Option(False, type='bool'),
         'HIJACK_ROOT_LOGGER': Option(True, type='bool'),
         'CONSUMER': Option('celery.worker.consumer:Consumer', type='string'),
         'LOG_FORMAT': Option(DEFAULT_PROCESS_LOG_FMT),
@@ -232,7 +232,7 @@ def find_deprecated_settings(source):
             warn_deprecated(description='The {0!r} setting'.format(name),
                             deprecation=opt.deprecate_by,
                             removal=opt.remove_by,
-                            alternative=opt.alt)
+                            alternative='Use %s instead' % (opt.alt, ))
     return source
 
 

+ 29 - 11
celery/app/log.py

@@ -34,6 +34,8 @@ from celery.utils.term import colored
 
 PY3 = sys.version_info[0] == 3
 
+MP_LOG = os.environ.get('MP_LOG', False)
+
 
 class TaskFormatter(ColorFormatter):
 
@@ -99,23 +101,31 @@ class Logging(object):
             sender=None, loglevel=loglevel, logfile=logfile,
             format=format, colorize=colorize,
         )
+
         if not receivers:
             root = logging.getLogger()
 
             if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
                 root.handlers = []
 
-            for logger in root, get_multiprocessing_logger():
-                if logger is not None:
-                    self.setup_handlers(logger, logfile, format,
-                                        colorize, **kwargs)
-                    if loglevel:
-                        logger.setLevel(loglevel)
-                    signals.after_setup_logger.send(
-                        sender=None, logger=logger,
-                        loglevel=loglevel, logfile=logfile,
-                        format=format, colorize=colorize,
-                    )
+            # Configure root logger
+            self._configure_logger(
+                root, logfile, loglevel, format, colorize, **kwargs
+            )
+
+            # Configure the multiprocessing logger
+            self._configure_logger(
+                get_multiprocessing_logger(),
+                logfile, loglevel if MP_LOG else logging.ERROR,
+                format, colorize, **kwargs
+            )
+
+            signals.after_setup_logger.send(
+                sender=None, logger=root,
+                loglevel=loglevel, logfile=logfile,
+                format=format, colorize=colorize,
+            )
+
             # then setup the root task logger.
             self.setup_task_loggers(loglevel, logfile, colorize=colorize)
 
@@ -127,6 +137,14 @@ class Logging(object):
                           _MP_FORK_LOGFORMAT_=format)
         return receivers
 
+    def _configure_logger(self, logger, logfile, loglevel,
+                          format, colorize, **kwargs):
+        if logger is not None:
+            self.setup_handlers(logger, logfile, format,
+                                colorize, **kwargs)
+            if loglevel:
+                logger.setLevel(loglevel)
+
     def setup_task_loggers(self, loglevel=None, logfile=None, format=None,
                            colorize=None, propagate=False, **kwargs):
         """Setup the task logger.

+ 0 - 6
celery/app/task.py

@@ -473,15 +473,9 @@ class Task(object):
         if connection:
             producer = app.amqp.TaskProducer(connection)
         with app.producer_or_acquire(producer) as P:
-            evd = None
-            if conf.CELERY_SEND_TASK_SENT_EVENT:
-                evd = app.events.Dispatcher(channel=P.channel,
-                                            buffer_while_offline=False)
-
             extra_properties = self.backend.on_task_call(P, task_id)
             task_id = P.publish_task(self.name, args, kwargs,
                                      task_id=task_id,
-                                     event_dispatcher=evd,
                                      callbacks=maybe_list(link),
                                      errbacks=maybe_list(link_error),
                                      **dict(options, **extra_properties))

+ 31 - 5
celery/backends/base.py

@@ -18,13 +18,14 @@ import sys
 
 from datetime import timedelta
 
+from billiard.einfo import ExceptionInfo
 from kombu import serialization
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
 
 from celery import states
 from celery.app import current_task
 from celery.datastructures import LRUCache
-from celery.exceptions import TimeoutError, TaskRevokedError
+from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
 from celery.five import items
 from celery.result import from_serializable, GroupResult
 from celery.utils import timeutils
@@ -84,6 +85,16 @@ class BaseBackend(object):
         return self.store_result(task_id, exc, status=states.FAILURE,
                                  traceback=traceback)
 
+    def fail_from_current_stack(self, task_id, exc=None):
+        type_, real_exc, tb = sys.exc_info()
+        try:
+            exc = real_exc if exc is None else exc
+            ei = ExceptionInfo((type_, exc, tb))
+            self.mark_as_failure(task_id, exc, ei.traceback)
+            return ei
+        finally:
+            del(tb)
+
     def mark_as_retry(self, task_id, exc, traceback=None):
         """Mark task as being retried. Stores the current
         exception (if any)."""
@@ -167,6 +178,9 @@ class BaseBackend(object):
         else:
             return self.prepare_value(result)
 
+    def is_cached(self, task_id):
+        return task_id in self._cache
+
     def store_result(self, task_id, result, status, traceback=None, **kwargs):
         """Update task state and result."""
         result = self.encode_result(result, status)
@@ -409,7 +423,7 @@ class KeyValueStoreBackend(BaseBackend):
         else:
             self.fallback_chord_unlock(group_id, body, result, **kwargs)
 
-    def on_chord_part_return(self, task, propagate=False):
+    def on_chord_part_return(self, task, propagate=True):
         if not self.implements_incr:
             return
         from celery import subtask
@@ -421,9 +435,21 @@ class KeyValueStoreBackend(BaseBackend):
         deps = GroupResult.restore(gid, backend=task.backend)
         val = self.incr(key)
         if val >= len(deps):
-            subtask(task.request.chord).delay(deps.join(propagate=propagate))
-            deps.delete()
-            self.client.delete(key)
+            j = deps.join_native if deps.supports_native_join else deps.join
+            callback = subtask(task.request.chord)
+            try:
+                ret = j(propagate=propagate)
+            except Exception as exc:
+                culprit = next(deps._failed_join_report())
+                self.app._tasks[callback.task].backend.fail_from_current_stack(
+                    callback.id, exc=ChordError('Dependency %s raised %r' % (
+                        culprit.id, exc))
+                )
+            else:
+                callback.delay(ret)
+            finally:
+                deps.delete()
+                self.client.delete(key)
         else:
             self.expire(key, 86400)
 

+ 7 - 0
celery/bin/base.py

@@ -6,6 +6,9 @@
 Preload Options
 ---------------
 
+These options are supported by all commands,
+and usually parsed before command-specific arguments.
+
 .. cmdoption:: -A, --app
 
     app instance to use (e.g. module.attr_name)
@@ -27,6 +30,10 @@ Preload Options
 Daemon Options
 --------------
 
+These options are supported by commands that can detach
+into the background (daemon).  They will be present
+in any command that also has a `--detach` option.
+
 .. cmdoption:: -f, --logfile
 
     Path to log file. If no logfile is specified, `stderr` is used.

+ 41 - 4
celery/canvas.py

@@ -31,17 +31,53 @@ Chord = Proxy(lambda: current_app.tasks['celery.chord'])
 
 
 class _getitem_property(object):
+    """Attribute -> dict key descriptor.
 
 
-    def __init__(self, key):
-        self.key = key
+    The target object must support ``__getitem__``,
+    and optionally ``__setitem__``.
+
+    Example:
+
+        class Me(dict):
+            deep = defaultdict(dict)
+
+            foo = _getitem_property('foo')
+            deep_thing = _getitem_property('deep.thing')
+
+
+        >>> me = Me()
+        >>> me.foo
+        None
+
+        >>> me.foo = 10
+        >>> me.foo
+        10
+        >>> me['foo']
+        10
+
+        >>> me.deep_thing = 42
+        >>> me.deep_thing
+        42
+        >>> me.deep
+        defaultdict(<type 'dict'>, {'thing': 42})
+
+    """
+
+    def __init__(self, keypath):
+        path, _, self.key = keypath.rpartition('.')
+        self.path = path.split('.') if path else None
+
+    def _path(self, obj):
+        return (reduce(lambda d, k: d[k], [obj] + self.path) if self.path
+                else obj)
 
     def __get__(self, obj, type=None):
         if obj is None:
             return type
-        return obj.get(self.key)
+        return self._path(obj).get(self.key)
 
     def __set__(self, obj, value):
-        obj[self.key] = value
+        self._path(obj)[self.key] = value
 
 
 class Signature(dict):
@@ -236,6 +272,7 @@ class Signature(dict):
             return self.type.apply_async
         except KeyError:
             return _partial(current_app.send_task, self['task'])
+    id = _getitem_property('options.task_id')
     task = _getitem_property('task')
     args = _getitem_property('args')
     kwargs = _getitem_property('kwargs')
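
A short sketch of what the dotted keypath gives Signature (assuming a task
named `add`; `id` here reads and writes `sig['options']['task_id']`):

    >>> s = add.s(2, 2)
    >>> s.id = 'my-custom-id'   # stored at s['options']['task_id']
    >>> s.id
    'my-custom-id'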

+ 26 - 19
celery/events/__init__.py

@@ -139,8 +139,26 @@ class EventDispatcher(object):
             for callback in self.on_disabled:
                 callback()
 
-    def send(self, type, utcoffset=utcoffset, blind=False,
-             Event=Event, **fields):
+    def publish(self, type, fields, producer, retry=False,
+                retry_policy=None, blind=False, utcoffset=utcoffset,
+                Event=Event):
+        with self.mutex:
+            clock = None if blind else self.clock.forward()
+            event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
+                          pid=self.pid, clock=clock, **fields)
+            exchange = get_exchange(producer.connection)
+            producer.publish(
+                event,
+                routing_key=type.replace('-', '.'),
+                exchange=exchange.name,
+                retry=retry,
+                retry_policy=retry_policy,
+                declare=[exchange],
+                serializer=self.serializer,
+                headers=self.headers,
+            )
+
+    def send(self, type, blind=False, **fields):
         """Send event.
         """Send event.
 
 
         :param type: Kind of event.
         :param type: Kind of event.
@@ -153,23 +171,12 @@ class EventDispatcher(object):
             groups = self.groups
             if groups and group_from(type) not in groups:
                 return
-
-            clock = None if blind else self.clock.forward()
-
-            with self.mutex:
-                event = Event(type,
-                              hostname=self.hostname,
-                              clock=clock,
-                              utcoffset=utcoffset(),
-                              pid=self.pid, **fields)
-                try:
-                    self.publisher.publish(event,
-                                           routing_key=type.replace('-', '.'),
-                                           headers=self.headers)
-                except Exception as exc:
-                    if not self.buffer_while_offline:
-                        raise
-                    self._outbound_buffer.append((type, fields, exc))
+            try:
+                self.publish(type, fields, self.producer, blind=blind)
+            except Exception as exc:
+                if not self.buffer_while_offline:
+                    raise
+                self._outbound_buffer.append((type, fields, exc))
 
     def flush(self):
         while self._outbound_buffer:
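
The publish/send split above lets a caller pass its own producer, which is
exactly how TaskProducer in celery/app/amqp.py now emits task-sent events;
a minimal sketch (`app`, `producer` and `task_id` are placeholders):

    # publish() skips the enabled check, so a disabled dispatcher works:
    dispatcher = app.events.Dispatcher(enabled=False)
    dispatcher.publish('task-sent', {'uuid': task_id}, producer, retry=True)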

+ 22 - 5
celery/events/dumper.py

@@ -15,6 +15,7 @@ from datetime import datetime
 
 from celery.app import app_or_default
 from celery.datastructures import LRUCache
+from celery.utils.timeutils import humanize_seconds
 
 
 TASK_NAMES = LRUCache(limit=0xFFF)
@@ -23,6 +24,11 @@ HUMAN_TYPES = {'worker-offline': 'shutdown',
                'worker-online': 'started',
                'worker-heartbeat': 'heartbeat'}
 
+CONNECTION_ERROR = """\
+-> Cannot connect to %s: %s.
+Trying again %s
+"""
+
 
 def humanize_type(type):
     try:
@@ -77,11 +83,22 @@ def evdump(app=None, out=sys.stdout):
     dumper = Dumper(out=out)
     dumper.say('-> evdump: starting capture...')
     conn = app.connection()
-    recv = app.events.Receiver(conn, handlers={'*': dumper.on_event})
-    try:
-        recv.capture()
-    except (KeyboardInterrupt, SystemExit):
-        conn and conn.close()
+
+    def _error_handler(exc, interval):
+        dumper.say(CONNECTION_ERROR % (
+            conn.as_uri(), exc, humanize_seconds(interval, 'in', ' ')
+        ))
+
+    while 1:
+        try:
+            conn = conn.clone()
+            conn.ensure_connection(_error_handler)
+            recv = app.events.Receiver(conn, handlers={'*': dumper.on_event})
+            recv.capture()
+        except (KeyboardInterrupt, SystemExit):
+            return conn and conn.close()
+        except conn.connection_errors + conn.channel_errors:
+            dumper.say('-> Connection lost, attempting reconnect')
 
 if __name__ == '__main__':  # pragma: no cover
     evdump()

+ 4 - 0
celery/exceptions.py

@@ -126,3 +126,7 @@ class CDeprecationWarning(DeprecationWarning):
 
 class IncompleteStream(Exception):
     """Found the end of a stream of data, but the data is not yet complete."""
+
+
+class ChordError(Exception):
+    """A task part of the chord raised an exception."""

+ 8 - 0
celery/result.py

@@ -548,9 +548,17 @@ class ResultSet(ResultBase):
         acc = [None for _ in range(len(self))]
         for task_id, meta in self.iter_native(timeout=timeout,
                                               interval=interval):
+            if propagate and meta['status'] in states.PROPAGATE_STATES:
+                raise meta['result']
             acc[results.index(task_id)] = meta['result']
         return acc
 
+    def _failed_join_report(self):
+        for res in self.results:
+            if (res.backend.is_cached(res.id) and
+                    res.state in states.PROPAGATE_STATES):
+                yield res
+
     def __len__(self):
         return len(self.results)
 

+ 15 - 9
celery/task/trace.py

@@ -45,6 +45,7 @@ IGNORED = states.IGNORED
 RETRY = states.RETRY
 FAILURE = states.FAILURE
 EXCEPTION_STATES = states.EXCEPTION_STATES
+IGNORE_STATES = frozenset([IGNORED, RETRY])
 
 #: set by :func:`setup_worker_optimizations`
 _tasks = None
@@ -114,7 +115,8 @@ class TraceInfo(object):
 
 
 def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
-                 Info=TraceInfo, eager=False, propagate=False):
+                 Info=TraceInfo, eager=False, propagate=False,
+                 IGNORE_STATES=IGNORE_STATES):
     """Builts a function that tracing the tasks execution; catches all
     """Builts a function that tracing the tasks execution; catches all
     exceptions, and saves the state and result of the task execution
     exceptions, and saves the state and result of the task execution
     to the result backend.
     to the result backend.
@@ -203,6 +205,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     state = SUCCESS
                 except Ignore as exc:
                     I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
+                    state, retval = I.state, I.retval
                 except RetryTaskError as exc:
                     I = Info(RETRY, exc)
                     state, retval = I.state, I.retval
@@ -230,14 +233,17 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                         send_success(sender=task, result=retval)
 
                 # -* POST *-
-                if task_request.chord:
-                    on_chord_part_return(task)
-                if task_after_return:
-                    task_after_return(state, retval, uuid, args, kwargs, None)
-                if postrun_receivers:
-                    send_postrun(sender=task, task_id=uuid, task=task,
-                                 args=args, kwargs=kwargs,
-                                 retval=retval, state=state)
+                if state not in IGNORE_STATES:
+                    if task_request.chord:
+                        on_chord_part_return(task)
+                    if task_after_return:
+                        task_after_return(
+                            state, retval, uuid, args, kwargs, None,
+                        )
+                    if postrun_receivers:
+                        send_postrun(sender=task, task_id=uuid, task=task,
+                                     args=args, kwargs=kwargs,
+                                     retval=retval, state=state)
             finally:
                 pop_task()
                 pop_request()

+ 4 - 3
celery/utils/__init__.py

@@ -12,6 +12,7 @@ import os
 import sys
 import sys
 import traceback
 import warnings
+import types
 import datetime
 
 from functools import partial, wraps
 
 
 def jsonify(obj):
 def jsonify(obj):
     """Transforms object making it suitable for json serialization"""
     """Transforms object making it suitable for json serialization"""
-    if isinstance(obj, (int, float, string_t, type(None))):
+    if isinstance(obj, (int, float, string_t, types.NoneType)):
         return obj
         return obj
     elif isinstance(obj, (tuple, list)):
     elif isinstance(obj, (tuple, list)):
         return [jsonify(o) for o in obj]
         return [jsonify(o) for o in obj]
     elif isinstance(obj, dict):
     elif isinstance(obj, dict):
         return dict((k, jsonify(v)) for k, v in items(obj))
         return dict((k, jsonify(v)) for k, v in items(obj))
-    # See "Date Time String Format" in the ECMA-262 specification.
     elif isinstance(obj, datetime.datetime):
     elif isinstance(obj, datetime.datetime):
+        # See "Date Time String Format" in the ECMA-262 specification.
         r = obj.isoformat()
         r = obj.isoformat()
         if obj.microsecond:
         if obj.microsecond:
             r = r[:23] + r[26:]
             r = r[:23] + r[26:]
@@ -226,7 +227,7 @@ def jsonify(obj):
     elif isinstance(obj, datetime.timedelta):
         return str(obj)
     else:
-        raise ValueError('Unsupported type: {0}'.format(type(obj)))
+        raise ValueError('Unsupported type: {0!r}'.format(type(obj)))
 
 
 def gen_task_name(app, name, module_name):

+ 1 - 1
celery/utils/log.py

@@ -221,7 +221,7 @@ def ensure_process_aware_logger():
 
 
 def get_multiprocessing_logger():
-    return mputil.get_logger() if mputil and MP_LOG else None
+    return mputil.get_logger() if mputil else None
 
 
 def reset_multiprocessing_logger():

+ 0 - 1
celery/worker/__init__.py

@@ -44,7 +44,6 @@ If you want to automatically declare unknown queues you can
 enable the CELERY_CREATE_MISSING_QUEUES setting.
 """
 
-
 def default_nodename(hostname):
     name, host = nodesplit(hostname or '')
     return nodename(name or 'celery', host or socket.gethostname())

+ 6 - 3
celery/worker/components.py

@@ -105,7 +105,7 @@ class Pool(bootsteps.StartStopStep):
         if w.pool:
             w.pool.terminate()
 
-    def on_poll_init(self, pool, hub):
+    def on_poll_init(self, pool, w, hub):
         apply_after = hub.timer.apply_after
         apply_at = hub.timer.apply_at
         on_soft_timeout = pool.on_soft_timeout
@@ -115,7 +115,10 @@ class Pool(bootsteps.StartStopStep):
         remove = hub.remove
         now = time.time
 
-        if not pool.did_start_ok():
+        # did_start_ok will verify that pool processes were able to start,
+        # but this will only work the first time we start, as
+        # maxtasksperchild will mess up metrics.
+        if not w.consumer.restart_count and not pool.did_start_ok():
             raise WorkerLostError('Could not start worker processes')
 
         # need to handle pool results before every task
@@ -178,7 +181,7 @@ class Pool(bootsteps.StartStopStep):
             semaphore=semaphore,
         )
         if w.hub:
-            w.hub.on_init.append(partial(self.on_poll_init, pool))
+            w.hub.on_init.append(partial(self.on_poll_init, pool, w))
         return pool
 
     def info(self, w):

+ 3 - 0
celery/worker/consumer.py

@@ -120,6 +120,8 @@ class Consumer(object):
     #: as sending heartbeats.
     timer = None
 
+    restart_count = -1  # first start is the same as a restart
+
     class Namespace(bootsteps.Namespace):
         name = 'Consumer'
         default_steps = [
@@ -187,6 +189,7 @@ class Consumer(object):
     def start(self):
         ns, loop = self.namespace, self.loop
         while ns.state != CLOSE:
+            self.restart_count += 1
             maybe_shutdown()
             try:
                 ns.start(self)

+ 2 - 0
celery/worker/control.py

@@ -8,6 +8,8 @@
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
+import os
+
 from kombu.utils.encoding import safe_repr
 
 from celery.five import UserDict, items

+ 8 - 0
docs/contributing.rst

@@ -159,6 +159,14 @@ issue tracker.
 If you are unsure of the origin of the bug you can ask the
 :ref:`mailing-list`, or just use the Celery issue tracker.
 
+Contributors guide to the codebase
+==================================
+
+There's a separate section for internal details,
+including information about the codebase and a style guide.
+
+Read :ref:`internals-guide` for more!
+
 .. _versions:
 
 Versions

+ 41 - 3
docs/userguide/canvas.rst

@@ -694,8 +694,8 @@ get the sum of the resulting numbers::
     >>> from celery import chord
     >>> from tasks import add, tsum
 
-    >>> chord(add.subtask((i, i))
-    ...     for i in xrange(100))(tsum.subtask()).get()
+    >>> chord(add.s(i, i)
+    ...       for i in xrange(100))(tsum.s()).get()
     9900
 
 
@@ -708,7 +708,9 @@ The synchronization step is costly, so you should avoid using chords as much
 as possible. Still, the chord is a powerful primitive to have in your toolbox
 as synchronization is a required step for many parallel algorithms.
 
-Let's break the chord expression down::
+Let's break the chord expression down:
+
+.. code-block:: python
 
     >>> callback = tsum.subtask()
     >>> header = [add.subtask((i, i)) for i in xrange(100)]
@@ -724,6 +726,42 @@ the return value of each task in the header.  The task id returned by
 and get the final return value (but remember to :ref:`never have a task wait
 for other tasks <task-synchronous-subtasks>`)
 
+Error handling
+~~~~~~~~~~~~~~
+
+.. versionadded:: 3.0.14
+
+So what happens if one of the tasks raises an exception?
+
+Errors will propagate to the callback, so the callback will not be executed;
+instead the callback changes to the failure state, and the error is set
+to the :exc:`~celery.exceptions.ChordError` exception:
+
+.. code-block:: python
+
+    >>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
+    >>> result = c()
+    >>> result.get()
+    Traceback (most recent call last):
+      File "<stdin>", line 1, in <module>
+      File "*/celery/result.py", line 120, in get
+        interval=interval)
+      File "*/celery/backends/amqp.py", line 150, in wait_for
+        raise self.exception_to_python(meta['result'])
+    celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
+        raised ValueError('something something',)
+
+
+While the traceback may differ depending on which result backend is
+being used, you can see that the error description includes the id of the
+task that failed and a string representation of the original exception.
+You can also find the original traceback in ``result.traceback``.
+
+Note that the rest of the tasks will still execute, so the third task
+(``add.s(8, 8)``) is still executed even though the middle task failed.
+Also the :exc:`~celery.exceptions.ChordError` only shows the task that failed
+first (in time): it does not respect the ordering of the header group.
+
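+A minimal sketch of reacting to the failure from calling code
+(``raising_task`` above is a stand-in for any task that raises an error):
+
+.. code-block:: python
+
+    >>> from celery.exceptions import ChordError
+
+    >>> try:
+    ...     result.get()
+    ... except ChordError as exc:
+    ...     print('Chord failed: %s' % (exc, ))
+    Chord failed: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
+        raised ValueError('something something',)
+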
 .. _chord-important-notes:
 
 Important Notes

+ 1 - 1
extra/generic-init.d/celeryd

@@ -209,7 +209,7 @@ case "$1" in
         check_paths
     ;;
     *)
-        echo "Usage: /etc/init.d/celeryd {start|stop|restart|kill|create_paths}"
+        echo "Usage: /etc/init.d/celeryd {start|stop|restart|kill|create-paths}"
         exit 64  # EX_USAGE
     ;;
 esac