Browse Source

Message protocol v2 now includes repr of args/kwargs. Closes #2847

Ask Solem 9 years ago
parent
commit
e71652d384

+ 10 - 5
celery/app/amqp.py

@@ -18,11 +18,11 @@ from kombu import Connection, Consumer, Exchange, Producer, Queue
 from kombu.common import Broadcast
 from kombu.pools import ProducerPool
 from kombu.utils import cached_property
-from kombu.utils.encoding import safe_repr
 from kombu.utils.functional import maybe_list
 
 from celery import signals
 from celery.five import items, string_t
+from celery.utils.saferepr import saferepr
 from celery.utils.text import indent as textindent
 from celery.utils.timeutils import to_utc
 
@@ -293,6 +293,9 @@ class AMQP(object):
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
 
+        argsrepr = saferepr(args)
+        kwargsrepr = saferepr(kwargs)
+
         return task_message(
             headers={
                 'lang': 'py',
@@ -305,6 +308,8 @@ class AMQP(object):
                 'timelimit': [time_limit, soft_time_limit],
                 'root_id': root_id,
                 'parent_id': parent_id,
+                'argsrepr': argsrepr,
+                'kwargsrepr': kwargsrepr,
             },
             properties={
                 'correlation_id': task_id,
@@ -323,8 +328,8 @@ class AMQP(object):
                 'root': root_id,
                 'parent': parent_id,
                 'name': name,
-                'args': safe_repr(args),
-                'kwargs': safe_repr(kwargs),
+                'args': argsrepr,
+                'kwargs': kwargsrepr,
                 'retries': retries,
                 'eta': eta,
                 'expires': expires,
@@ -385,8 +390,8 @@ class AMQP(object):
             sent_event={
                 'uuid': task_id,
                 'name': name,
-                'args': safe_repr(args),
-                'kwargs': safe_repr(kwargs),
+                'args': saferepr(args),
+                'kwargs': saferepr(kwargs),
                 'retries': retries,
                 'eta': eta,
                 'expires': expires,

+ 167 - 0
celery/tests/utils/test_saferepr.py

@@ -0,0 +1,167 @@
+from __future__ import absolute_import, unicode_literals
+
+import re
+
+from decimal import Decimal
+from pprint import pprint
+
+from celery.five import items, long_t, values
+
+from celery.utils.saferepr import saferepr
+
+from celery.tests.case import Case
+
+EXPECTED_1 = """\
+{'rest': {'baz': 'The quick brown fox jumps over the lazy dog.', \
+'foo': 'The quick brown fox jumps...', ...}}\
+"""
+
+D_NUMBERS = {
+    b'integer': 1,
+    b'float': 1.3,
+    b'decimal': Decimal("1.3"),
+    b'long': long_t(1.3),
+    b'complex': complex(13.3),
+}
+D_INT_KEYS = {v: k for k, v in items(D_NUMBERS)}
+
+QUICK_BROWN_FOX = 'The quick brown fox jumps over the lazy dog.'
+B_QUICK_BROWN_FOX = b'The quick brown fox jumps over the lazy dog.'
+
+D_TEXT = {
+    b'foo': QUICK_BROWN_FOX,
+    b'bar': B_QUICK_BROWN_FOX,
+    b'baz': B_QUICK_BROWN_FOX,
+    b'xuzzy': B_QUICK_BROWN_FOX,
+}
+
+L_NUMBERS = list(values(D_NUMBERS))
+
+D_TEXT_LARGE = {
+    b'bazxuzzyfoobarlongverylonglong': QUICK_BROWN_FOX * 30,
+}
+
+D_ALL = {
+    b'numbers': D_NUMBERS,
+    b'intkeys': D_INT_KEYS,
+    b'text': D_TEXT,
+    b'largetext': D_TEXT_LARGE,
+}
+
+D_D_TEXT = {b'rest': D_TEXT}
+
+RE_OLD_SET_REPR = re.compile(r'(?:frozen)?set\d?\(\[(.+?)\]\)')
+RE_OLD_SET_REPR_REPLACE = r'{\1}'
+
+
+def from_old_repr(s):
+    return RE_OLD_SET_REPR.sub(
+        RE_OLD_SET_REPR_REPLACE, s).replace("u'", "'")
+
+
+class list2(list):
+    pass
+
+
+class list3(list):
+
+    def __repr__(self):
+        return list.__repr__(self)
+
+
+class tuple2(tuple):
+    pass
+
+
+class tuple3(tuple):
+
+    def __repr__(self):
+        return tuple.__repr__(self)
+
+
+class set2(set):
+    pass
+
+
+class set3(set):
+
+    def __repr__(self):
+        return set.__repr__(self)
+
+
+class frozenset2(frozenset):
+    pass
+
+
+class frozenset3(frozenset):
+
+    def __repr__(self):
+        return frozenset.__repr__(self)
+
+
+class dict2(dict):
+    pass
+
+
+class dict3(dict):
+
+    def __repr__(self):
+        return dict.__repr__(self)
+
+
+class Unorderable:
+
+    def __repr__(self):
+        return str(id(self))
+
+
+class test_saferepr(Case):
+
+    def test_safe_types(self):
+        for value in values(D_NUMBERS):
+            self.assertEqual(saferepr(value), repr(value))
+
+    def test_numbers_dict(self):
+        self.assertEqual(saferepr(D_NUMBERS), repr(D_NUMBERS))
+
+    def test_numbers_list(self):
+        self.assertEqual(saferepr(L_NUMBERS), repr(L_NUMBERS))
+
+    def test_numbers_keys(self):
+        self.assertEqual(saferepr(D_INT_KEYS), repr(D_INT_KEYS))
+
+    def test_text(self):
+        self.assertEqual(saferepr(D_TEXT), repr(D_TEXT).replace("u'", "'"))
+
+    def test_text_maxlen(self):
+        self.assertEqual(saferepr(D_D_TEXT, 100), EXPECTED_1)
+
+    def test_same_as_repr(self):
+        # Simple objects, small containers and classes that overwrite __repr__
+        # For those the result should be the same as repr().
+        # Ahem.  The docs don't say anything about that -- this appears to
+        # be testing an implementation quirk.  Starting in Python 2.5, it's
+        # not true for dicts:  pprint always sorts dicts by key now; before,
+        # it sorted a dict display if and only if the display required
+        # multiple lines.  For that reason, dicts with more than one element
+        # aren't tested here.
+        types = (
+            0, 0, 0+0j, 0.0, "", b"",
+            (), tuple2(), tuple3(),
+            [], list2(), list3(),
+            set(), set2(), set3(),
+            frozenset(), frozenset2(), frozenset3(),
+            {}, dict2(), dict3(),
+            self.assertTrue, pprint,
+            -6, -6, -6-6j, -1.5, "x", b"x", (3,), [3], {3: 6},
+            (1, 2), [3, 4], {5: 6},
+            tuple2((1, 2)), tuple3((1, 2)), tuple3(range(100)),
+            [3, 4], list2([3, 4]), list3([3, 4]), list3(range(100)),
+            set({7}), set2({7}), set3({7}),
+            frozenset({8}), frozenset2({8}), frozenset3({8}),
+            dict2({5: 6}), dict3({5: 6}),
+            range(10, -11, -1)
+        )
+        for simple in types:
+            native = from_old_repr(repr(simple))
+            self.assertEqual(saferepr(simple), native)

+ 5 - 4
celery/tests/utils/test_text.py

@@ -1,13 +1,14 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, unicode_literals
 
 from celery.utils.text import (
-    indent,
-    ensure_2lines,
     abbr,
-    truncate,
     abbrtask,
+    ensure_2lines,
+    indent,
     pretty,
+    truncate,
 )
+
 from celery.tests.case import AppCase, Case
 
 RANDTEXT = """\

+ 170 - 0
celery/utils/saferepr.py

@@ -0,0 +1,170 @@
+# -*- coding: utf-8 -*-
+"""
+    celery.utils.saferepr
+    ~~~~~~~~~~~~~~~~~~~~~
+
+    Streaming, truncating, non-recursive version of :func:`repr`.
+
+    Differences from regular :func:`repr`:
+
+    - Sets are represented the Python 3 way: ``{1, 2}`` vs ``set([1, 2])``.
+    - Unicode strings does not have the ``u'`` prefix, even on Python 2.
+
+    Very slow with no limits, super quick with limits.
+
+"""
+from collections import Iterable, Mapping, deque, namedtuple
+
+from itertools import chain
+from numbers import Number
+from pprint import _recursion
+
+from celery.five import items, text_t
+
+from .text import truncate
+
+__all__ = ['saferepr']
+
+_literal = namedtuple('_literal', ('value', 'truncate', 'direction'))
+_key = namedtuple('_key', ('value',))
+_quoted = namedtuple('_quoted', ('value',))
+_dirty = namedtuple('_dirty', ('objid',))
+
+chars_t = (bytes, text_t)
+literal_t = (_literal, _key)
+safe_t = (Number,)
+set_t = (frozenset, set)
+
+LIT_DICT_START = _literal('{', False, +1)
+LIT_DICT_KVSEP = _literal(': ', True, 0)
+LIT_DICT_END = _literal('}', False, -1)
+LIT_LIST_START = _literal('[', False, +1)
+LIT_LIST_END = _literal(']', False, -1)
+LIT_LIST_SEP = _literal(', ', True, 0)
+LIT_SET_START = _literal('{', False, +1)
+LIT_SET_END = _literal('}', False, -1)
+LIT_TUPLE_START = _literal('(', False, +1)
+LIT_TUPLE_END = _literal(')', False, -1)
+LIT_TUPLE_END_SV = _literal(',)', False, -1)
+
+
+def saferepr(o, maxlen=None, maxlevels=3, seen=None):
+    return ''.join(_saferepr(
+        o, maxlen=maxlen, maxlevels=maxlevels, seen=seen
+    ))
+
+
+def _chaindict(mapping,
+               LIT_DICT_KVSEP=LIT_DICT_KVSEP,
+               LIT_LIST_SEP=LIT_LIST_SEP):
+    size = len(mapping)
+    for i, (k, v) in enumerate(items(mapping)):
+        yield _key(k)
+        yield LIT_DICT_KVSEP
+        yield v
+        if i < (size - 1):
+            yield LIT_LIST_SEP
+
+
+def _chainlist(it, LIT_LIST_SEP=LIT_LIST_SEP):
+    size = len(it)
+    for i, v in enumerate(it):
+        yield v
+        if i < (size - 1):
+            yield LIT_LIST_SEP
+
+
+def _repr_empty_set(s):
+    return '%s([])' % (type(s).__name__,)
+
+
+def _saferepr(o, maxlen=None, maxlevels=3, seen=None):
+    stack = deque([iter([o])])
+    for token, it in reprstream(stack, seen=seen, maxlevels=maxlevels):
+        if maxlen is not None and maxlen <= 0:
+            yield ', ...'
+            # move rest back to stack, so that we can include
+            # dangling parens.
+            stack.append(it)
+            break
+        if isinstance(token, _literal):
+            val = str(token.value)
+        elif isinstance(token, _key):
+            val = repr(token.value).replace("u'", "'")
+        elif isinstance(token, _quoted):
+            val = "'%s'" % (truncate(token.value, maxlen),)
+        else:
+            val = truncate(token, maxlen)
+        yield val
+        if maxlen is not None:
+            maxlen -= len(val)
+    for rest1 in stack:
+        # maxlen exceeded, process any dangling parens.
+        for rest2 in rest1:
+            if isinstance(rest2, _literal) and not rest2.truncate:
+                yield rest2.value
+
+
+def reprstream(stack, seen=None, maxlevels=3, level=0, isinstance=isinstance):
+    seen = seen or set()
+    append = stack.append
+    popleft = stack.popleft
+    is_in_seen = seen.__contains__
+    discard_from_seen = seen.discard
+    add_to_seen = seen.add
+
+    while stack:
+        lit_start = lit_end = None
+        it = popleft()
+        for val in it:
+            orig = val
+            if isinstance(val, _dirty):
+                discard_from_seen(val.objid)
+                continue
+            elif isinstance(val, _literal):
+                level += val.direction
+                yield val, it
+            elif isinstance(val, _key):
+                yield val, it
+            elif isinstance(val, safe_t):
+                yield repr(val), it
+            elif isinstance(val, chars_t):
+                yield _quoted(val), it
+            else:
+                if isinstance(val, set_t):
+                    if not val:
+                        yield _repr_empty_set(val), it
+                        continue
+                    lit_start, lit_end, val = (
+                        LIT_SET_START, LIT_SET_END, _chainlist(val))
+                elif isinstance(val, tuple):
+                    lit_start, lit_end, val = (
+                        LIT_TUPLE_START,
+                        LIT_TUPLE_END_SV if len(val) == 1 else LIT_TUPLE_END,
+                        _chainlist(val))
+                elif isinstance(val, Mapping):
+                    lit_start, lit_end, val = (
+                        LIT_DICT_START, LIT_DICT_END, _chaindict(val))
+                elif isinstance(val, Iterable):
+                    lit_start, lit_end, val = (
+                        LIT_LIST_START, LIT_LIST_END, _chainlist(val))
+                else:
+                    # other type of object
+                    yield repr(val), it
+                    continue
+
+                if maxlevels and level >= maxlevels:
+                    yield "%s...%s" % (lit_start.value, lit_end.value), it
+                    continue
+
+                objid = id(orig)
+                if is_in_seen(objid):
+                    yield _recursion(orig), it
+                    continue
+                add_to_seen(objid)
+
+                # Recurse into the new list/tuple/dict/etc by tacking
+                # the rest of our iterable onto the new it: this way
+                # it works similar to a linked list.
+                append(chain([lit_start], val, [_dirty(objid), lit_end], it))
+                break

+ 1 - 1
celery/utils/text.py

@@ -64,7 +64,7 @@ def indent(t, indent=0, sep='\n'):
 
 def truncate(text, maxlen=128, suffix='...'):
     """Truncates text to a maximum number of characters."""
-    if len(text) >= maxlen:
+    if maxlen and len(text) >= maxlen:
         return text[:maxlen].rsplit(' ', 1)[0] + suffix
     return text
 

+ 9 - 2
celery/worker/request.py

@@ -79,7 +79,7 @@ class Request(object):
             'app', 'type', 'name', 'id', 'on_ack', 'body',
             'hostname', 'eventer', 'connection_errors', 'task', 'eta',
             'expires', 'request_dict', 'on_reject', 'utc',
-            'content_type', 'content_encoding',
+            'content_type', 'content_encoding', 'argsrepr', 'kwargsrepr',
             '__weakref__', '__dict__',
         )
 
@@ -111,6 +111,8 @@ class Request(object):
             self.name = headers['shadow']
         if 'timelimit' in headers:
             self.time_limits = headers['timelimit']
+        self.argsrepr = headers.get('argsrepr', '')
+        self.kwargsrepr = headers.get('kwargsrepr', '')
         self.on_ack = on_ack
         self.on_reject = on_reject
         self.hostname = hostname or socket.gethostname()
@@ -384,6 +386,8 @@ class Request(object):
     def info(self, safe=False):
         return {'id': self.id,
                 'name': self.name,
+                'args': self.argsrepr,
+                'kwargs': self.kwargsrepr,
                 'type': self.type,
                 'body': self.body,
                 'hostname': self.hostname,
@@ -404,7 +408,10 @@ class Request(object):
         return '{0.name}[{0.id}]'.format(self)
 
     def __repr__(self):
-        return '<{0}: {1}>'.format(type(self).__name__, self.humaninfo())
+        return '<{0}: {1} {2} {3}>'.format(
+            type(self).__name__, self.humaninfo(),
+            self.argsrepr, self.kwargsrepr,
+        )
 
     @property
     def tzlocal(self):

+ 7 - 2
celery/worker/strategy.py

@@ -15,6 +15,7 @@ from kombu.five import buffer_t
 
 from celery.exceptions import InvalidTaskError
 from celery.utils.log import get_logger
+from celery.utils.saferepr import saferepr
 from celery.utils.timeutils import timezone
 
 from .request import Request, create_request_cls
@@ -40,7 +41,11 @@ def proto1_to_proto2(message, body):
         raise InvalidTaskError(
             'Task keyword arguments must be a mapping',
         )
-    body['headers'] = message.headers
+    body.update(
+        argsrepr=saferepr(args),
+        kwargsrepr=saferepr(kwargs),
+        headers=message.headers,
+    )
     try:
         body['group'] = body['taskset']
     except KeyError:
@@ -95,7 +100,7 @@ def default(task, app, consumer,
             send_event(
                 'task-received',
                 uuid=req.id, name=req.name,
-                args='', kwargs='',
+                args=req.argsrepr, kwargs=req.kwargsrepr,
                 retries=req.request_dict.get('retries', 0),
                 eta=req.eta and req.eta.isoformat(),
                 expires=req.expires and req.expires.isoformat(),

+ 7 - 1
docs/internals/protocol.rst

@@ -46,6 +46,8 @@ Definition
         'expires'; iso8601 expires,
         'retries': int retries,
         'timelimit': (soft, hard),
+        'argsrepr': str repr(args),
+        'kwargsrepr': str repr(kwargs),
     }
 
     body = (
@@ -69,11 +71,15 @@ This example sends a task message using version 2 of the protocol:
     # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
 
     task_id = uuid()
+    args = (2, 2)
+    kwargs = {}
     basic_publish(
-        message=json.dumps(([2, 2], {}, None),
+        message=json.dumps((args, kwargs, None),
         application_headers={
             'lang': 'py',
             'task': 'proj.tasks.add',
+            'argsrepr': repr(args),
+            'kwargsrepr': repr(kwargs),
         }
         properties={
             'correlation_id': task_id,