Sfoglia il codice sorgente

Log errors occuring in process_cleanup. Closes #365. Thanks to jdunck

Ask Solem 14 anni fa
parent
commit
5aa3375ae5

+ 34 - 1
celery/tests/test_worker/test_worker_job.py

@@ -8,6 +8,7 @@ import time
 from datetime import datetime, timedelta
 
 from kombu.transport.base import Message
+from mock import Mock
 
 from celery import states
 from celery.app import app_or_default
@@ -26,7 +27,7 @@ from celery.worker.state import revoked
 
 from celery.tests.compat import catch_warnings
 from celery.tests.utils import unittest
-from celery.tests.utils import execute_context, StringIO
+from celery.tests.utils import execute_context, StringIO, wrap_logger
 
 
 scratch = {"ACK": False}
@@ -82,6 +83,38 @@ class test_RetryTaskError(unittest.TestCase):
 
 class test_WorkerTaskTrace(unittest.TestCase):
 
+    def test_process_cleanup_fails(self):
+        backend = mytask.backend
+        mytask.backend = Mock()
+        mytask.backend.process_cleanup = Mock(side_effect=KeyError())
+        try:
+
+            def with_wrap_logger(sio):
+                uuid = gen_unique_id()
+                ret = jail(uuid, mytask.name, [2], {})
+                self.assertEqual(ret, 4)
+                mytask.backend.mark_as_done.assert_called_with(uuid, 4)
+                logs = sio.getvalue().strip()
+                self.assertIn("Process cleanup failed", logs)
+                return 1234
+
+            logger = mytask.app.log.get_default_logger()
+            self.assertEqual(execute_context(
+                    wrap_logger(logger), with_wrap_logger), 1234)
+
+        finally:
+            mytask.backend = backend
+
+    def test_process_cleanup_BaseException(self):
+        backend = mytask.backend
+        mytask.backend = Mock()
+        mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
+        try:
+            self.assertRaises(SystemExit,
+                    jail, gen_unique_id(), mytask.name, [2], {})
+        finally:
+            mytask.backend = backend
+
     def test_execute_jail_success(self):
         ret = jail(gen_unique_id(), mytask.name, [2], {})
         self.assertEqual(ret, 4)

+ 31 - 1
celery/tests/utils.py

@@ -7,6 +7,7 @@ except AttributeError:
     import unittest2 as unittest
 
 import importlib
+import logging
 import os
 import sys
 import time
@@ -15,7 +16,12 @@ try:
 except ImportError:    # py3k
     import builtins
 
-from celery.utils.compat import StringIO
+from celery.utils.compat import StringIO, LoggerAdapter
+try:
+    from contextlib import contextmanager
+except ImportError:
+    from celery.tests.utils import fallback_contextmanager as contextmanager
+
 
 from nose import SkipTest
 
@@ -72,6 +78,30 @@ class GeneratorContextManager(object):
                     raise
 
 
+def get_handlers(logger):
+    if isinstance(logger, LoggerAdapter):
+        return logger.logger.handlers
+    return logger.handlers
+
+
+def set_handlers(logger, new_handlers):
+    if isinstance(logger, LoggerAdapter):
+        logger.logger.handlers = new_handlers
+    logger.handlers = new_handlers
+
+
+@contextmanager
+def wrap_logger(logger, loglevel=logging.ERROR):
+    old_handlers = get_handlers(logger)
+    sio = StringIO()
+    siohandler = logging.StreamHandler(sio)
+    set_handlers(logger, [siohandler])
+
+    yield sio
+
+    set_handlers(logger, old_handlers)
+
+
 def fallback_contextmanager(fun):
     def helper(*args, **kwds):
         return GeneratorContextManager(fun(*args, **kwds))

+ 13 - 4
celery/worker/job.py

@@ -7,8 +7,9 @@ import warnings
 
 from datetime import datetime
 
+from celery import current_app
 from celery import platforms
-from celery.app import app_or_default, current_app
+from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
 from celery.exceptions import WorkerLostError, RetryTaskError
@@ -93,7 +94,7 @@ class WorkerTaskTrace(TaskTrace):
     hostname = None
 
     def __init__(self, *args, **kwargs):
-        self.loader = kwargs.get("loader") or current_app().loader
+        self.loader = kwargs.get("loader") or current_app.loader
         self.hostname = kwargs.get("hostname") or socket.gethostname()
         super(WorkerTaskTrace, self).__init__(*args, **kwargs)
 
@@ -125,8 +126,16 @@ class WorkerTaskTrace(TaskTrace):
         try:
             return super(WorkerTaskTrace, self).execute()
         finally:
-            self.task.backend.process_cleanup()
-            self.loader.on_process_cleanup()
+            try:
+                self.task.backend.process_cleanup()
+                self.loader.on_process_cleanup()
+            except (KeyboardInterrupt, SystemExit, MemoryError):
+                raise
+            except Exception, exc:
+                logger = current_app.log.get_default_logger()
+                logger.error("Process cleanup failed: %r" % (exc, ),
+                             exc_info=sys.exc_info())
+
 
     def handle_success(self, retval, *args):
         """Handle successful execution."""

+ 1 - 1
contrib/requirements/test.txt

@@ -3,7 +3,7 @@ simplejson
 nose
 nose-cover3
 coverage>=3.0
-mock>=0.6.0
+mock>=0.7.0
 pytyrant
 redis
 pymongo