Browse Source

Adds Task.request.headers

Ask Solem 11 years ago
parent
commit
a8fa261aad
3 changed files with 7 additions and 3 deletions
  1. 1 0
      celery/app/task.py
  2. 4 2
      celery/worker/job.py
  3. 2 1
      celery/worker/strategy.py

+ 1 - 0
celery/app/task.py

@@ -73,6 +73,7 @@ class Context(object):
     eta = None
     expires = None
     is_eager = False
+    headers = None
     delivery_info = None
     taskset = None   # compat alias to group
     group = None

+ 4 - 2
celery/worker/job.py

@@ -76,7 +76,7 @@ class Request(object):
             'hostname', 'eventer', 'connection_errors', 'task', 'eta',
             'expires', 'request_dict', 'acknowledged', 'on_reject',
             'utc', 'time_start', 'worker_pid', '_already_revoked',
-            '_terminate_on_ack',
+            '_terminate_on_ack', 'headers',
             '_tzlocal', '__weakref__',
         )
 
@@ -109,7 +109,8 @@ class Request(object):
     def __init__(self, body, on_ack=noop,
                  hostname=None, eventer=None, app=None,
                  connection_errors=None, request_dict=None,
-                 delivery_info=None, task=None, on_reject=noop, **opts):
+                 delivery_info=None, headers=None, task=None,
+                 on_reject=noop, **opts):
         self.app = app
         name = self.name = body['task']
         self.id = body['id']
@@ -165,6 +166,7 @@ class Request(object):
             'priority': delivery_info.get('priority'),
             'redelivered': delivery_info.get('redelivered'),
         }
+        body['headers'] = headers  # pass application/headers
         self.request_dict = body
 
     @classmethod

+ 2 - 1
celery/worker/strategy.py

@@ -47,7 +47,8 @@ def default(task, app, consumer,
                   app=app, hostname=hostname,
                   eventer=eventer, task=task,
                   connection_errors=connection_errors,
-                  delivery_info=message.delivery_info)
+                  delivery_info=message.delivery_info,
+                  headers=message.headers)
         if req.revoked():
             return