Explorar el Código

Adds correlation_id to multiprocessing ApplyResult

Ask Solem hace 11 años
padre
commit
ea87144771
Se han modificado 1 ficheros con 13 adiciones y 10 borrados
  1. 13 10
      celery/worker/job.py

+ 13 - 10
celery/worker/job.py

@@ -230,9 +230,10 @@ class Request(object):
             and ignored.
 
         """
+        uuid = self.id
         task = self.task
         if self.revoked():
-            raise TaskRevokedError(self.id)
+            raise TaskRevokedError(uuid)
 
         hostname = self.hostname
         kwargs = self.kwargs
@@ -245,15 +246,17 @@ class Request(object):
         timeout, soft_timeout = request.get('timelimit', (None, None))
         timeout = timeout or task.time_limit
         soft_timeout = soft_timeout or task.soft_time_limit
-        result = pool.apply_async(trace_task_ret,
-                                  args=(self.name, self.id,
-                                        self.args, kwargs, request),
-                                  accept_callback=self.on_accepted,
-                                  timeout_callback=self.on_timeout,
-                                  callback=self.on_success,
-                                  error_callback=self.on_failure,
-                                  soft_timeout=soft_timeout,
-                                  timeout=timeout)
+        result = pool.apply_async(
+            trace_task_ret,
+            args=(self.name, uuid, self.args, kwargs, request),
+            accept_callback=self.on_accepted,
+            timeout_callback=self.on_timeout,
+            callback=self.on_success,
+            error_callback=self.on_failure,
+            soft_timeout=soft_timeout,
+            timeout=timeout,
+            correlation_id=uuid,
+        )
         return result
 
     def execute(self, loglevel=None, logfile=None):