Jelajahi Sumber

Handle errors occuring while acking task messages. Thanks to Chimrod.

Ask Solem 15 tahun lalu
induk
melakukan
5967f209bd
2 mengubah file dengan 13 tambahan dan 5 penghapusan
  1. 3 3
      celery/worker/job.py
  2. 10 2
      celery/worker/listener.py

+ 3 - 3
celery/worker/job.py

@@ -266,8 +266,8 @@ class TaskRequest(object):
         return False
 
     @classmethod
-    def from_message(cls, message, message_data, logger=None, eventer=None,
-            hostname=None):
+    def from_message(cls, message, message_data, on_ack=noop, logger=None,
+            eventer=None, hostname=None):
         """Create a :class:`TaskRequest` from a task message sent by
         :class:`celery.messaging.TaskPublisher`.
 
@@ -293,7 +293,7 @@ class TaskRequest(object):
             raise InvalidTaskError("Task kwargs must be a dictionary.")
 
         return cls(task_name, task_id, args, kwdict(kwargs),
-                   retries=retries, on_ack=message.ack,
+                   retries=retries, on_ack=on_ack,
                    delivery_info=delivery_info, logger=logger,
                    eventer=eventer, hostname=hostname,
                    eta=eta, expires=expires)

+ 10 - 2
celery/worker/listener.py

@@ -237,7 +237,7 @@ class CarrotListener(object):
             self.reset_connection()
             try:
                 self.consume_messages()
-            except (socket.error, AMQPConnectionException, IOError):
+            except (socket.error, AMQPConnectionException, IOError, OSError):
                 self.logger.error("CarrotListener: Connection to broker lost."
                                 + " Trying to re-establish connection...")
 
@@ -300,8 +300,16 @@ class CarrotListener(object):
 
         # Handle task
         if message_data.get("task"):
+            def ack():
+                try:
+                    message.ack()
+                except (socket.error, AMQPConnectionException,
+                        IOError, OSError), exc:
+                    self.logger.critical(
+                            "Couldn't ack %r: message:%r reason:%r" % (
+                                message.delivery_tag, message_data, exc))
             try:
-                task = TaskRequest.from_message(message, message_data,
+                task = TaskRequest.from_message(message, message_data, ack,
                                                 logger=self.logger,
                                                 hostname=self.hostname,
                                                 eventer=self.event_dispatcher)