Browse Source

Gracefully handles consumer decode error (#5044)

Steven Sklar 6 years ago
parent
commit
ed17e12b14
2 changed files with 14 additions and 0 deletions
  1. 3 0
      celery/worker/consumer/consumer.py
  2. 11 0
      t/unit/worker/test_loops.py

+ 3 - 0
celery/worker/consumer/consumer.py

@@ -16,6 +16,7 @@ from time import sleep
 from billiard.common import restart_state
 from billiard.exceptions import RestartFreqExceeded
 from kombu.asynchronous.semaphore import DummyLock
+from kombu.exceptions import DecodeError
 from kombu.utils.compat import _detect_environment
 from kombu.utils.encoding import bytes_t, safe_repr
 from kombu.utils.limits import TokenBucket
@@ -568,6 +569,8 @@ class Consumer(object):
                     )
                 except InvalidTaskError as exc:
                     return on_invalid_task(payload, message, exc)
+                except DecodeError as exc:
+                    return self.on_decode_error(message, exc)
 
         return on_task_received
 

+ 11 - 0
t/unit/worker/test_loops.py

@@ -6,6 +6,7 @@ import socket
 import pytest
 from case import Mock
 from kombu.asynchronous import ERR, READ, WRITE, Hub
+from kombu.exceptions import DecodeError
 
 from celery.bootsteps import CLOSE, RUN
 from celery.exceptions import (InvalidTaskError, WorkerLostError,
@@ -91,6 +92,10 @@ class X(object):
             name='on_invalid_task',
         )
         _consumer.on_invalid_task = self.on_invalid_task
+        self.on_decode_error = self.obj.on_decode_error = Mock(
+            name='on_decode_error',
+        )
+        _consumer.on_decode_error = self.on_decode_error
         _consumer.strategies = self.obj.strategies
 
     def timeout_then_error(self, mock):
@@ -206,6 +211,12 @@ class test_asynloop:
         on_task(msg)
         x.on_invalid_task.assert_called_with(None, msg, exc)
 
+    def test_on_task_DecodeError(self):
+        x, on_task, msg, strategy = self.task_context(self.add.s(2, 2))
+        exc = strategy.side_effect = DecodeError()
+        on_task(msg)
+        x.on_decode_error.assert_called_with(msg, exc)
+
     def test_should_terminate(self):
         x = X(self.app)
         # XXX why aren't the errors propagated?!?