Bladeren bron

The bucket is working. Couldn't have the AMQP consumer in its own thread, so
created a Mediator thread instead, pushing items form the bucket queue to the
pool.

Ask Solem 16 jaren geleden
bovenliggende
commit
5036026654
1 gewijzigde bestanden met toevoegingen van 58 en 112 verwijderingen
  1. 58 112
      celery/worker.py

+ 58 - 112
celery/worker.py

@@ -265,37 +265,24 @@ class TaskWrapper(object):
                 meta={"task_id": self.task_id, "task_name": self.task_name})
 
 
-class AMQPMediator(threading.Thread):
-    """Thread continously taking care of new messages pushed by the
-    AMQP broker."""
-    
-    def __init__(self, bucket_queue, hold_queue):
-        super(AMQPMediator, self).__init__()
+class Mediator(threading.Thread):
+    """Thread continuously passing tasks in the queue
+    to the pool."""
+
+    def __init__(self, bucket_queue, callback):
+        super(Mediator, self).__init__()
         self._shutdown = threading.Event()
         self._stopped = threading.Event()
         self.bucket_queue = bucket_queue
-        self.hold_queue = hold_queue
-        self.amqp_connection = None
-        self.task_consumer = None
+        self.callback = callback
 
-    def add_to_queue(self, message_data, message):
-        eta = message_data.get("eta")
-        if eta:
-            print("ADD TO HOLD QUEUE: %s" % eta)
-            self.hold_queue.put((message_data, message, eta))
-        else:
-            self.bucket_queue.put((message_data, message))
-    
     def run(self):
-        print("THREAD RUNNING")
-        task_consumer = self.reset_connection()
-        it = task_consumer.iterconsume(limit=None)
         while True:
             if self._shutdown.isSet():
                 break
-            print("TRYING TO GET NEXT MESSAGE")
-            it.next()
-        self.close_connection()
+            # This blocks until there's a message in the queue.
+            task = self.bucket_queue.get()
+            self.callback(task)
         self._stopped.set() # indicate that we are stopped
 
     def stop(self):
@@ -303,47 +290,11 @@ class AMQPMediator(threading.Thread):
         self._shutdown.set()
         self._stopped.wait() # block until this thread is done
 
-    def close_connection(self):
-        """Close the AMQP connection."""
-        if self.task_consumer:
-            self.task_consumer.close()
-        if self.amqp_connection:
-            self.amqp_connection.close()
-
-    def reset_connection(self):
-        """Reset the AMQP connection, and reinitialize the
-        :class:`celery.messaging.TaskConsumer` instance.
-
-        Resets the task consumer in :attr:`task_consumer`.
-
-        """
-        self.close_connection()
-        self.amqp_connection = DjangoAMQPConnection()
-        self.task_consumer = TaskConsumer(connection=self.amqp_connection)
-        self.task_consumer.register_callback(self.add_to_queue)
-        return self.task_consumer
-
-    def connection_diagnostics(self):
-        """Diagnose the AMQP connection, and reset connection if
-        necessary."""
-        connection = self.task_consumer.backend.channel.connection
-
-        if not connection:
-            self.logger.info(
-                    "AMQP Connection has died, restoring connection.")
-            self.reset_connection()
-
 
 class PeriodicWorkController(threading.Thread):
     """A thread that continuously checks if there are
     :class:`celery.task.PeriodicTask` tasks waiting for execution,
-    and executes them.
-
-    Example:
-
-        >>> PeriodicWorkController().start()
-
-    """
+    and executes them."""
 
     def __init__(self):
         super(PeriodicWorkController, self).__init__()
@@ -416,15 +367,36 @@ class WorkController(object):
         self.pool = TaskPool(self.concurrency, logger=self.logger)
         self.periodicworkcontroller = PeriodicWorkController()
         self.is_detached = is_detached
-        self.bucket_queue = Queue(maxsize=self.concurrency)
-        self.hold_queue = Queue()
-        self.amqp_mediator = AMQPMediator(self.bucket_queue, self.hold_queue)
+        self.bucket_queue = Queue()
+        self.mediator = Mediator(self.bucket_queue, self.process_task)
+        self.amqp_connection = None
+        self.task_consumer = None
+    
+    def run(self):
+        """Starts the workers main loop."""
+        self._state = "RUN"
+        task_consumer = self.reset_connection()
+        it = task_consumer.iterconsume(limit=None)
+
+        self.pool.run()
+        self.mediator.start()
+        self.periodicworkcontroller.start()
+
+        try:
+            while True:
+                it.next()
+        except (SystemExit, KeyboardInterrupt):
+            self.shutdown()
 
-    def safe_process_task(self, message_data, message):
-        """The method called when we receive a message."""
+    def process_task(self, task):
+        """Process task by passing it to the pool of workers."""
         try:
             try:
-                self.process_task(message_data, message)
+                self.logger.info("Got task from broker: %s[%s]" % (
+                    task.task_name, task.task_id))
+                task.execute_using_pool(self.pool, self.loglevel,
+                                        self.logfile)
+                self.logger.debug("Task %s has been executed." % task)
             except ValueError:
                 # execute_next_task didn't return a r/name/id tuple,
                 # probably because it got an exception.
@@ -437,21 +409,30 @@ class WorkController(object):
         except (SystemExit, KeyboardInterrupt):
             self.shutdown()
 
-    def process_task(self, message_data, message):
-        """Process task message by passing it to the pool of workers."""
-        
+    def add_to_bucket(self, message_data, message):
         task = TaskWrapper.from_message(message, message_data,
                                         logger=self.logger)
-        self.logger.info("Got task from broker: %s[%s]" % (
-            task.task_name, task.task_id))
-        self.logger.debug("Got a task: %s. Trying to execute it..." % task)
+        self.bucket_queue.put(task)
+
+    def close_connection(self):
+        """Close the AMQP connection."""
+        if self.task_consumer:
+            self.task_consumer.close()
+        if self.amqp_connection:
+            self.amqp_connection.close()
 
-        result = task.execute_using_pool(self.pool, self.loglevel,
-                                         self.logfile)
+    def reset_connection(self):
+        """Reset the AMQP connection, and reinitialize the
+        :class:`celery.messaging.TaskConsumer` instance.
 
-        self.logger.debug("Task %s has been executed asynchronously." % task)
+        Resets the task consumer in :attr:`task_consumer`.
 
-        return
+        """
+        self.close_connection()
+        self.amqp_connection = DjangoAMQPConnection()
+        self.task_consumer = TaskConsumer(connection=self.amqp_connection)
+        self.task_consumer.register_callback(self.add_to_bucket)
+        return self.task_consumer
 
     def shutdown(self):
         """Make sure ``celeryd`` exits cleanly."""
@@ -459,42 +440,7 @@ class WorkController(object):
         if self._state != "RUN":
             return
         self._state = "TERMINATE"
-        self.amqp_mediator.stop()
+        self.mediator.stop()
         self.periodicworkcontroller.stop()
         self.pool.terminate()
         self.close_connection()
-
-    def run(self):
-        """Starts the workers main loop."""
-        self._state = "RUN"
-
-        self.pool.run()
-        self.amqp_mediator.start()
-        self.periodicworkcontroller.start()
-
-        try:
-            while True:
-                self.process_hold()
-                self.process_bucket()
-        except (SystemExit, KeyboardInterrupt):
-            self.shutdown()
-
-    def process_hold(self):
-        try:
-            message_data, message, eta = self.hold_queue.get(timeout=0.2)
-        except QueueEmpty:
-            pass
-        else:
-            print("GOT ITEM FROM HOLD QUEUE: %s" % eta)
-            if datetime.now() >= eta:
-                self.safe_process_task(message_data, message)
-            else:
-                self.hold_queue.put((message_data, message, eta))
-
-    def process_bucket(self):
-        try:
-            message_data, message = self.bucket_queue.get(timeout=0.2)
-        except QueueEmpty:
-            pass
-        else:
-            self.safe_process_task(message_data, message)