Переглянути джерело

Handle errors occuring while acking task messages. Thanks to Chimrod.

Ask Solem 14 роки тому
батько
коміт
935ab248c2
2 змінених файлів з 10 додано та 3 видалено
  1. 8 1
      celery/worker/consumer.py
  2. 2 2
      celery/worker/job.py

+ 8 - 1
celery/worker/consumer.py

@@ -306,8 +306,15 @@ class Consumer(object):
 
         # Handle task
         if message_data.get("task"):
+            def ack():
+                try:
+                    message.ack()
+                except self.connection_errors, exc:
+                    self.logger.critical(
+                            "Couldn't ack %r: message:%r reason:%r" % (
+                                message.delivery_tag, message_data, exc))
             try:
-                task = TaskRequest.from_message(message, message_data,
+                task = TaskRequest.from_message(message, message_data, ack,
                                                 app=self.app,
                                                 logger=self.logger,
                                                 hostname=self.hostname,

+ 2 - 2
celery/worker/job.py

@@ -258,7 +258,7 @@ class TaskRequest(object):
             self._store_errors = self.task.store_errors_even_if_ignored
 
     @classmethod
-    def from_message(cls, message, message_data, **kw):
+    def from_message(cls, message, message_data, on_ack=noop, **kw):
         """Create request from a task message.
 
         :raises UnknownTaskError: if the message does not describe a task,
@@ -280,7 +280,7 @@ class TaskRequest(object):
                    retries=message_data.get("retries", 0),
                    eta=maybe_iso8601(message_data.get("eta")),
                    expires=maybe_iso8601(message_data.get("expires")),
-                   on_ack=message.ack,
+                   on_ack=on_ack,
                    delivery_info=delivery_info,
                    **kw)