|  | @@ -8,17 +8,28 @@ from celery.log import setup_logger
 | 
	
		
			
				|  |  |  from celery.registry import tasks
 | 
	
		
			
				|  |  |  from celery.pool import TaskPool
 | 
	
		
			
				|  |  |  from celery.datastructures import ExceptionInfo
 | 
	
		
			
				|  |  | -from celery.models import PeriodicTaskMeta
 | 
	
		
			
				|  |  |  from celery.backends import default_backend, default_periodic_status_backend
 | 
	
		
			
				|  |  |  from celery.timer import EventTimer
 | 
	
		
			
				|  |  |  from django.core.mail import mail_admins
 | 
	
		
			
				|  |  |  import multiprocessing
 | 
	
		
			
				|  |  | -import simplejson
 | 
	
		
			
				|  |  |  import traceback
 | 
	
		
			
				|  |  |  import logging
 | 
	
		
			
				|  |  | +import socket
 | 
	
		
			
				|  |  |  import time
 | 
	
		
			
				|  |  |  import sys
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +TASK_FAIL_EMAIL_BODY = """
 | 
	
		
			
				|  |  | +Task %(name)s with id %(id)s raised exception: %(exc)s
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +The contents of the full traceback was:
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +%(traceback)s
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +-- 
 | 
	
		
			
				|  |  | +Just thought I'd let you know!
 | 
	
		
			
				|  |  | +celeryd at %(hostname)s.
 | 
	
		
			
				|  |  | +"""
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class EmptyQueue(Exception):
 | 
	
		
			
				|  |  |      """The message queue is currently empty."""
 | 
	
	
		
			
				|  | @@ -98,7 +109,15 @@ class TaskWrapper(object):
 | 
	
		
			
				|  |  |          Mapping of keyword arguments to apply to the task.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      """
 | 
	
		
			
				|  |  | -    done_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
 | 
	
		
			
				|  |  | +    success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
 | 
	
		
			
				|  |  | +    fail_msg = """
 | 
	
		
			
				|  |  | +        Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    fail_email_subject = """
 | 
	
		
			
				|  |  | +        [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    fail_email_body = TASK_FAIL_EMAIL_BODY
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __init__(self, task_name, task_id, task_func, args, kwargs, **opts):
 | 
	
		
			
				|  |  |          self.task_name = task_name
 | 
	
	
		
			
				|  | @@ -163,24 +182,33 @@ class TaskWrapper(object):
 | 
	
		
			
				|  |  |                          self.task_func, self.args, task_func_kwargs])
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def on_success(self, ret_value, meta):
 | 
	
		
			
				|  |  | +        """The handler used if the task was successfully processed (
 | 
	
		
			
				|  |  | +        without raising an exception)."""
 | 
	
		
			
				|  |  |          task_id = meta.get("task_id")
 | 
	
		
			
				|  |  |          task_name = meta.get("task_name")
 | 
	
		
			
				|  |  | -        msg = self.done_msg % {
 | 
	
		
			
				|  |  | +        msg = self.success_msg.strip() % {
 | 
	
		
			
				|  |  |                  "id": task_id,
 | 
	
		
			
				|  |  |                  "name": task_name,
 | 
	
		
			
				|  |  |                  "return_value": ret_value}
 | 
	
		
			
				|  |  |          self.logger.info(msg)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def on_failure(self, ret_value, meta):
 | 
	
		
			
				|  |  | +    def on_failure(self, exc_info, meta):
 | 
	
		
			
				|  |  | +        """The handler used if the task raised an exception."""
 | 
	
		
			
				|  |  |          task_id = meta.get("task_id")
 | 
	
		
			
				|  |  |          task_name = meta.get("task_name")
 | 
	
		
			
				|  |  | -        msg = self.done_msg % {
 | 
	
		
			
				|  |  | -                "id": task_id,
 | 
	
		
			
				|  |  | -                "name": task_name,
 | 
	
		
			
				|  |  | -                "return_value": ret_value}
 | 
	
		
			
				|  |  | -        self.logger.error(msg)
 | 
	
		
			
				|  |  | +        context = {
 | 
	
		
			
				|  |  | +            "hostname": socket.gethostname(),
 | 
	
		
			
				|  |  | +            "id": task_id,
 | 
	
		
			
				|  |  | +            "name": task_name,
 | 
	
		
			
				|  |  | +            "exc": exc_info.exception,
 | 
	
		
			
				|  |  | +            "traceback": exc_info.traceback
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        self.logger.error(self.fail_msg.strip() % context)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          if SEND_CELERY_TASK_ERROR_EMAILS:
 | 
	
		
			
				|  |  | -            mail_admins(msg, ret_value.traceback, fail_silently=True)
 | 
	
		
			
				|  |  | +            subject = self.fail_email_subject.strip() % context
 | 
	
		
			
				|  |  | +            body = self.fail_email_body.strip() % context
 | 
	
		
			
				|  |  | +            mail_admins(subject, body, fail_silently=True)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def execute_using_pool(self, pool, loglevel=None, logfile=None):
 | 
	
		
			
				|  |  |          """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
 | 
	
	
		
			
				|  | @@ -331,7 +359,7 @@ class WorkController(object):
 | 
	
		
			
				|  |  |          self.logger.info("Got task from broker: %s[%s]" % (
 | 
	
		
			
				|  |  |                              task.task_name, task.task_id))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        return task, message
 | 
	
		
			
				|  |  | +        return task
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def execute_next_task(self):
 | 
	
		
			
				|  |  |          """Execute the next task on the queue using the multiprocessing pool.
 | 
	
	
		
			
				|  | @@ -341,7 +369,7 @@ class WorkController(object):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          self.logger.debug("Trying to fetch a task.")
 | 
	
		
			
				|  |  | -        task, message = self.fetch_next_task()
 | 
	
		
			
				|  |  | +        task = self.fetch_next_task()
 | 
	
		
			
				|  |  |          self.logger.debug("Got a task: %s. Trying to execute it..." % task)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          result = task.execute_using_pool(self.pool, self.loglevel,
 | 
	
	
		
			
				|  | @@ -349,7 +377,7 @@ class WorkController(object):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          self.logger.debug("Task %s has been executed asynchronously." % task)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        return result, task.task_name, task.task_id
 | 
	
		
			
				|  |  | +        return result
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def run_periodic_tasks(self):
 | 
	
		
			
				|  |  |          """Schedule all waiting periodic tasks for execution.
 | 
	
	
		
			
				|  | @@ -384,7 +412,7 @@ class WorkController(object):
 | 
	
		
			
				|  |  |          while True:
 | 
	
		
			
				|  |  |              [event.tick() for event in events]
 | 
	
		
			
				|  |  |              try:
 | 
	
		
			
				|  |  | -                result, task_name, task_id = self.execute_next_task()
 | 
	
		
			
				|  |  | +                self.execute_next_task()
 | 
	
		
			
				|  |  |              except ValueError:
 | 
	
		
			
				|  |  |                  # execute_next_task didn't return a r/name/id tuple,
 | 
	
		
			
				|  |  |                  # probably because it got an exception.
 | 
	
	
		
			
				|  | @@ -393,10 +421,10 @@ class WorkController(object):
 | 
	
		
			
				|  |  |                  ev_msg_waiting.tick()
 | 
	
		
			
				|  |  |                  time.sleep(self.queue_wakeup_after)
 | 
	
		
			
				|  |  |                  continue
 | 
	
		
			
				|  |  | -            except UnknownTask, e:
 | 
	
		
			
				|  |  | -                self.logger.info("Unknown task ignored: %s" % (e))
 | 
	
		
			
				|  |  | +            except UnknownTask, exc:
 | 
	
		
			
				|  |  | +                self.logger.info("Unknown task ignored: %s" % (exc))
 | 
	
		
			
				|  |  |                  continue
 | 
	
		
			
				|  |  | -            except Exception, e:
 | 
	
		
			
				|  |  | +            except Exception, exc:
 | 
	
		
			
				|  |  |                  self.logger.critical("Message queue raised %s: %s\n%s" % (
 | 
	
		
			
				|  |  | -                             e.__class__, e, traceback.format_exc()))
 | 
	
		
			
				|  |  | +                             exc.__class__, exc, traceback.format_exc()))
 | 
	
		
			
				|  |  |                  continue
 |