Browse Source

[3.1] Support for task protocol 2

Ask Solem 8 years ago
parent
commit
2af050a054
1 changed files with 52 additions and 6 deletions
  1. 52 6
      celery/worker/consumer.py

+ 52 - 6
celery/worker/consumer.py

@@ -31,6 +31,7 @@ from kombu.utils.compat import get_errno
 from kombu.utils.encoding import safe_repr, bytes_t
 from kombu.utils.limits import TokenBucket
 
+from celery import chain
 from celery import bootsteps
 from celery.app.trace import build_tracer
 from celery.canvas import signature
@@ -447,16 +448,24 @@ class Consumer(object):
         callbacks = self.on_task_message
 
         def on_task_received(body, message):
+            headers = message.headers
             try:
-                name = body['task']
+                type_, is_proto2 = headers['task'], 1
             except (KeyError, TypeError):
-                return on_unknown_message(body, message)
+                try:
+                    type_, is_proto2 = body['task'], 0
+                except (KeyError, TypeError):
+                    return on_unknown_message(body, message)
+
+            if is_proto2:
+                body = proto2_to_proto1(
+                    self.app, type_, body, message, headers)
 
             try:
-                strategies[name](message, body,
-                                 message.ack_log_error,
-                                 message.reject_log_error,
-                                 callbacks)
+                strategies[type_](message, body,
+                                  message.ack_log_error,
+                                  message.reject_log_error,
+                                  callbacks)
             except KeyError as exc:
                 on_unknown_task(body, message, exc)
             except InvalidTaskError as exc:
@@ -470,6 +479,43 @@ class Consumer(object):
         )
 
 
+def proto2_to_proto1(app, type_, body, message, headers):
+    args, kwargs, embed = body
+    embedded = _extract_proto2_embed(**embed)
+    chained = embedded.pop('chain')
+    new_body = dict(
+        _extract_proto2_headers(type_, **headers),
+        args=args,
+        kwargs=kwargs,
+        **embedded)
+    if chained:
+        new_body['callbacks'].append(chain(chained, app=app))
+    return new_body
+
+
+def _extract_proto2_headers(type_, id, retries, eta, expires,
+                            group, timelimit, **_):
+    return {
+        'id': id,
+        'task': type_,
+        'retries': retries,
+        'eta': eta,
+        'expires': expires,
+        'utc': True,
+        'taskset': group,
+        'timelimit': timelimit,
+    }
+
+
+def _extract_proto2_embed(callbacks, errbacks, chain, chord, **_):
+    return {
+        'callbacks': callbacks or [],
+        'errbacks': errbacks,
+        'chain': chain,
+        'chord': chord,
+    }
+
+
 class Connection(bootsteps.StartStopStep):
 
     def __init__(self, c, **kwargs):