Browse Source

Merge branch 'dcramer/master'

Conflicts:
	celery/task/base.py
	celery/tests/test_task.py
Ask Solem 14 years ago
parent
commit
ee8bec3ccb

+ 2 - 1
celery/beat.py

@@ -5,6 +5,7 @@ Periodic Task Scheduler
 """
 import time
 import shelve
+import sys
 import threading
 import traceback
 import multiprocessing
@@ -155,7 +156,7 @@ class Scheduler(UserDict):
                 result = self.apply_async(entry, publisher=publisher)
             except Exception, exc:
                 self.logger.error("Message Error: %s\n%s" % (exc,
-                    traceback.format_stack()))
+                    traceback.format_stack()), exc_info=sys.exc_info())
             else:
                 self.logger.debug("%s sent. id->%s" % (entry.task,
                                                        result.task_id))

+ 12 - 2
celery/log.py

@@ -3,6 +3,7 @@ import logging
 import threading
 import sys
 import traceback
+import types
 
 from multiprocessing import current_process
 from multiprocessing import util as mputil
@@ -31,18 +32,27 @@ class ColorFormatter(logging.Formatter):
         logging.Formatter.__init__(self, msg)
         self.use_color = use_color
 
+    def formatException(self, ei):
+        r = logging.Formatter.formatException(self, ei)
+        if type(r) in [types.StringType]:
+            r = r.decode('utf-8', 'replace') # Convert to unicode
+        return r
+
     def format(self, record):
         levelname = record.levelname
 
         if self.use_color and levelname in COLORS:
-            record.msg = str(colored().names[COLORS[levelname]](record.msg))
+            record.msg = unicode(colored().names[COLORS[levelname]](record.msg))
 
         # Very ugly, but have to make sure processName is supported
         # by foreign logger instances.
         # (processName is always supported by Python 2.7)
         if "processName" not in record.__dict__:
             record.__dict__["processName"] = current_process()._name
-        return logging.Formatter.format(self, record)
+        t = logging.Formatter.format(self, record)
+        if type(t) in [types.UnicodeType]:
+            t = t.encode('utf-8', 'replace')
+        return t
 
 
 class Logging(object):

+ 4 - 5
celery/task/base.py

@@ -46,8 +46,6 @@ def _unpickle_task(name):
     return tasks[name]
 
 
-
-
 class Context(threading.local):
 
     def update(self, d, **kwargs):
@@ -541,9 +539,10 @@ class BaseTask(object):
         if kwargs is None:
             kwargs = request.kwargs
 
-        delivery_info = request.delivery_info or {}
-        options.setdefault("exchange", delivery_info.get("exchange"))
-        options.setdefault("routing_key", delivery_info.get("routing_key"))
+        delivery_info = request.delivery_info
+        if delivery_info:
+            options.setdefault("exchange", delivery_info.get("exchange"))
+            options.setdefault("routing_key", delivery_info.get("routing_key"))
 
         options["retries"] = request.retries + 1
         options["task_id"] = request.id

+ 0 - 1
celery/tests/test_task.py

@@ -4,7 +4,6 @@ from datetime import datetime, timedelta
 
 from pyparsing import ParseException
 
-
 from celery import task
 from celery.app import app_or_default
 from celery.decorators import task as task_dec

+ 1 - 1
celery/tests/test_worker_state.py

@@ -108,5 +108,5 @@ class test_state(StateResetCase):
             state.task_accepted(request)
         self.assertEqual(len(state.active_requests), 2)
         for request in requests:
-            state.task_ready(requests)
+            state.task_ready(request)
         self.assertEqual(len(state.active_requests), 0)

+ 3 - 1
celery/worker/control/__init__.py

@@ -1,3 +1,5 @@
+import sys
+
 from celery.app import app_or_default
 from celery.pidbox import ControlReplyPublisher
 from celery.utils import kwdict
@@ -69,7 +71,7 @@ class ControlDispatch(object):
             except Exception, exc:
                 self.logger.error(
                         "Error running control command %s kwargs=%s: %s" % (
-                            command, kwargs, exc))
+                            command, kwargs, exc), exc_info=sys.exc_info())
                 reply = {"error": str(exc)}
             if reply_to:
                 self.reply({self.hostname: reply},

+ 2 - 1
celery/worker/control/builtins.py

@@ -1,3 +1,4 @@
+import sys
 from datetime import datetime
 
 from celery.registry import tasks
@@ -68,7 +69,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
         tasks[task_name].rate_limit = rate_limit
     except KeyError:
         panel.logger.error("Rate limit attempt for unknown task %s" % (
-            task_name, ))
+            task_name, ), exc_info=sys.exc_info())
         return {"error": "unknown task"}
 
     if not hasattr(panel.listener.ready_queue, "refresh"):

+ 7 - 1
celery/worker/controllers.py

@@ -55,7 +55,13 @@ class Mediator(threading.Thread):
             self.callback(task)
         except Exception, exc:
             self.logger.error("Mediator callback raised exception %r\n%s" % (
-                exc, traceback.format_exc()), exc_info=sys.exc_info())
+                exc, traceback.format_exc()), exc_info=sys.exc_info(), extra={
+                    "data": {
+                        "hostname": task.hostname,
+                        "id": task.task_id,
+                        "name": task.task_name,
+                    }
+                })
 
     def run(self):
         while not self._shutdown.isSet():

+ 10 - 1
celery/worker/job.py

@@ -488,7 +488,16 @@ class TaskRequest(object):
                    "traceback": unicode(exc_info.traceback, 'utf-8'),
                    "args": self.args,
                    "kwargs": self.kwargs}
-        self.logger.error(self.error_msg.strip() % context, exc_info=exc_info)
+
+        self.logger.error(self.error_msg.strip() % context,
+                          exc_info=exc_info,
+                          extra={
+                              "data": {
+                                  "hostname": self.hostname,
+                                  "id": self.task_id,
+                                  "name": self.task_name,
+                              }
+                          })
 
         task_obj = tasks.get(self.task_name, object)
         self.send_error_email(task_obj, context, exc_info.exception,

+ 4 - 3
celery/worker/listener.py

@@ -76,6 +76,7 @@ up and running.
 from __future__ import generators
 
 import socket
+import sys
 import warnings
 
 from carrot.connection import AMQPConnectionException
@@ -277,7 +278,7 @@ class CarrotListener(object):
             except OverflowError, exc:
                 self.logger.error(
                     "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
-                        task.eta, exc, task.info(safe=True)))
+                        task.eta, exc, task.info(safe=True)), exc_info=sys.exc_info())
                 task.acknowledge()
             else:
                 self.qos.increment()
@@ -307,11 +308,11 @@ class CarrotListener(object):
                                                 eventer=self.event_dispatcher)
             except NotRegistered, exc:
                 self.logger.error("Unknown task ignored: %s: %s" % (
-                        str(exc), message_data))
+                        str(exc), message_data), exc_info=sys.exc_info())
                 message.ack()
             except InvalidTaskError, exc:
                 self.logger.error("Invalid task ignored: %s: %s" % (
-                        str(exc), message_data))
+                        str(exc), message_data), exc_info=sys.exc_info())
                 message.ack()
             else:
                 self.on_task(task)