Ver Fonte

Super refactor, merging everything for 3.2

Ask Solem há 11 anos atrás
pai
commit
1e9dd26592

+ 1 - 0
celery/app/amqp.py

@@ -297,6 +297,7 @@ class AMQP(object):
             headers={
                 'lang': 'py',
                 'c_type': name,
+                'task_id': task_id,
                 'eta': eta,
                 'expires': expires,
                 'callbacks': callbacks,

+ 0 - 1
celery/app/builtins.py

@@ -64,7 +64,6 @@ def add_unlock_chord_task(app):
 
         # check if the task group is ready, and if so apply the callback.
         callback = maybe_signature(callback, app)
-        root_id = callback.options.get('root_id')
         deps = GroupResult(
             group_id,
             [result_from_tuple(r, app=app) for r in result],

+ 8 - 5
celery/app/task.py

@@ -695,7 +695,7 @@ class Task(object):
 
         """
         # trace imports Task, so need to import inline.
-        from celery.app.trace import eager_trace_task
+        from celery.app.trace import build_tracer
 
         app = self._get_app()
         args = args or ()
@@ -736,12 +736,15 @@ class Task(object):
             kwargs.update(extend_with)
 
         tb = None
-        retval, info = eager_trace_task(task, task_id, args, kwargs,
-                                        app=self._get_app(),
-                                        request=request, propagate=throw)
+        tracer = build_tracer(
+            task.name, task, eager=True,
+            propagate=throw, app=self._get_app(),
+        )
+        ret = tracer(task_id, args, kwargs, request)
+        retval = ret.retval
         if isinstance(retval, ExceptionInfo):
             retval, tb = retval.exception, retval.traceback
-        state = states.SUCCESS if info is None else info.state
+        state = states.SUCCESS if ret.info is None else ret.info.state
         return EagerResult(task_id, retval, state, traceback=tb)
 
     def AsyncResult(self, task_id, **kwargs):

+ 140 - 20
celery/app/trace.py

@@ -15,33 +15,68 @@ from __future__ import absolute_import
 # but in the end it only resulted in bad performance and horrible tracebacks,
 # so instead we now use one closure per task class.
 
+import logging
 import os
 import socket
 import sys
 
+from collections import namedtuple
 from warnings import warn
 
 from billiard.einfo import ExceptionInfo
 from kombu.exceptions import EncodeError
-from kombu.utils import kwdict
+from kombu.serialization import decode as decode_message
+from kombu.utils.encoding import safe_repr, safe_str
 
 from celery import current_app, group
 from celery import states, signals
 from celery._state import _task_stack
 from celery.app import set_default_app
 from celery.app.task import Task as BaseTask, Context
-from celery.exceptions import Ignore, Reject, Retry
+from celery.exceptions import Ignore, Reject, Retry, InvalidTaskError
+from celery.five import monotonic
 from celery.utils.log import get_logger
 from celery.utils.objects import mro_lookup
 from celery.utils.serialization import (
-    get_pickleable_exception,
-    get_pickleable_etype,
+    get_pickleable_exception, get_pickled_exception, get_pickleable_etype,
 )
+from celery.utils.text import truncate
 
-__all__ = ['TraceInfo', 'build_tracer', 'trace_task', 'eager_trace_task',
+__all__ = ['TraceInfo', 'build_tracer', 'trace_task',
            'setup_worker_optimizations', 'reset_worker_optimizations']
 
-_logger = get_logger(__name__)
+logger = get_logger(__name__)
+info = logger.info
+
+#: Format string used to log task success.
+LOG_SUCCESS = """\
+Task %(name)s[%(id)s] succeeded in %(runtime)ss: %(return_value)s\
+"""
+
+#: Format string used to log task failure.
+LOG_FAILURE = """\
+Task %(name)s[%(id)s] %(description)s: %(exc)s\
+"""
+
+#: Format string used to log task internal error.
+LOG_INTERNAL_ERROR = """\
+Task %(name)s[%(id)s] %(description)s: %(exc)s\
+"""
+
+#: Format string used to log task ignored.
+LOG_IGNORED = """\
+Task %(name)s[%(id)s] %(description)s\
+"""
+
+#: Format string used to log task rejected.
+LOG_REJECTED = """\
+Task %(name)s[%(id)s] %(exc)s\
+"""
+
+#: Format string used to log task retry.
+LOG_RETRY = """\
+Task %(name)s[%(id)s] retry: %(exc)s\
+"""
 
 send_prerun = signals.task_prerun.send
 send_postrun = signals.task_postrun.send
@@ -59,6 +94,8 @@ IGNORE_STATES = frozenset([IGNORED, RETRY, REJECTED])
 _tasks = None
 _patched = {}
 
+trace_ok_t = namedtuple('trace_ok_t', ('retval', 'info', 'runtime', 'retstr'))
+
 
 def task_has_custom(task, attr):
     """Return true if the task or one of its bases
@@ -100,6 +137,10 @@ class TraceInfo(object):
             task.on_retry(reason.exc, req.id, req.args, req.kwargs, einfo)
             signals.task_retry.send(sender=task, request=req,
                                     reason=reason, einfo=einfo)
+            info(LOG_RETRY, {
+                'id': req.id, 'name': task.name,
+                'exc': safe_repr(reason.exc),
+            })
             return einfo
         finally:
             del(tb)
@@ -123,14 +164,71 @@ class TraceInfo(object):
                                       kwargs=req.kwargs,
                                       traceback=tb,
                                       einfo=einfo)
+            self._log_error(task, einfo)
             return einfo
         finally:
             del(tb)
 
+    def _log_error(self, task, einfo):
+        req = task.request
+        eobj = einfo.exception = get_pickled_exception(einfo.exception)
+        exception, traceback, exc_info, internal, sargs, skwargs = (
+            safe_repr(eobj),
+            safe_str(einfo.traceback),
+            einfo.exc_info,
+            einfo.internal,
+            safe_repr(req.args),
+            safe_repr(req.kwargs),
+        )
+        if task.throws and isinstance(eobj, task.throws):
+            do_send_mail, severity, exc_info, description = (
+                False, logging.INFO, None, 'raised expected',
+            )
+        else:
+            do_send_mail, severity, description = (
+                True, logging.ERROR, 'raised unexpected',
+            )
+        format = LOG_FAILURE
+
+        if internal:
+            if isinstance(einfo.exception, Reject):
+                format = LOG_REJECTED
+                description = 'rejected'
+                severity = logging.WARN
+                exc_info = einfo
+            elif isinstance(einfo.exception, Ignore):
+                format = LOG_IGNORED
+                description = 'ignored'
+                severity = logging.INFO
+                exc_info = None
+            else:
+                format = LOG_INTERNAL_ERROR
+                description = 'INTERNAL ERROR'
+                severity = logging.CRITICAL
+
+        context = {
+            'hostname': req.hostname,
+            'id': req.id,
+            'name': task.name,
+            'exc': exception,
+            'traceback': traceback,
+            'args': sargs,
+            'kwargs': skwargs,
+            'description': description,
+            'internal': internal,
+        }
+
+        logger.log(severity, format.strip(), context,
+                   exc_info=exc_info,
+                   extra={'data': context})
+
+        task.send_error_email(context, einfo.exception)
+
 
 def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                  Info=TraceInfo, eager=False, propagate=False, app=None,
-                 IGNORE_STATES=IGNORE_STATES):
+                 monotonic=monotonic, truncate=truncate,
+                 trace_ok_t=trace_ok_t, IGNORE_STATES=IGNORE_STATES):
     """Return a function that traces task execution; catches all
     exceptions and updates result backend with the state and result
 
@@ -186,6 +284,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
     push_task = _task_stack.push
     pop_task = _task_stack.pop
     on_chord_part_return = backend.on_chord_part_return
+    _does_info = logger.isEnabledFor(logging.INFO)
 
     prerun_receivers = signals.task_prerun.receivers
     postrun_receivers = signals.task_postrun.receivers
@@ -209,6 +308,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
     def trace_task(uuid, args, kwargs, request=None):
         # R      - is the possibly prepared return value.
         # I      - is the Info object.
+        # T      - runtime
+        # Rstr   - textual representation of return value
         # retval - is the always unmodified return value.
         # state  - is the resulting task state.
 
@@ -216,9 +317,14 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
         # for performance reasons, and because the function is so long
         # we want the main variables (I, and R) to stand out visually from the
         # the rest of the variables, so breaking PEP8 is worth it ;)
-        R = I = retval = state = None
-        kwargs = kwdict(kwargs)
+        R = I = T = Rstr = retval = state = None
+        time_start = monotonic()
         try:
+            try:
+                kwargs.items
+            except AttributeError:
+                raise InvalidTaskError(
+                    'Task keyword arguments is not a mapping')
             push_task(task)
             task_request = Context(request or {}, args=args,
                                    called_directly=False, kwargs=kwargs)
@@ -289,6 +395,13 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                             task_on_success(retval, uuid, args, kwargs)
                         if success_receivers:
                             send_success(sender=task, result=retval)
+                        if _does_info:
+                            T = monotonic() - time_start
+                            Rstr = truncate(safe_repr(R), 256)
+                            info(LOG_SUCCESS, {
+                                'id': uuid, 'name': name,
+                                'return_value': Rstr, 'runtime': T,
+                            })
 
                 # -* POST *-
                 if state not in IGNORE_STATES:
@@ -314,15 +427,15 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                         except (KeyboardInterrupt, SystemExit, MemoryError):
                             raise
                         except Exception as exc:
-                            _logger.error('Process cleanup failed: %r', exc,
-                                          exc_info=True)
+                            logger.error('Process cleanup failed: %r', exc,
+                                         exc_info=True)
         except MemoryError:
             raise
         except Exception as exc:
             if eager:
                 raise
             R = report_internal_error(task, exc)
-        return R, I
+        return trace_ok_t(R, I, T, Rstr)
 
     return trace_task
 
@@ -342,16 +455,23 @@ def _trace_task_ret(name, uuid, args, kwargs, request={}, app=None, **opts):
 trace_task_ret = _trace_task_ret
 
 
-def _fast_trace_task(task, uuid, args, kwargs, request={}):
+def _fast_trace_task_v1(task, uuid, args, kwargs, request={}):
     # setup_worker_optimizations will point trace_task_ret to here,
     # so this is the function used in the worker.
-    return _tasks[task].__trace__(uuid, args, kwargs, request)[0]
-
-
-def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):
-    opts.setdefault('eager', True)
-    return build_tracer(task.name, task, **opts)(
-        uuid, args, kwargs, request)
+    R, I, T, Rstr = _tasks[task].__trace__(uuid, args, kwargs, request)[0]
+    # exception instance if error, else result text
+    return (R if I else Rstr), T
+
+
+def _fast_trace_task(task, uuid, request, body, content_type,
+                     content_encoding, decode_message=decode_message,
+                     **extra_request):
+    args, kwargs = decode_message(body, content_type, content_encoding)
+    request.update(args=args, kwargs=kwargs, **extra_request)
+    R, I, T, Rstr = _tasks[task].__trace__(
+        uuid, args, kwargs, request,
+    )
+    return (R if I else Rstr), T
 
 
 def report_internal_error(task, exc):

+ 0 - 9
celery/concurrency/asynpool.py

@@ -37,7 +37,6 @@ from amqp.utils import promise
 from billiard.pool import RUN, TERMINATE, ACK, NACK, WorkersJoined
 from billiard import pool as _pool
 from billiard.compat import buf_t, setblocking, isblocking
-from billiard.einfo import ExceptionInfo
 from billiard.queues import _SimpleQueue
 from kombu.async import READ, WRITE, ERR
 from kombu.serialization import pickle as _pickle
@@ -46,7 +45,6 @@ from kombu.utils.compat import get_errno
 from kombu.utils.eventio import SELECT_BAD_FD
 from celery.five import Counter, items, values
 from celery.utils.log import get_logger
-from celery.utils.text import truncate
 from celery.worker import state as worker_state
 
 try:
@@ -96,8 +94,6 @@ SCHED_STRATEGIES = {
     'fair': SCHED_STRATEGY_FAIR,
 }
 
-RESULT_MAXLEN = 128
-
 Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
 
 
@@ -170,11 +166,6 @@ class Worker(_pool.Worker):
         # is writable.
         self.outq.put((WORKER_UP, (pid, )))
 
-    def prepare_result(self, result, RESULT_MAXLEN=RESULT_MAXLEN):
-        if not isinstance(result, ExceptionInfo):
-            return truncate(repr(result), RESULT_MAXLEN)
-        return result
-
 
 class ResultHandler(_pool.ResultHandler):
     """Handles messages from the pool processes."""

+ 2 - 1
celery/concurrency/base.py

@@ -66,6 +66,7 @@ class BasePool(object):
 
     _state = None
     _pool = None
+    _does_debug = True
 
     #: only used by multiprocessing pool
     uses_semaphore = False
@@ -79,7 +80,6 @@ class BasePool(object):
         self.options = options
         self.forking_enable = forking_enable
         self.callbacks_propagate = callbacks_propagate
-        self._does_debug = logger.isEnabledFor(logging.DEBUG)
 
     def on_start(self):
         pass
@@ -128,6 +128,7 @@ class BasePool(object):
         self.on_terminate()
 
     def start(self):
+        self._does_debug = logger.isEnabledFor(logging.DEBUG)
         self.on_start()
         self._state = self.RUN
 

+ 5 - 3
celery/tests/tasks/test_trace.py

@@ -6,7 +6,7 @@ from celery import states
 from celery.exceptions import Ignore, Retry
 from celery.app.trace import (
     TraceInfo,
-    eager_trace_task,
+    build_tracer,
     trace_task,
     setup_worker_optimizations,
     reset_worker_optimizations,
@@ -15,8 +15,10 @@ from celery.tests.case import AppCase, Mock, patch
 
 
 def trace(app, task, args=(), kwargs={}, propagate=False, **opts):
-    return eager_trace_task(task, 'id-1', args, kwargs,
-                            propagate=propagate, app=app, **opts)
+    t = build_tracer(task.name, task,
+                     eager=True, propagate=propagate, app=app, **opts)
+    ret = t('id-1', args, kwargs, None)
+    return ret.retval, ret.info
 
 
 class TraceCase(AppCase):

+ 22 - 3
celery/worker/consumer.py

@@ -127,6 +127,8 @@ MINGLE_GET_FIELDS = itemgetter('clock', 'revoked')
 
 
 def dump_body(m, body):
+    # v2 protocol does not deserialize body
+    body = m.body if body is None else body
     if isinstance(body, buffer_t):
         body = bytes_t(body)
     return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
@@ -445,7 +447,7 @@ class Consumer(object):
         on_invalid_task = self.on_invalid_task
         callbacks = self.on_task_message
 
-        def on_task_received(body, message):
+        def on_v1_task_received(body, message):
             try:
                 name = body['task']
             except (KeyError, TypeError):
@@ -461,6 +463,22 @@ class Consumer(object):
             except InvalidTaskError as exc:
                 on_invalid_task(body, message, exc)
 
+        def on_task_received(message):
+            headers = message.headers
+            try:
+                type_ = headers['c_type']
+            except KeyError:
+                return on_v1_task_received(message.payload, message)
+            try:
+                strategies[type_](
+                    message, None,
+                    message.ack_log_error, message.reject_log_error, callbacks,
+                )
+            except KeyError as exc:
+                on_unknown_task(None, message, exc)
+            except InvalidTaskError as exc:
+                on_invalid_task(None, message, exc)
+
         return on_task_received
 
     def __repr__(self):
@@ -541,8 +559,9 @@ class Heart(bootsteps.StartStopStep):
         c.heart = None
 
     def start(self, c):
-        c.heart = heartbeat.Heart(c.timer, c.event_dispatcher,
-            self.heartbeat_interval)
+        c.heart = heartbeat.Heart(
+            c.timer, c.event_dispatcher, self.heartbeat_interval,
+        )
         c.heart.start()
 
     def stop(self, c):

+ 3 - 1
celery/worker/control.py

@@ -364,7 +364,9 @@ def active_queues(state):
 
 
 def _wanted_config_key(key):
-    return isinstance(key, string_t) and key.isupper() and not key.startswith('__')
+    return (isinstance(key, string_t) and
+            key.isupper() and
+            not key.startswith('__'))
 
 
 @Panel.register

+ 78 - 218
celery/worker/job.py

@@ -17,7 +17,6 @@ from billiard.einfo import ExceptionInfo
 from datetime import datetime
 from weakref import ref
 
-from kombu.utils import kwdict, reprcall
 from kombu.utils.encoding import safe_repr, safe_str
 
 from celery import signals
@@ -27,14 +26,12 @@ from celery.exceptions import (
     SoftTimeLimitExceeded, TimeLimitExceeded,
     WorkerLostError, Terminated, Retry, Reject,
 )
-from celery.five import items, monotonic, string, string_t
+from celery.five import string
 from celery.platforms import signals as _signals
-from celery.utils import fun_takes_kwargs
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
-from celery.utils.serialization import get_pickled_exception
-from celery.utils.text import truncate
 from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
+from celery.utils.serialization import get_pickled_exception
 
 from . import state
 
@@ -69,8 +66,6 @@ task_accepted = state.task_accepted
 task_ready = state.task_ready
 revoked_tasks = state.revoked
 
-NEEDS_KWDICT = sys.version_info <= (2, 6)
-
 #: Use when no message object passed to :class:`Request`.
 DEFAULT_FIELDS = {
     'headers': None,
@@ -85,63 +80,46 @@ DEFAULT_FIELDS = {
 }
 
 
+class RequestV1(object):
+    if not IS_PYPY:
+        __slots__ = (
+            'app', 'name', 'id', 'root_id', 'parent_id',
+            'on_ack', 'hostname', 'eventer', 'connection_errors', 'task',
+            'eta', 'expires', 'request_dict', 'acknowledged', 'on_reject',
+            'utc', 'time_start', 'worker_pid', '_already_revoked',
+            '_terminate_on_ack', '_apply_result',
+            '_tzlocal', '__weakref__', '__dict__',
+        )
+
+
 class Request(object):
     """A request for task execution."""
     if not IS_PYPY:  # pragma: no cover
         __slots__ = (
-            'app', 'name', 'id', 'args', 'kwargs', 'on_ack',
+            'app', 'name', 'id', 'on_ack', 'payload',
             'hostname', 'eventer', 'connection_errors', 'task', 'eta',
             'expires', 'request_dict', 'acknowledged', 'on_reject',
-            'utc', 'time_start', 'worker_pid', '_already_revoked',
-            '_terminate_on_ack', '_apply_result',
+            'utc', 'time_start', 'worker_pid', 'timeouts',
+            'content_type', 'content_encoding',
+            '_already_revoked', '_terminate_on_ack', '_apply_result',
             '_tzlocal', '__weakref__', '__dict__',
         )
 
-    #: Format string used to log task success.
-    success_msg = """\
-        Task %(name)s[%(id)s] succeeded in %(runtime)ss: %(return_value)s
-    """
-
-    #: Format string used to log task failure.
-    error_msg = """\
-        Task %(name)s[%(id)s] %(description)s: %(exc)s
-    """
-
-    #: Format string used to log internal error.
-    internal_error_msg = """\
-        Task %(name)s[%(id)s] %(description)s: %(exc)s
-    """
-
-    ignored_msg = """\
-        Task %(name)s[%(id)s] %(description)s
-    """
-
-    rejected_msg = """\
-        Task %(name)s[%(id)s] %(exc)s
-    """
-
-    #: Format string used to log task retry.
-    retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
-
-    def __init__(self, body, on_ack=noop,
+    def __init__(self, message, on_ack=noop,
                  hostname=None, eventer=None, app=None,
                  connection_errors=None, request_dict=None,
-                 message=None, task=None, on_reject=noop, **opts):
+                 task=None, on_reject=noop, **opts):
+        headers = message.headers
         self.app = app
-        name = self.name = body['task']
-        self.id = body['id']
-        self.args = body.get('args', [])
-        self.kwargs = body.get('kwargs', {})
-        try:
-            self.kwargs.items
-        except AttributeError:
-            raise InvalidTaskError(
-                'Task keyword arguments is not a mapping')
-        if NEEDS_KWDICT:
-            self.kwargs = kwdict(self.kwargs)
-        eta = body.get('eta')
-        expires = body.get('expires')
-        utc = self.utc = body.get('utc', False)
+        name = self.name = headers['c_type']
+        self.id = headers['task_id']
+        self.payload = message.body
+        self.content_type = message.content_type
+        self.content_encoding = message.content_encoding
+        eta = headers.get('eta')
+        expires = headers.get('expires')
+        self.timeouts = (headers['timeouts'] if 'timeouts' in headers
+                         else (None, None))
         self.on_ack = on_ack
         self.on_reject = on_reject
         self.hostname = hostname or socket.gethostname()
@@ -157,75 +135,42 @@ class Request(object):
         # supported at this point is UTC.
         if eta is not None:
             try:
-                self.eta = maybe_iso8601(eta)
+                eta = maybe_iso8601(eta)
             except (AttributeError, ValueError, TypeError) as exc:
                 raise InvalidTaskError(
                     'invalid eta value {0!r}: {1}'.format(eta, exc))
-            if utc:
-                self.eta = maybe_make_aware(self.eta, self.tzlocal)
+            self.eta = maybe_make_aware(eta, self.tzlocal)
         else:
             self.eta = None
         if expires is not None:
             try:
-                self.expires = maybe_iso8601(expires)
+                expires = maybe_iso8601(expires)
             except (AttributeError, ValueError, TypeError) as exc:
                 raise InvalidTaskError(
                     'invalid expires value {0!r}: {1}'.format(expires, exc))
-            if utc:
-                self.expires = maybe_make_aware(self.expires, self.tzlocal)
+            self.expires = maybe_make_aware(expires, self.tzlocal)
         else:
             self.expires = None
 
-        if message:
-            delivery_info = message.delivery_info or {}
-            properties = message.properties or {}
-            body.update({
-                'headers': message.headers,
-                'reply_to': properties.get('reply_to'),
-                'correlation_id': properties.get('correlation_id'),
-                'delivery_info': {
-                    'exchange': delivery_info.get('exchange'),
-                    'routing_key': delivery_info.get('routing_key'),
-                    'priority': delivery_info.get('priority'),
-                    'redelivered': delivery_info.get('redelivered'),
-                }
-
-            })
-        else:
-            body.update(DEFAULT_FIELDS)
-        self.request_dict = body
+        delivery_info = message.delivery_info or {}
+        properties = message.properties or {}
+        headers.update({
+            'reply_to': properties.get('reply_to'),
+            'correlation_id': properties.get('correlation_id'),
+            'delivery_info': {
+                'exchange': delivery_info.get('exchange'),
+                'routing_key': delivery_info.get('routing_key'),
+                'priority': delivery_info.get('priority'),
+                'redelivered': delivery_info.get('redelivered'),
+            }
+
+        })
+        self.request_dict = headers
 
     @property
     def delivery_info(self):
         return self.request_dict['delivery_info']
 
-    def extend_with_default_kwargs(self):
-        """Extend the tasks keyword arguments with standard task arguments.
-
-        Currently these are `logfile`, `loglevel`, `task_id`,
-        `task_name`, `task_retries`, and `delivery_info`.
-
-        See :meth:`celery.task.base.Task.run` for more information.
-
-        Magic keyword arguments are deprecated and will be removed
-        in version 4.0.
-
-        """
-        kwargs = dict(self.kwargs)
-        default_kwargs = {'logfile': None,   # deprecated
-                          'loglevel': None,  # deprecated
-                          'task_id': self.id,
-                          'task_name': self.name,
-                          'task_retries': self.request_dict.get('retries', 0),
-                          'task_is_eager': False,
-                          'delivery_info': self.delivery_info}
-        fun = self.task.run
-        supported_keys = fun_takes_kwargs(fun, default_kwargs)
-        extend_with = {key: val for key, val in items(default_kwargs)
-                       if key in supported_keys}
-        kwargs.update(extend_with)
-        return kwargs
-
     def execute_using_pool(self, pool, **kwargs):
         """Used by the worker to send this task to the pool.
 
@@ -235,32 +180,28 @@ class Request(object):
             and ignored.
 
         """
-        uuid = self.id
+        task_id = self.id
         task = self.task
         if self.revoked():
-            raise TaskRevokedError(uuid)
+            raise TaskRevokedError(task_id)
 
-        hostname = self.hostname
-        kwargs = self.kwargs
-        if task.accept_magic_kwargs:
-            kwargs = self.extend_with_default_kwargs()
-        request = self.request_dict
-        request.update({'hostname': hostname, 'is_eager': False,
-                        'delivery_info': self.delivery_info,
-                        'group': self.request_dict.get('taskset')})
-        timeout, soft_timeout = request.get('timelimit', (None, None))
+        payload = self.payload
+        timeout, soft_timeout = self.timeouts
         timeout = timeout or task.time_limit
         soft_timeout = soft_timeout or task.soft_time_limit
         result = pool.apply_async(
             trace_task_ret,
-            args=(self.name, uuid, self.args, kwargs, request),
+            args=(self.name, task_id, self.request_dict,
+                  bytes(payload) if isinstance(payload, buffer) else payload,
+                  self.content_type, self.content_encoding),
+            kwargs={'hostname': self.hostname, 'is_eager': False},
             accept_callback=self.on_accepted,
             timeout_callback=self.on_timeout,
             callback=self.on_success,
             error_callback=self.on_failure,
-            soft_timeout=soft_timeout,
-            timeout=timeout,
-            correlation_id=uuid,
+            soft_timeout=soft_timeout or task.soft_time_limit,
+            timeout=timeout or task.time_limit,
+            correlation_id=task_id,
         )
         # cannot create weakref to None
         self._apply_result = ref(result) if result is not None else result
@@ -281,8 +222,6 @@ class Request(object):
             self.acknowledge()
 
         kwargs = self.kwargs
-        if self.task.accept_magic_kwargs:
-            kwargs = self.extend_with_default_kwargs()
         request = self.request_dict
         request.update({'loglevel': loglevel, 'logfile': logfile,
                         'hostname': self.hostname, 'is_eager': False,
@@ -374,7 +313,7 @@ class Request(object):
         if self.task.acks_late:
             self.acknowledge()
 
-    def on_success(self, ret_value, now=None, nowfun=monotonic):
+    def on_success(self, ret_value, **kwargs):
         """Handler called if the task was successfully processed."""
         if isinstance(ret_value, ExceptionInfo):
             if isinstance(ret_value.exception, (
@@ -387,18 +326,10 @@ class Request(object):
             self.acknowledge()
 
         if self.eventer and self.eventer.enabled:
-            now = nowfun()
-            runtime = self.time_start and (now - self.time_start) or 0
-            self.send_event('task-succeeded',
-                            result=safe_repr(ret_value), runtime=runtime)
-
-        if _does_info:
-            now = now or nowfun()
-            runtime = self.time_start and (now - self.time_start) or 0
-            info(self.success_msg.strip(), {
-                'id': self.id, 'name': self.name,
-                'return_value': self.repr_result(ret_value),
-                'runtime': runtime})
+            result, runtime = ret_value
+            self.send_event(
+                'task-succeeded', result=ret_value, runtime=runtime,
+            )
 
     def on_retry(self, exc_info):
         """Handler called if the task should be retried."""
@@ -409,17 +340,19 @@ class Request(object):
                         exception=safe_repr(exc_info.exception.exc),
                         traceback=safe_str(exc_info.traceback))
 
-        if _does_info:
-            info(self.retry_msg.strip(),
-                 {'id': self.id, 'name': self.name,
-                  'exc': exc_info.exception})
-
     def on_failure(self, exc_info):
         """Handler called if the task raised an exception."""
         task_ready(self)
         send_failed_event = True
 
-        if not exc_info.internal:
+        if exc_info.internal:
+            if isinstance(exc_info.exception, MemoryError):
+                raise MemoryError('Process got: %s' % (exc_info.exception, ))
+            elif isinstance(exc_info.exception, Reject):
+                self.reject(requeue=exc_info.exception.requeue)
+            elif isinstance(exc_info.exception, Ignore):
+                self.acknowledge()
+        else:
             exc = exc_info.exception
 
             if isinstance(exc, Retry):
@@ -439,77 +372,14 @@ class Request(object):
             # (acks_late) acknowledge after result stored.
             if self.task.acks_late:
                 self.acknowledge()
-        self._log_error(exc_info, send_failed_event=send_failed_event)
-
-    def _log_error(self, einfo, send_failed_event=True):
-        einfo.exception = get_pickled_exception(einfo.exception)
-        eobj = einfo.exception
-        exception, traceback, exc_info, internal, sargs, skwargs = (
-            safe_repr(eobj),
-            safe_str(einfo.traceback),
-            einfo.exc_info,
-            einfo.internal,
-            safe_repr(self.args),
-            safe_repr(self.kwargs),
-        )
-        task = self.task
-        if task.throws and isinstance(eobj, task.throws):
-            do_send_mail, severity, exc_info, description = (
-                False, logging.INFO, None, 'raised expected',
-            )
-        else:
-            do_send_mail, severity, description = (
-                True, logging.ERROR, 'raised unexpected',
-            )
-        format = self.error_msg
+
         if send_failed_event:
             self.send_event(
-                'task-failed', exception=exception, traceback=traceback,
+                'task-failed',
+                exception=safe_repr(get_pickled_exception(exc_info.exception)),
+                traceback=exc_info.traceback,
             )
 
-        if internal:
-            if isinstance(einfo.exception, MemoryError):
-                raise MemoryError('Process got: %s' % (einfo.exception, ))
-            elif isinstance(einfo.exception, Reject):
-                format = self.rejected_msg
-                description = 'rejected'
-                severity = logging.WARN
-                exc_info = einfo
-                self.reject(requeue=einfo.exception.requeue)
-            elif isinstance(einfo.exception, Ignore):
-                format = self.ignored_msg
-                description = 'ignored'
-                severity = logging.INFO
-                exc_info = None
-                self.acknowledge()
-            else:
-                format = self.internal_error_msg
-                description = 'INTERNAL ERROR'
-                severity = logging.CRITICAL
-
-        context = {
-            'hostname': self.hostname,
-            'id': self.id,
-            'name': self.name,
-            'exc': exception,
-            'traceback': traceback,
-            'args': sargs,
-            'kwargs': skwargs,
-            'description': description,
-        }
-
-        logger.log(severity, format.strip(), context,
-                   exc_info=exc_info,
-                   extra={'data': {'id': self.id,
-                                   'name': self.name,
-                                   'args': sargs,
-                                   'kwargs': skwargs,
-                                   'hostname': self.hostname,
-                                   'internal': internal}})
-
-        if do_send_mail:
-            task.send_error_email(context, einfo.exception)
-
     def acknowledge(self):
         """Acknowledge task."""
         if not self.acknowledged:
@@ -521,18 +391,10 @@ class Request(object):
             self.on_reject(logger, self.connection_errors, requeue)
             self.acknowledged = True
 
-    def repr_result(self, result, maxlen=RESULT_MAXLEN):
-        # 46 is the length needed to fit
-        #     'the quick brown fox jumps over the lazy dog' :)
-        if not isinstance(result, string_t):
-            result = safe_repr(result)
-        return truncate(result) if len(result) > maxlen else result
-
     def info(self, safe=False):
         return {'id': self.id,
                 'name': self.name,
-                'args': self.args if safe else safe_repr(self.args),
-                'kwargs': self.kwargs if safe else safe_repr(self.kwargs),
+                'body': self.body,
                 'hostname': self.hostname,
                 'time_start': self.time_start,
                 'acknowledged': self.acknowledged,
@@ -546,9 +408,7 @@ class Request(object):
     shortinfo = __str__
 
     def __repr__(self):
-        return '<{0} {1}: {2}>'.format(
-            type(self).__name__, self.id,
-            reprcall(self.name, self.args, self.kwargs))
+        return '<{0} {1}: {2}>'.format(type(self).__name__, self.id, self.name)
 
     @property
     def tzlocal(self):

+ 2 - 2
celery/worker/loops.py

@@ -37,7 +37,7 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
     if heartbeat and connection.supports_heartbeats:
         hub.call_repeatedly(heartbeat / hbrate, hbtick, hbrate)
 
-    consumer.callbacks = [on_task_received]
+    consumer.on_message = on_task_received
     consumer.consume()
     obj.on_ready()
     obj.controller.register_with_event_loop(hub)
@@ -86,7 +86,7 @@ def synloop(obj, connection, consumer, blueprint, hub, qos,
     """Fallback blocking event loop for transports that doesn't support AIO."""
 
     on_task_received = obj.create_task_handler()
-    consumer.register_callback(on_task_received)
+    consumer.on_message = on_task_received
     consumer.consume()
 
     obj.on_ready()

+ 15 - 9
celery/worker/strategy.py

@@ -11,12 +11,11 @@ from __future__ import absolute_import
 import logging
 
 from kombu.async.timer import to_timestamp
-from kombu.utils.encoding import safe_repr
 
 from celery.utils.log import get_logger
 from celery.utils.timeutils import timezone
 
-from .job import Request
+from .job import Request, RequestV1
 from .state import task_reserved
 
 __all__ = ['default']
@@ -29,7 +28,8 @@ def default(task, app, consumer,
             to_system_tz=timezone.to_system):
     hostname = consumer.hostname
     eventer = consumer.event_dispatcher
-    Req = Request
+    ReqV2 = Request
+    ReqV1 = RequestV1
     connection_errors = consumer.connection_errors
     _does_info = logger.isEnabledFor(logging.INFO)
     events = eventer and eventer.enabled
@@ -43,11 +43,17 @@ def default(task, app, consumer,
 
     def task_message_handler(message, body, ack, reject, callbacks,
                              to_timestamp=to_timestamp):
-        req = Req(body, on_ack=ack, on_reject=reject,
-                  app=app, hostname=hostname,
-                  eventer=eventer, task=task,
-                  connection_errors=connection_errors,
-                  message=message)
+        if body is None:
+            req = ReqV2(message,
+                        on_ack=ack, on_reject=reject, app=app,
+                        hostname=hostname, eventer=eventer, task=task,
+                        connection_errors=connection_errors)
+        else:
+            req = ReqV1(body,
+                        on_ack=ack, on_reject=reject, app=app,
+                        hostname=hostname, eventer=eventer, task=task,
+                        connection_errors=connection_errors,
+                        message=message)
         if req.revoked():
             return
 
@@ -58,7 +64,7 @@ def default(task, app, consumer,
             send_event(
                 'task-received',
                 uuid=req.id, name=req.name,
-                args=safe_repr(req.args), kwargs=safe_repr(req.kwargs),
+                args='', kwargs='',
                 retries=req.request_dict.get('retries', 0),
                 eta=req.eta and req.eta.isoformat(),
                 expires=req.expires and req.expires.isoformat(),

+ 3 - 2
docs/internals/protov2.rst

@@ -103,6 +103,9 @@ Definition
     headers = {
         'lang': (string)'py'
         'c_type': (string)task,
+        'task_id': (uuid)task_id,
+        'root_id': (uuid)root_id,
+        'parent_id': (uuid)parent_id,
 
         # optional
         'c_meth': (string)unused,
@@ -116,8 +119,6 @@ Definition
         'chord': (uuid)chord_id,
         'retries': (int)retries,
         'timelimit': (tuple)(soft, hard),
-        'root_id': (uuid)root_id,
-        'parent_id': (uuid)parent_id,
     }
 
     body = (args, kwargs)

+ 1 - 0
funtests/stress/stress/templates.py

@@ -70,6 +70,7 @@ class default(object):
         'interval_max': 2,
         'interval_step': 0.1,
     }
+    CELERY_TASK_PROTOCOL = 2
 
 
 @template()