瀏覽代碼

Use carrot.Consumer.iterqueue() instead of carrot.Consumer.fetch(), + fix
syntax errors.

Ask Solem 16 年之前
父節點
當前提交
41387ae9af
共有 1 個文件被更改,包括 23 次插入30 次删除
  1. 23 30
      celery/worker.py

+ 23 - 30
celery/worker.py

@@ -125,8 +125,11 @@ class TaskWrapper(object):
         self.task_func = task_func
         self.args = args
         self.kwargs = kwargs
-        self.done_msg = opts.get("done_msg", self.done_msg)
-        self.logger = opts.get("logger", multiprocessing.get_logger())
+        for opt in ("success_msg", "fail_msg", "fail_email_subject",
+                "fail_email_body", "logger"):
+            setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
+        if not self.logger:
+            self.logger = multiprocessing.get_logger()
 
     def __repr__(self):
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
@@ -311,6 +314,7 @@ class WorkController(object):
             self.task_consumer.connection.close()
         amqp_connection = DjangoAMQPConnection()
         self.task_consumer = TaskConsumer(connection=amqp_connection)
+        self.task_consumer_it = self.task_consumer.iterqueue(infinite=True)
 
     def connection_diagnostics(self):
         """Diagnose the AMQP connection, and reset connection if
@@ -336,30 +340,23 @@ class WorkController(object):
         """
         #self.connection_diagnostics()
         self.logger.debug("Trying to fetch message from broker...")
-        message = self.task_consumer.fetch()
-        if message is not None:
-            self.logger.debug("Acknowledging message with delivery tag %s" % (
-                message.delivery_tag))
-        return message
-
-    def fetch_next_task(self):
-        """Fetch the next task from the AMQP broker.
-
-        Raises :exc:`EmptyQueue` exception if there is no message
-        waiting on the queue.
-
-        :returns: :class:`TaskWrapper` instance.
-
-        """
-        message = self.receive_message()
-        if message is None: # No messages waiting.
+        message = next(self.task_consumer_it)
+        if not message:
             raise EmptyQueue()
+        return message
 
+    def process_task(self, message):
         task = TaskWrapper.from_message(message, logger=self.logger)
         self.logger.info("Got task from broker: %s[%s]" % (
-                            task.task_name, task.task_id))
+            task.task_name, task.task_id))
+        self.logger.debug("Got a task: %s. Trying to execute it..." % task)
+
+        result = task.execute_using_pool(self.pool, self.loglevel,
+                                         self.logfile)
 
-        return task
+        self.logger.debug("Task %s has been executed asynchronously." % task)
+
+        return result
 
     def execute_next_task(self):
         """Execute the next task on the queue using the multiprocessing pool.
@@ -367,17 +364,13 @@ class WorkController(object):
         Catches all exceptions and logs them with level
         :const:`logging.CRITICAL`.
 
-        """
-        self.logger.debug("Trying to fetch a task.")
-        task = self.fetch_next_task()
-        self.logger.debug("Got a task: %s. Trying to execute it..." % task)
-
-        result = task.execute_using_pool(self.pool, self.loglevel,
-                                         self.logfile)
+        Raises :exc:`EmptyQueue` exception if there is no message
+        waiting on the queue.
 
-        self.logger.debug("Task %s has been executed asynchronously." % task)
+        :returns: :class:`TaskWrapper` instance.
 
-        return result
+        """
+        return self.process_task(self.receive_message())
 
     def run_periodic_tasks(self):
         """Schedule all waiting periodic tasks for execution.