Browse Source

WorkerTaskTrace now includes more metadata for the STARTED state: pid and hostname of the worker that started the task.

Closes #181. Thanks to justdave.
Ask Solem 14 years ago
parent
commit
5c962c8101
3 changed files with 18 additions and 10 deletions
  1. 2 2
      celery/backends/base.py
  2. 1 1
      celery/execute/trace.py
  3. 15 7
      celery/worker/job.py

+ 2 - 2
celery/backends/base.py

@@ -32,9 +32,9 @@ class BaseBackend(object):
         raise NotImplementedError(
                 "store_result is not supported by this backend.")
 
-    def mark_as_started(self, task_id):
+    def mark_as_started(self, task_id, **meta):
         """Mark a task as started"""
-        return self.store_result(task_id, None, status=states.STARTED)
+        return self.store_result(task_id, meta, status=states.STARTED)
 
     def mark_as_done(self, task_id, result):
         """Mark task as successfully executed."""

+ 1 - 1
celery/execute/trace.py

@@ -40,7 +40,7 @@ class TraceInfo(object):
 
 class TaskTrace(object):
 
-    def __init__(self, task_name, task_id, args, kwargs, task=None):
+    def __init__(self, task_name, task_id, args, kwargs, task=None, **_):
         self.task_id = task_id
         self.task_name = task_name
         self.args = args

+ 15 - 7
celery/worker/job.py

@@ -1,3 +1,4 @@
+import os
 import sys
 import time
 import socket
@@ -74,7 +75,8 @@ class WorkerTaskTrace(TaskTrace):
     """
 
     def __init__(self, *args, **kwargs):
-        self.loader = kwargs.pop("loader", current_loader())
+        self.loader = kwargs.get("loader") or current_loader()
+        self.hostname = kwargs.get("hostname") or socket.gethostname()
         super(WorkerTaskTrace, self).__init__(*args, **kwargs)
 
         self._store_errors = True
@@ -98,7 +100,9 @@ class WorkerTaskTrace(TaskTrace):
         """Execute, trace and store the result of the task."""
         self.loader.on_task_init(self.task_id, self.task)
         if self.task.track_started:
-            self.task.backend.mark_as_started(self.task_id)
+            self.task.backend.mark_as_started(self.task_id,
+                                              pid=os.getpid(),
+                                              hostname=self.hostname)
         try:
             return super(WorkerTaskTrace, self).execute()
         finally:
@@ -335,7 +339,8 @@ class TaskRequest(object):
         if not self.task.acks_late:
             self.acknowledge()
 
-        tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile))
+        tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile),
+                                 **{"hostname": self.hostname})
         retval = tracer.execute()
         self.acknowledge()
         return retval
@@ -361,10 +366,13 @@ class TaskRequest(object):
 
         args = self._get_tracer_args(loglevel, logfile)
         self.time_start = time.time()
-        result = pool.apply_async(execute_and_trace, args=args,
-                    accept_callback=self.on_accepted,
-                    timeout_callback=self.on_timeout,
-                    callbacks=[self.on_success], errbacks=[self.on_failure])
+        result = pool.apply_async(execute_and_trace,
+                                  args=args,
+                                  kwargs={"hostname": self.hostname},
+                                  accept_callback=self.on_accepted,
+                                  timeout_callback=self.on_timeout,
+                                  callbacks=[self.on_success],
+                                  errbacks=[self.on_failure])
         return result
 
     def on_accepted(self):