Browse Source

Fixed a lot of stuff: shutdown now works properly, the code is refactored, and the worker now sends the time it took to execute the task with the task-succeeded event.

Ask Solem 15 years ago
parent
commit
83b824cc80
5 changed files with 102 additions and 88 deletions
  1. 4 14
      celery/events.py
  2. 0 1
      celery/worker/__init__.py
  3. 26 7
      celery/worker/heartbeat.py
  4. 6 1
      celery/worker/job.py
  5. 66 65
      celery/worker/listener.py

+ 4 - 14
celery/events.py

@@ -5,20 +5,6 @@ from UserDict import UserDict
 
 from celery.messaging import EventPublisher, EventConsumer
 
-"""
-Events
-======
-
-WORKER-ONLINE    hostname timestamp
-WORKER-OFFLINE   hostname timestamp
-TASK-RECEIVED    uuid name args kwargs retries eta hostname timestamp
-TASK-ACCEPTED    uuid hostname timestamp
-TASK-SUCCEEDED   uuid result hostname timestamp
-TASK-FAILED      uuid exception hostname timestamp
-TASK-RETRIED     uuid exception hostname timestamp
-WORKER-HEARTBEAT hostname timestamp
-
-"""
 
 def Event(type, **fields):
     return dict(fields, type=type, timestamp=time.time())
@@ -45,6 +31,10 @@ class EventDispatcher(object):
         finally:
             self._lock.release()
 
+    def close(self):
+        self._lock.locked() and self._lock.release()
+        self.publisher and self.publisher.close()
+
 
 class EventReceiver(object):
     handlers = {}

+ 0 - 1
celery/worker/__init__.py

@@ -155,7 +155,6 @@ class WorkController(object):
 
     def stop(self):
         """Gracefully shutdown the worker server."""
-        # shut down the periodic work controller thread
         if self._state != "RUN":
             return
 

+ 26 - 7
celery/worker/heartbeat.py

@@ -1,5 +1,5 @@
 import threading
-from time import sleep
+from time import time, sleep
 
 
 class Heart(threading.Thread):
@@ -12,23 +12,42 @@ class Heart(threading.Thread):
         self._shutdown = threading.Event()
         self._stopped = threading.Event()
         self.setDaemon(True)
+        self._state = None
 
     def run(self):
+        self._state = "RUN"
         interval = self.interval
-        send = self.eventer.send
+        dispatch = self.eventer.send
 
-        send("worker-online")
+        dispatch("worker-online")
 
+
+        # We can't sleep all of the interval, because then
+        # it takes 60 seconds (or value of interval) to shutdown
+        # the thread.
+
+        last_beat = None
         while 1:
             if self._shutdown.isSet():
                 break
-            send("worker-heartbeat")
-            sleep(interval)
-        self._stopped.set()
+            now = time()
+            if not last_beat or now > last_beat + interval:
+                last_beat = now
+                dispatch("worker-heartbeat")
+            sleep(1)
 
-        send("worker-offline")
+        try:
+            dispatch("worker-offline")
+        finally:
+            self._stopped.set()
 
     def stop(self):
         """Gracefully shutdown the thread."""
+        if not self._state == "RUN":
+            return
+        self._state = "CLOSE"
+        print("SET SHUTDOWN!")
         self._shutdown.set()
+        print("WAIT FOR STOPPED")
         self._stopped.wait() # block until this thread is done
+        print("STOPPED")

+ 6 - 1
celery/worker/job.py

@@ -4,6 +4,7 @@ Jobs Executable by the Worker Server.
 
 """
 import sys
+import time
 import socket
 import warnings
 
@@ -185,6 +186,7 @@ class TaskWrapper(object):
         self.eventer = opts.get("eventer")
         self.on_ack = on_ack
         self.executed = False
+        self.time_start = None
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
                 "fail_email_body"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
@@ -299,6 +301,7 @@ class TaskWrapper(object):
         self.send_event("task-accepted", uuid=self.task_id)
 
         args = self._get_tracer_args(loglevel, logfile)
+        self.time_start = time.time()
         return pool.apply_async(execute_and_trace, args=args,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
                 on_ack=self.on_ack)
@@ -307,7 +310,9 @@ class TaskWrapper(object):
         """The handler used if the task was successfully processed (
         without raising an exception)."""
 
-        self.send_event("task-succeeded", uuid=self.task_id, result=ret_value)
+        runtime = time.time() - self.time_start
+        self.send_event("task-succeeded", uuid=self.task_id,
+                        result=ret_value, runtime=runtime)
 
         msg = self.success_msg.strip() % {
                 "id": self.task_id,

+ 66 - 65
celery/worker/listener.py

@@ -14,10 +14,13 @@ from celery.messaging import get_consumer_set, BroadcastConsumer
 from celery.exceptions import NotRegistered
 from celery.datastructures import SharedCounter
 
+RUN = 0x0
+CLOSE = 0x1
+
 
 class CarrotListener(object):
-    """Listen for messages received from the AMQP broker and
-    move them the the bucket queue for task processing.
+    """Listen for messages received from the broker and
+    move them to the ready queue for task processing.
 
     :param ready_queue: See :attr:`ready_queue`.
     :param eta_scheduler: See :attr:`eta_scheduler`.
@@ -46,8 +49,8 @@ class CarrotListener(object):
         self.logger = logger
         self.prefetch_count = SharedCounter(initial_prefetch_count)
         self.event_dispatcher = None
-        self.event_connection = None
         self.heart = None
+        self._state = None
 
     def start(self):
         """Start the consumer.
@@ -57,7 +60,7 @@ class CarrotListener(object):
 
         """
 
-        while True:
+        while 1:
             self.reset_connection()
             try:
                 self.consume_messages()
@@ -70,102 +73,96 @@ class CarrotListener(object):
         task_consumer = self.task_consumer
 
         self.logger.debug("CarrotListener: Starting message consumer...")
-        it = task_consumer.iterconsume(limit=None)
-
+        wait_for_message = task_consumer.iterconsume(limit=None).next
         self.logger.debug("CarrotListener: Ready to accept tasks!")
 
         prev_pcount = None
-        while True:
-            if not prev_pcount or int(self.prefetch_count) != prev_pcount:
-                self.task_consumer.qos(prefetch_count=int(self.prefetch_count))
-                prev_pcount = int(self.prefetch_count)
-            it.next()
+        while 1:
+            pcount = int(self.prefetch_count) # Convert SharedCounter to int
+            if not prev_pcount or pcount != prev_pcount:
+                task_consumer.qos(prefetch_count=pcount)
+                prev_pcount = pcount
 
-    def stop(self):
-        """Stop processing AMQP messages and close the connection
-        to the broker."""
-        self.close_connection()
+            wait_for_message()
 
-    def handle_control_command(self, command):
+    def on_control_command(self, command):
         if command["command"] == "revoke":
             revoke_uuid = command["task_id"]
             revoked.add(revoke_uuid)
             self.logger.warn("Task %s marked as revoked." % revoke_uuid)
             return
 
-    def receive_message(self, message_data, message):
-        """The callback called when a new message is received.
+    def on_task(self, task, eta=None):
+        """Handle received task.
 
-        If the message has an ``eta`` we move it to the hold queue,
-        otherwise we move it the bucket queue for immediate processing.
+        If the task has an ``eta`` we enter it into the ETA schedule,
+        otherwise we move it to the ready queue for immediate processing.
 
         """
 
-        control = message_data.get("control")
-        if control:
-            print("RECV CONTROL: %s" % control)
-            self.handle_control_command(control)
-            return
-
-        try:
-            task = TaskWrapper.from_message(message, message_data,
-                                            logger=self.logger,
-                                            eventer=self.event_dispatcher)
-        except NotRegistered, exc:
-            self.logger.error("Unknown task ignored: %s" % (exc))
-            return
-
         if task.task_id in revoked:
             self.logger.warn("Got revoked task from broker: %s[%s]" % (
                 task.task_name, task.task_id))
-            return
-
-        eta = message_data.get("eta")
+            return task.on_ack()
 
         self.event_dispatcher.send("task-received", uuid=task.task_id,
-                                                    name=task.task_name,
-                                                    args=task.args,
-                                                    kwargs=task.kwargs,
-                                                    retries=task.retries,
-                                                    eta=eta)
+                name=task.task_name, args=task.args, kwargs=task.kwargs,
+                retries=task.retries, eta=eta)
+
         if eta:
             if not isinstance(eta, datetime):
                 eta = parse_iso8601(eta)
             self.prefetch_count.increment()
             self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
                     task.task_name, task.task_id, eta))
-            self.eta_scheduler.enter(task,
-                                     eta=eta,
+            self.eta_scheduler.enter(task, eta=eta,
                                      callback=self.prefetch_count.decrement)
         else:
             self.logger.info("Got task from broker: %s[%s]" % (
                     task.task_name, task.task_id))
             self.ready_queue.put(task)
 
+    def receive_message(self, message_data, message):
+        """The callback called when a new message is received. """
+
+        # Handle task
+        if message_data.get("task"):
+            try:
+                task = TaskWrapper.from_message(message, message_data,
+                                                logger=self.logger,
+                                                eventer=self.event_dispatcher)
+            except NotRegistered, exc:
+                self.logger.error("Unknown task ignored: %s" % (exc))
+            else:
+                self.on_task(task, eta=message_data.get("eta"))
+            return
+
+        # Handle control command
+        control = message_data.get("control")
+        if control:
+            self.on_control_command(control)
+        return
+
     def close_connection(self):
-        """Close the AMQP connection."""
-        if self.heart:
-            self.heart.stop()
-        if self.task_consumer:
-            self.task_consumer.close()
-            self.task_consumer = None
-        if self.amqp_connection:
-            self.logger.debug(
-                    "CarrotListener: Closing connection to the broker...")
-            self.amqp_connection.close()
-            self.amqp_connection = None
-        if self.event_connection:
-            self.event_connection.close()
-            self.event_connection = None
-        self.event_dispatcher = None
+        if not self._state == RUN:
+            return
+        self._state = CLOSE
 
-    def reset_connection(self):
-        """Reset the AMQP connection, and reinitialize the
-        :class:`carrot.messaging.ConsumerSet` instance.
+        self.logger.debug("Heart: Going into cardiac arrest...")
+        self.heart = self.heart and self.heart.stop()
 
-        Resets the task consumer in :attr:`task_consumer`.
+        self.logger.debug("TaskConsumer: Shutting down...")
+        self.task_consumer = self.task_consumer and self.task_consumer.close()
 
-        """
+        self.logger.debug("EventDispatcher: Shutting down...")
+        self.event_dispatcher = self.event_dispatcher and \
+                                    self.event_dispatcher.close()
+        self.logger.debug(
+                "CarrotListener: Closing connection to broker...")
+        self.amqp_connection = self.amqp_connection and \
+                                    self.amqp_connection.close()
+
+    def reset_connection(self):
         self.logger.debug(
                 "CarrotListener: Re-establishing connection to the broker...")
         self.close_connection()
@@ -177,7 +174,9 @@ class CarrotListener(object):
         self.task_consumer.register_callback(self.receive_message)
         self.event_dispatcher = EventDispatcher(self.amqp_connection)
         self.heart = Heart(self.event_dispatcher)
-        #self.heart.start()
+        self.heart.start()
+
+        self._state = RUN
 
     def _open_connection(self):
         """Retries connecting to the AMQP broker over time.
@@ -205,4 +204,6 @@ class CarrotListener(object):
                                max_retries=conf.AMQP_CONNECTION_MAX_RETRIES)
         return conn
 
+    def stop(self):
+        self.close_connection()