Forráskód Böngészése

Chord errors must also be logged

Ask Solem 11 éve
szülő
commit
2e78672a7d
3 módosított fájl, 18 hozzáadás és 0 törlés
  1. 6 0
      celery/app/builtins.py
  2. 8 0
      celery/backends/base.py
  3. 4 0
      celery/backends/redis.py

+ 6 - 0
celery/app/builtins.py

@@ -13,9 +13,12 @@ from collections import deque
 
 from celery._state import get_current_worker_task
 from celery.utils import uuid
+from celery.utils.log import get_logger
 
 __all__ = ['shared_task', 'load_shared_tasks']
 
+logger = get_logger(__name__)
+
 #: global list of functions defining tasks that should be
 #: added to all apps.
 _shared_tasks = set()
@@ -105,12 +108,15 @@ def add_unlock_chord_task(app):
                     )
                 except StopIteration:
                     reason = repr(exc)
+                logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
                 app.backend.chord_error_from_stack(callback,
                                                    ChordError(reason))
             else:
                 try:
                     callback.delay(ret)
                 except Exception as exc:
+                    logger.error('Chord %r raised: %r', group_id, exc,
+                                 exc_info=1)
                     app.backend.chord_error_from_stack(
                         callback,
                         exc=ChordError('Callback error: {0!r}'.format(exc)),

+ 8 - 0
celery/backends/base.py

@@ -35,6 +35,7 @@ from celery.result import (
 )
 from celery.utils import timeutils
 from celery.utils.functional import LRUCache
+from celery.utils.log import get_logger
 from celery.utils.serialization import (
     get_pickled_exception,
     get_pickleable_exception,
@@ -46,6 +47,8 @@ __all__ = ['BaseBackend', 'KeyValueStoreBackend', 'DisabledBackend']
 EXCEPTION_ABLE_CODECS = frozenset(['pickle', 'yaml'])
 PY3 = sys.version_info >= (3, 0)
 
+logger = get_logger(__name__)
+
 
 def unpickle_backend(cls, args, kwargs):
     """Return an unpickled backend."""
@@ -527,6 +530,7 @@ class KeyValueStoreBackend(BaseBackend):
             deps = GroupResult.restore(gid, backend=task.backend)
         except Exception as exc:
             callback = maybe_signature(task.request.chord, app=app)
+            logger.error('Chord %r raised: %r', gid, exc, exc_info=1)
             return self.chord_error_from_stack(
                 callback,
                 ChordError('Cannot restore group: {0!r}'.format(exc)),
@@ -536,6 +540,8 @@ class KeyValueStoreBackend(BaseBackend):
                 raise ValueError(gid)
             except ValueError as exc:
                 callback = maybe_signature(task.request.chord, app=app)
+                logger.error('Chord callback %r raised: %r', gid, exc,
+                             exc_info=1)
                 return self.chord_error_from_stack(
                     callback,
                     ChordError('GroupResult {0} no longer exists'.format(gid)),
@@ -556,11 +562,13 @@ class KeyValueStoreBackend(BaseBackend):
                 except StopIteration:
                     reason = repr(exc)
 
+                logger.error('Chord %r raised: %r', gid, reason, exc_info=1)
                 self.chord_error_from_stack(callback, ChordError(reason))
             else:
                 try:
                     callback.delay(ret)
                 except Exception as exc:
+                    logger.error('Chord %r raised: %r', gid, exc, exc_info=1)
                     self.chord_error_from_stack(
                         callback,
                         ChordError('Callback error: {0!r}'.format(exc)),

+ 4 - 0
celery/backends/redis.py

@@ -220,15 +220,19 @@ class RedisBackend(KeyValueStoreBackend):
                 try:
                     callback.delay([unpack(tup, decode) for tup in resl])
                 except Exception as exc:
+                    error('Chord callback for %r raised: %r',
+                          request.group, exc, exc_info=1)
                     app._tasks[callback.task].backend.fail_from_current_stack(
                         callback.id,
                         exc=ChordError('Callback error: {0!r}'.format(exc)),
                     )
         except ChordError as exc:
+            error('Chord %r raised: %r', request.group, exc, exc_info=1)
             app._tasks[callback.task].backend.fail_from_current_stack(
                 callback.id, exc=exc,
             )
         except Exception as exc:
+            error('Chord %r raised: %r', request.group, exc, exc_info=1)
             app._tasks[callback.task].backend.fail_from_current_stack(
                 callback.id, exc=ChordError('Join error: {0!r}'.format(exc)),
             )