Переглянути джерело

Task messages now include Lamport clock value

Ask Solem 12 роки тому
батько
коміт
1df7164ff9

+ 2 - 1
celery/app/amqp.py

@@ -200,7 +200,8 @@ class TaskProducer(Producer):
                 'callbacks': callbacks,
                 'errbacks': errbacks,
                 'reply_to': reply_to,
-                'timeouts': timeouts}
+                'timeouts': timeouts,
+                'clock': self.app.clock.forward()}
         group_id = group_id or taskset_id
         if group_id:
             body['taskset'] = group_id

+ 10 - 11
celery/events/state.py

@@ -362,10 +362,9 @@ class State(object):
         task, created = self.get_or_create_task(uuid)
         task.worker = worker
 
-        clock = 0 if type == 'sent' else fields.get('clock')
-
         taskheap = self._taskheap
         timestamp = fields['timestamp']
+        clock = fields.get('clock')
         heappush(taskheap, _lamportinfo(clock, timestamp, worker.id, task))
         curcount = len(self.tasks)
         if len(taskheap) > self.max_tasks_in_memory * 2:
@@ -399,6 +398,8 @@ class State(object):
                 break
 
     def tasks_by_time(self, limit=None):
+        """Generator giving tasks ordered by time,
+        in ``(uuid, Task)`` tuples."""
         seen = set()
         for evtup in islice(reversed(self._taskheap), 0, limit):
             uuid = evtup[3].uuid
@@ -410,22 +411,20 @@ class State(object):
     def tasks_by_type(self, name, limit=None):
         """Get all tasks by type.
 
-        Returns a list of `(uuid, task)` tuples.
+        Returns a list of ``(uuid, Task)`` tuples.
 
         """
-        return islice(((tup[3].uuid, tup[3])
-                            for tup in self._taskheap
-                                if tup[3].name == name), 0, limit)
+        return islice(((uuid, task)
+                            for uuid, task in self.tasks_by_time()
+                                if task.name == name), 0, limit)
 
     def tasks_by_worker(self, hostname, limit=None):
         """Get all tasks by worker.
 
-        Returns a list of `(uuid, task)` tuples.
-
         """
-        return islice(((tup[3].uuid, tup[3])
-                        for tup in self._taskheap
-                            if tup[3].worker.hostname == hostname), 0, limit)
+        return islice(((uuid, task)
+                        for uuid, task in self.tasks_by_time()
+                            if task.name == name), 0, limit)
 
     def task_types(self):
         """Returns a list of all seen task types."""

+ 2 - 2
celery/utils/mail.py

@@ -130,7 +130,7 @@ class ErrorMail(object):
 
     * hostname
 
-        Worker hostname.
+        Worker nodename.
 
     """
 
@@ -140,7 +140,7 @@ class ErrorMail(object):
 
     #: Format string used to generate error email subjects.
     subject = """\
-        [celery@{hostname}] Error: Task {name} ({id}): {exc!r}
+        [{hostname}] Error: Task {name} ({id}): {exc!r}
     """
 
     #: Format string used to generate error email content.

+ 2 - 1
celery/worker/consumer.py

@@ -204,7 +204,8 @@ class Consumer(object):
         return (self, self.connection, self.task_consumer,
                 self.strategies, self.namespace, self.hub, self.qos,
                 self.amqheartbeat, self.handle_unknown_message,
-                self.handle_unknown_task, self.handle_invalid_task)
+                self.handle_unknown_task, self.handle_invalid_task,
+                self.app.clock)
 
     def on_poll_init(self, hub):
         hub.update_readers(self.connection.eventmap)

+ 9 - 2
celery/worker/loops.py

@@ -25,7 +25,7 @@ AMQHEARTBEAT_RATE = 2.0
 
 def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
         heartbeat, handle_unknown_message, handle_unknown_task,
-        handle_invalid_task, sleep=sleep, min=min, Empty=Empty,
+        handle_invalid_task, clock, sleep=sleep, min=min, Empty=Empty,
         hbrate=AMQHEARTBEAT_RATE):
     """Non-blocking eventloop consuming messages until connection is lost,
     or shutdown is requested."""
@@ -43,6 +43,7 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
         drain_nowait = connection.drain_nowait
         on_task_callbacks = hub.on_task
         keep_draining = connection.transport.nb_keep_draining
+        adjust_clock = clock.adjust
 
         if heartbeat and connection.supports_heartbeats:
             hub.timer.apply_interval(
@@ -55,6 +56,9 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
                 name = body['task']
             except (KeyError, TypeError):
                 return handle_unknown_message(body, message)
+
+            adjust_clock(body.get('clock') or 0)
+
             try:
                 strategies[name](message, body, message.ack_log_error)
             except KeyError as exc:
@@ -122,8 +126,9 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
 
 def synloop(obj, connection, consumer, strategies, ns, hub, qos,
         heartbeat, handle_unknown_message, handle_unknown_task,
-        handle_invalid_task, **kwargs):
+        handle_invalid_task, clock, **kwargs):
     """Fallback blocking eventloop for transports that doesn't support AIO."""
+    adjust_clock = clock.adjust
 
     def on_task_received(body, message):
         try:
@@ -131,6 +136,8 @@ def synloop(obj, connection, consumer, strategies, ns, hub, qos,
         except (KeyError, TypeError):
             return handle_unknown_message(body, message)
 
+        adjust_clock(body.get('clock') or 0)
+
         try:
             strategies[name](message, body, message.ack_log_error)
         except KeyError as exc: