فهرست منبع

Client related events do not have a worker.id, and adds a Task.client field

Ask Solem 11 سال پیش
والد
کامیت
f635384458
1فایلهای تغییر یافته به همراه12 افزوده شده و 5 حذف شده
  1. 12 5
      celery/events/state.py

+ 12 - 5
celery/events/state.py

@@ -203,7 +203,7 @@ class Task(object):
     name = received = sent = started = succeeded = failed = retried = \
         revoked = args = kwargs = eta = expires = retries = worker = result = \
         exception = timestamp = runtime = traceback = exchange = \
-        routing_key = None
+        routing_key = client = None
     state = states.PENDING
     clock = 0
 
@@ -211,7 +211,7 @@ class Task(object):
                'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs',
                'eta', 'expires', 'retries', 'worker', 'result', 'exception',
                'timestamp', 'runtime', 'traceback', 'exchange', 'routing_key',
-               'clock')
+               'clock', 'client')
     if not PYPY:
         __slots__ = _fields + ('__dict__', '__weakref__')
 
@@ -302,6 +302,10 @@ class Task(object):
     def __reduce__(self):
         return _depickle_task, (self.__class__, self.as_dict())
 
+    @property
+    def origin(self):
+        return self.client if self.worker is None else self.worker.id
+
     @property
     def ready(self):
         return self.state in states.READY_STATES
@@ -500,7 +504,9 @@ class State(object):
                     task, created = get_task(uuid), False
                 except KeyError:
                     task = tasks[uuid] = Task(uuid)
-                if not is_client_event:
+                if is_client_event:
+                    task.client = hostname
+                else:
                     try:
                         worker, created = get_worker(hostname), False
                     except KeyError:
@@ -509,8 +515,9 @@ class State(object):
                     if worker is not None and local_received:
                         worker.event(None, local_received, timestamp)
                 clock = 0 if is_client_event else event.get('clock')
+                origin = hostname if is_client_event else worker.id
                 heappush(taskheap,
-                         timetuple(clock, timestamp, worker.id, ref(task)))
+                         timetuple(clock, timestamp, origin, ref(task)))
                 if len(taskheap) > maxtasks:
                     heappop(taskheap)
                 if subject == 'received':
@@ -524,7 +531,7 @@ class State(object):
 
     def rebuild_taskheap(self, timetuple=timetuple, heapify=heapify):
         heap = self._taskheap[:] = [
-            timetuple(t.clock, t.timestamp, t.worker.id, ref(t))
+            timetuple(t.clock, t.timestamp, t.origin, ref(t))
             for t in values(self.tasks)
         ]
         heapify(heap)