Browse Source

Task protocol 2: Adds new "origin" message header for hostname of task sender

Ask Solem 9 years ago
parent
commit
1bcdfde9fc
3 changed files with 11 additions and 1 deletions
  1. 4 1
      celery/app/amqp.py
  2. 1 0
      celery/app/task.py
  3. 6 0
      docs/internals/protocol.rst

+ 4 - 1
celery/app/amqp.py

@@ -24,6 +24,7 @@ from kombu.utils.functional import maybe_list
 from celery import signals
 from celery.five import items, string_t
 from celery.local import try_import
+from celery.utils import anon_nodename
 from celery.utils.saferepr import saferepr
 from celery.utils.text import indent as textindent
 from celery.utils.timeutils import maybe_make_aware, to_utc
@@ -303,7 +304,8 @@ class AMQP(object):
                    callbacks=None, errbacks=None, reply_to=None,
                    time_limit=None, soft_time_limit=None,
                    create_sent_event=False, root_id=None, parent_id=None,
-                   shadow=None, chain=None, now=None, timezone=None):
+                   shadow=None, chain=None, now=None, timezone=None,
+                   origin=None):
         args = args or ()
         kwargs = kwargs or {}
         if not isinstance(args, (list, tuple)):
@@ -350,6 +352,7 @@ class AMQP(object):
                 'parent_id': parent_id,
                 'argsrepr': argsrepr,
                 'kwargsrepr': kwargsrepr,
+                'origin': origin or anon_nodename()
             },
             properties={
                 'correlation_id': task_id,

+ 1 - 0
celery/app/task.py

@@ -92,6 +92,7 @@ class Context(object):
     callbacks = None
     errbacks = None
     timelimit = None
+    origin = None
     _children = None   # see property
     _protected = 0
 

+ 6 - 0
docs/internals/protocol.rst

@@ -48,6 +48,7 @@ Definition
         'timelimit': (soft, hard),
         'argsrepr': str repr(args),
         'kwargsrepr': str repr(kwargs),
+        'origin': str nodename,
     }
 
     body = (
@@ -70,6 +71,10 @@ This example sends a task message using version 2 of the protocol:
 
     # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
 
+    import json
+    import os
+    import socket
+
     task_id = uuid()
     args = (2, 2)
     kwargs = {}
@@ -80,6 +85,7 @@ This example sends a task message using version 2 of the protocol:
             'task': 'proj.tasks.add',
             'argsrepr': repr(args),
             'kwargsrepr': repr(kwargs),
+            'origin': '@'.join([os.getpid(), socket.gethostname()])
         }
         properties={
             'correlation_id': task_id,