Browse Source

Refs #4356: Handle "hybrid" messages that have moved between Celery versions (#4358)

* Handle "hybrid" messages that have passed through both a protocol 1 and a protocol 2 consumer in their lifetime.

We detected an edge case, demonstrated in https://gist.github.com/ewdurbin/ddf4b0f0c0a4b190251a4a23859dd13c#file-readme-md, in which messages are mishandled after being retried by a 3.1.25 consumer, then a 4.1.0 consumer, then a 3.1.25 consumer again. As an extension, this patch also handles the "next" iteration of these mutant payloads.

* explicitly construct proto2 from "hybrid" messages

* remove unused kwarg

* fix pydocstyle check

* flake8 fixes

* correct fix for misread pydocstyle error
Russell Keith-Magee 7 years ago
parent
commit
a4abe149aa
1 changed files with 46 additions and 2 deletions
  1. 46 2
      celery/worker/strategy.py

+ 46 - 2
celery/worker/strategy.py

@@ -24,6 +24,46 @@ logger = get_logger(__name__)
 # We cache globals and attribute lookups, so disable this warning.
 
 
+def hybrid_to_proto2(message, body):  # NOTE: `message` is not used below
+    """Create a fresh protocol 2 message from a hybrid protocol 1/2 message."""
+    try:
+        args, kwargs = body.get('args', ()), body.get('kwargs', {})
+        kwargs.items  # pylint: disable=pointless-statement
+    except KeyError:  # NOTE(review): .get() with a default should not raise this
+        raise InvalidTaskError('Message does not have args/kwargs')
+    except AttributeError:  # kwargs lacks `.items` (or body lacks `.get`)
+        raise InvalidTaskError(
+            'Task keyword arguments must be a mapping',
+        )
+
+    headers = {  # each value is looked up in body; missing keys become None
+        'lang': body.get('lang'),
+        'task': body.get('task'),
+        'id': body.get('id'),
+        'root_id': body.get('root_id'),
+        'parent_id': body.get('parent_id'),
+        'group': body.get('group'),
+        'meth': body.get('meth'),
+        'shadow': body.get('shadow'),
+        'eta': body.get('eta'),
+        'expires': body.get('expires'),
+        'retries': body.get('retries'),
+        'timelimit': body.get('timelimit'),
+        'argsrepr': body.get('argsrepr'),
+        'kwargsrepr': body.get('kwargsrepr'),
+        'origin': body.get('origin'),
+    }
+
+    embed = {  # same lookup rule as headers, except 'chain'
+        'callbacks': body.get('callbacks'),
+        'errbacks': body.get('errbacks'),
+        'chord': body.get('chord'),
+        'chain': None,  # always None; never read from body
+    }
+
+    return (args, kwargs, embed), headers, True, body.get('utc', True)  # (body, headers, decoded, utc)
+
+
 def proto1_to_proto2(message, body):
     """Convert Task message protocol 1 arguments to protocol 2.
 
@@ -93,14 +133,18 @@ def default(task, app, consumer,
 
     def task_message_handler(message, body, ack, reject, callbacks,
                              to_timestamp=to_timestamp):
-        if body is None:
+        if body is None and 'args' not in message.payload:
             body, headers, decoded, utc = (
                 message.body, message.headers, False, app.uses_utc_timezone(),
             )
             if not body_can_be_buffer:
                 body = bytes(body) if isinstance(body, buffer_t) else body
         else:
-            body, headers, decoded, utc = proto1_to_proto2(message, body)
+            if 'args' in message.payload:
+                body, headers, decoded, utc = hybrid_to_proto2(message,
+                                                               message.payload)
+            else:
+                body, headers, decoded, utc = proto1_to_proto2(message, body)
 
         req = Req(
             message,