|
@@ -362,14 +362,17 @@ class State(object):
|
|
|
"""Process task event."""
|
|
|
uuid = fields['uuid']
|
|
|
hostname = fields['hostname']
|
|
|
- worker, _ = self.get_or_create_worker(hostname)
|
|
|
+ # task-sent event is sent by client, not worker
|
|
|
+ is_client_event = type == 'sent'
|
|
|
task, created = self.get_or_create_task(uuid)
|
|
|
- task.worker = worker
|
|
|
+ if not is_client_event:
|
|
|
+ worker, _ = self.get_or_create_worker(hostname)
|
|
|
+ task.worker = worker
|
|
|
maxtasks = self.max_tasks_in_memory * 2
|
|
|
|
|
|
taskheap = self._taskheap
|
|
|
timestamp = fields.get('timestamp') or 0
|
|
|
- clock = 0 if type == 'sent' else fields.get('clock')
|
|
|
+ clock = 0 if is_client_event else fields.get('clock')
|
|
|
heappush(taskheap, timetuple(clock, timestamp, worker.id, task))
|
|
|
if len(taskheap) > maxtasks:
|
|
|
heappop(taskheap)
|