Browse Source

Trace: eager_trace_tasks and build_tracer now supports app argument

Ask Solem 11 years ago
parent
commit
3c207e9854
4 changed files with 10 additions and 7 deletions
  1. 1 0
      celery/app/task.py
  2. 5 5
      celery/app/trace.py
  3. 2 1
      celery/concurrency/processes.py
  4. 2 1
      celery/worker/consumer.py

+ 1 - 0
celery/app/task.py

@@ -652,6 +652,7 @@ class Task(object):
 
         tb = None
         retval, info = eager_trace_task(task, task_id, args, kwargs,
+                                        app=self._get_app(),
                                         request=request, propagate=throw)
         if isinstance(retval, ExceptionInfo):
             retval, tb = retval.exception, retval.traceback

+ 5 - 5
celery/app/trace.py

@@ -123,7 +123,7 @@ class TraceInfo(object):
 
 
 def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
-                 Info=TraceInfo, eager=False, propagate=False,
+                 Info=TraceInfo, eager=False, propagate=False, app=None,
                  IGNORE_STATES=IGNORE_STATES):
     """Returns a function that traces task execution; catches all
     exceptions and updates result backend with the state and result
@@ -151,7 +151,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
     # saving the extra method call and a line less in the stack trace.
     fun = task if task_has_custom(task, '__call__') else task.run
 
-    loader = loader or current_app.loader
+    loader = loader or app.loader
     backend = task.backend
     ignore_result = task.ignore_result
     track_started = task.track_started
@@ -283,9 +283,9 @@ def trace_task(task, uuid, args, kwargs, request={}, **opts):
         return report_internal_error(task, exc)
 
 
-def _trace_task_ret(name, uuid, args, kwargs, request={}, **opts):
-    return trace_task(current_app.tasks[name],
-                      uuid, args, kwargs, request, **opts)
+def _trace_task_ret(name, uuid, args, kwargs, request={}, app=None, **opts):
+    return trace_task((app or current_app).tasks[name],
+                      uuid, args, kwargs, request, app=app, **opts)
 trace_task_ret = _trace_task_ret
 
 

+ 2 - 1
celery/concurrency/processes.py

@@ -115,7 +115,8 @@ def process_initializer(app, hostname):
     # rebuild execution handler for all tasks.
     from celery.app.trace import build_tracer
     for name, task in items(app.tasks):
-        task.__trace__ = build_tracer(name, task, app.loader, hostname)
+        task.__trace__ = build_tracer(name, task, app.loader, hostname,
+                                      app=app)
     signals.worker_process_init.send(sender=None)
 
 

+ 2 - 1
celery/worker/consumer.py

@@ -391,7 +391,8 @@ class Consumer(object):
         loader = self.app.loader
         for name, task in items(self.app.tasks):
             self.strategies[name] = task.start_strategy(self.app, self)
-            task.__trace__ = build_tracer(name, task, loader, self.hostname)
+            task.__trace__ = build_tracer(name, task, loader, self.hostname,
+                                          app=self.app)
 
     def create_task_handler(self, callbacks):
         strategies = self.strategies