Browse Source

Merge branch 'gregoirecachet/master'

Ask Solem 15 years ago
parent
commit
a2124e2c1e
3 changed files with 29 additions and 3 deletions
  1. 10 0
      celery/conf.py
  2. 7 2
      celery/datastructures.py
  3. 12 1
      celery/worker.py

+ 10 - 0
celery/conf.py

@@ -158,3 +158,13 @@ AMQP_CONSUMER_QUEUE = getattr(settings, "CELERY_AMQP_CONSUMER_QUEUE",
                               DEFAULT_AMQP_CONSUMER_QUEUE)
 
 REAP_TIMEOUT = DEFAULT_REAP_TIMEOUT
+
+"""
+.. data:: SEND_CELERY_TASK_ERROR_EMAILS
+
+    If set to True, errors in tasks will be sent to admins by e-mail.
+    If unset, it will send the emails if DEBUG is False.
+
+"""
+SEND_CELERY_TASK_ERROR_EMAILS = getattr(settings, "SEND_CELERY_TASK_ERROR_EMAILS",
+                                        settings.DEBUG is False)

+ 7 - 2
celery/datastructures.py

@@ -11,7 +11,9 @@ import time
 import os
 from UserList import UserList
 from celery.timer import TimeoutTimer, TimeoutError
-from celery.conf import REAP_TIMEOUT
+from celery.conf import REAP_TIMEOUT, SEND_CELERY_TASK_ERROR_EMAILS
+
+from django.core.mail import mail_admins
 
 
 class PositionQueue(UserList):
@@ -244,11 +246,14 @@ class TaskProcessQueue(object):
 
         """
         if self.done_msg:
+            from celery.worker import ExcInfo
             msg = self.done_msg % {
                     "name": task_name,
                     "id": task_id,
                     "return_value": ret_value}
-            if isinstance(ret_value, Exception):
+            if isinstance(ret_value, ExcInfo):
                 self.logger.error(msg)
+                if SEND_CELERY_TASK_ERROR_EMAILS is True:
+                    mail_admins(msg, ret_value.traceback, fail_silently=True)
             else:
                 self.logger.info(msg)

+ 12 - 1
celery/worker.py

@@ -14,6 +14,8 @@ import simplejson
 import traceback
 import logging
 import time
+import sys
+import traceback
 
 
 class EmptyQueue(Exception):
@@ -24,6 +26,15 @@ class UnknownTask(Exception):
     """Got an unknown task in the queue. The message is requeued and
     ignored."""
 
+class ExcInfo(object):
+    
+    def __init__(self, exc_info):
+        type_, exception, tb = exc_info
+        self.exception = exception
+        self.traceback = '\n'.join(traceback.format_exception(*exc_info))
+        
+    def __str__(self):
+        return str(self.exception)
 
 def jail(task_id, func, args, kwargs):
     """Wraps the task in a jail, which catches all exceptions, and
@@ -52,7 +63,7 @@ def jail(task_id, func, args, kwargs):
         result = func(*args, **kwargs)
     except Exception, exc:
         default_backend.mark_as_failure(task_id, exc)
-        return exc
+        return ExcInfo(sys.exc_info())
     else:
         default_backend.mark_as_done(task_id, result)
         return result