Browse Source

More work on celerymon

Ask Solem 15 years ago
parent
commit
396b5ba335
4 changed files with 48 additions and 27 deletions
  1. +21 -11
      celery/events.py
  2. +5 -1
      celery/messaging.py
  3. +18 -8
      celery/worker/__init__.py
  4. +4 -7
      celery/worker/heartbeat.py

+ 21 - 11
celery/events.py

@@ -1,6 +1,9 @@
-from celery.messaging import EventPublisher, EventConsumer
-from UserDict import UserDict
 import time
+import socket
+import threading
+from UserDict import UserDict
+
+from celery.messaging import EventPublisher, EventConsumer
 
 """
 Events
@@ -8,11 +11,11 @@ Events
 
 WORKER-ONLINE    hostname timestamp
 WORKER-OFFLINE   hostname timestamp
-TASK-RECEIVED    uuid name args kwargs retries eta timestamp
-TASK-ACCEPTED    uuid timestamp
-TASK-SUCCEEDED   uuid result timestamp
-TASK-FAILED      uuid exception timestamp
-TASK-RETRIED     uuid exception timestamp
+TASK-RECEIVED    uuid name args kwargs retries eta hostname timestamp
+TASK-ACCEPTED    uuid hostname timestamp
+TASK-SUCCEEDED   uuid result hostname timestamp
+TASK-FAILED      uuid exception hostname timestamp
+TASK-RETRIED     uuid exception hostname timestamp
 WORKER-HEARTBEAT hostname timestamp
 
 """
@@ -24,16 +27,23 @@ def Event(type, **fields):
 class EventDispatcher(object):
     """
 
-    dispatcher.send("worker-heartbeat", hostname="h8.opera.com")
+    dispatcher.send("event-name", arg1=1, arg2=2, arg3=3)
 
     """
-    def __init__(self, connection):
+    def __init__(self, connection, hostname=None):
         self.connection = connection
         self.publisher = EventPublisher(self.connection)
+        self.hostname = hostname or socket.gethostname()
+        self._lock = threading.Lock()
 
     def send(self, type, **fields):
-        fields["timestamp"] = time.time()
-        self.publisher.send(Event(type, **fields))
+        self._lock.acquire()
+        try:
+            fields["timestamp"] = time.time()
+            fields["hostname"] = self.hostname
+            self.publisher.send(Event(type, **fields))
+        finally:
+            self._lock.release()
 
 
 class EventReceiver(object):

+ 5 - 1
celery/messaging.py

@@ -105,7 +105,11 @@ class BroadcastPublisher(Publisher):
     routing_key = ""
 
     def revoke(self, task_id):
-        self.send(dict(revoke=task_id))
+        self.send("revoke", dict(task_id=task_id))
+
+    def send(self, type, data):
+        data["command"] = type
+        super(BroadcastPublisher, self).send({"control": data})
 
 
 class BroadcastConsumer(Consumer):

+ 18 - 8
celery/worker/__init__.py

@@ -61,7 +61,7 @@ class CarrotListener(object):
         self.logger = logger
         self.prefetch_count = SharedCounter(initial_prefetch_count)
         self.event_dispatcher = None
-        self.hostname = socket.gethostname()
+        self.event_connection = None
         self.heart = None
 
     def start(self):
@@ -101,6 +101,13 @@ class CarrotListener(object):
         to the broker."""
         self.close_connection()
 
+    def handle_control_command(self, command):
+        if command["command"] == "revoke":
+            revoke_uuid = command["task_id"]
+            revoked.add(revoke_uuid)
+            self.logger.warn("Task %s marked as revoked." % revoke_uuid)
+            return
+
     def receive_message(self, message_data, message):
         """The callback called when a new message is received.
 
@@ -109,10 +116,10 @@ class CarrotListener(object):
 
         """
 
-        revoke_uuid = message_data.get("revoke", None)
-        if revoke_uuid:
-            revoked.add(revoke_uuid)
-            self.logger.warn("Task %s marked as revoked." % revoke_uuid)
+        control = message_data.get("control")
+        if control:
+            print("RECV CONTROL: %s" % control)
+            self.handle_control_command(control)
             return
 
         try:
@@ -162,6 +169,9 @@ class CarrotListener(object):
                     "CarrotListener: Closing connection to the broker...")
             self.amqp_connection.close()
             self.amqp_connection = None
+        if self.event_connection:
+            self.event_connection.close()
+            self.event_connection = None
         self.event_dispatcher = None
 
     def reset_connection(self):
@@ -175,13 +185,14 @@ class CarrotListener(object):
                 "CarrotListener: Re-establishing connection to the broker...")
         self.close_connection()
         self.amqp_connection = self._open_connection()
+        self.logger.debug("CarrotListener: Connection Established.")
         self.task_consumer = get_consumer_set(connection=self.amqp_connection)
         self.broadcast_consumer = BroadcastConsumer(self.amqp_connection)
         self.task_consumer.add_consumer(self.broadcast_consumer)
         self.task_consumer.register_callback(self.receive_message)
         self.event_dispatcher = EventDispatcher(self.amqp_connection)
-        self.heart = Heart(self.event_dispatcher, hostname=self.hostname)
-        self.heart.start()
+        self.heart = Heart(self.event_dispatcher)
+        #self.heart.start()
 
     def _open_connection(self):
         """Retries connecting to the AMQP broker over time.
@@ -207,7 +218,6 @@ class CarrotListener(object):
         conn = retry_over_time(_establish_connection, (socket.error, IOError),
                                errback=_connection_error_handler,
                                max_retries=conf.AMQP_CONNECTION_MAX_RETRIES)
-        self.logger.debug("CarrotListener: Connection Established.")
         return conn
 
 

+ 4 - 7
celery/worker/heartbeat.py

@@ -1,4 +1,3 @@
-import socket
 import threading
 from time import sleep
 
@@ -6,30 +5,28 @@ from time import sleep
 class Heart(threading.Thread):
     interval = 60
 
-    def __init__(self, eventer, hostname=None, interval=None):
+    def __init__(self, eventer, interval=None):
         super(Heart, self).__init__()
         self.eventer = eventer
         self.interval = interval or self.interval
-        self.hostname = hostname or socket.gethostname()
         self._shutdown = threading.Event()
         self._stopped = threading.Event()
         self.setDaemon(True)
 
     def run(self):
-        hostname = self.hostname
         interval = self.interval
         send = self.eventer.send
 
-        send("worker-online", hostname=hostname)
+        send("worker-online")
 
         while 1:
             if self._shutdown.isSet():
                 break
-            send("worker-heartbeat", hostname=hostname)
+            send("worker-heartbeat")
             sleep(interval)
         self._stopped.set()
 
-        send("worker-offline", hostname=hostname)
+        send("worker-offline")
 
     def stop(self):
         """Gracefully shutdown the thread."""