Browse Source

Pass custom hostnames to error e-mails

Ask Solem 14 years ago
parent
commit
45dcce1c0f

+ 2 - 3
celery/events/state.py

@@ -5,6 +5,7 @@ from carrot.utils import partition
 
 from celery import states
 from celery.datastructures import LocalCache
+from celery.utils import kwdict
 
 HEARTBEAT_EXPIRE = 150 # 2 minutes, 30 seconds
 
@@ -183,11 +184,9 @@ class State(object):
 
     def event(self, event):
         """Process event."""
-        event = dict((key.encode("utf-8"), value)
-                        for key, value in event.items())
         self.event_count += 1
         group, _, type = partition(event.pop("type"), "-")
-        self.group_handlers[group](type, event)
+        self.group_handlers[group](type, kwdict(event))
         if self.event_callback:
             self.event_callback(self, event)
 

+ 11 - 0
celery/utils/__init__.py

@@ -31,6 +31,17 @@ def noop(*args, **kwargs):
     pass
 
 
+def kwdict(kwargs):
+    """Make sure keyword arguments are not in unicode.
+
+    This should be fixed in newer Python versions,
+      see: http://bugs.python.org/issue4978.
+
+    """
+    return dict((key.encode("utf-8"), value)
+                    for key, value in kwargs.items())
+
+
 def first(predicate, iterable):
     """Returns the first element in ``iterable`` that ``predicate`` returns a
     ``True`` value for."""

+ 3 - 7
celery/worker/control/__init__.py

@@ -1,7 +1,8 @@
 from celery import log
+from celery.messaging import ControlReplyPublisher, with_connection
+from celery.utils import kwdict
 from celery.worker.control.registry import Panel
 from celery.worker.control import builtins
-from celery.messaging import ControlReplyPublisher, with_connection
 
 
 class ControlDispatch(object):
@@ -55,12 +56,7 @@ class ControlDispatch(object):
         except KeyError:
             self.logger.error("No such control command: %s" % command)
         else:
-            # need to make sure keyword arguments are not in unicode
-            # this should be fixed in newer Python's
-            # (see: http://bugs.python.org/issue4978)
-            kwargs = dict((k.encode("utf8"), v)
-                            for k, v in kwargs.iteritems())
-            reply = control(self.panel, **kwargs)
+            reply = control(self.panel, **kwdict(kwargs))
             if reply_to:
                 self.reply({self.hostname: reply},
                            exchange=reply_to["exchange"],

+ 10 - 12
celery/worker/job.py

@@ -12,7 +12,7 @@ import warnings
 from celery import conf
 from celery import platform
 from celery.log import get_default_logger
-from celery.utils import noop, fun_takes_kwargs
+from celery.utils import noop, kwdict, fun_takes_kwargs
 from celery.utils.mail import mail_admins
 from celery.worker.revoke import revoked
 from celery.loaders import current_loader
@@ -195,7 +195,7 @@ class TaskRequest(object):
     time_start = None
 
     def __init__(self, task_name, task_id, args, kwargs,
-            on_ack=noop, retries=0, delivery_info=None, **opts):
+            on_ack=noop, retries=0, delivery_info=None, hostname=None, **opts):
         self.task_name = task_name
         self.task_id = task_id
         self.retries = retries
@@ -204,6 +204,7 @@ class TaskRequest(object):
         self.on_ack = on_ack
         self.delivery_info = delivery_info or {}
         self.task = tasks[self.task_name]
+        self.hostname = hostname or socket.gethostname()
         self._already_revoked = False
 
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
@@ -232,7 +233,8 @@ class TaskRequest(object):
         return False
 
     @classmethod
-    def from_message(cls, message, message_data, logger=None, eventer=None):
+    def from_message(cls, message, message_data, logger=None, eventer=None,
+            hostname=None):
         """Create a :class:`TaskRequest` from a task message sent by
         :class:`celery.messaging.TaskPublisher`.
 
@@ -256,14 +258,10 @@ class TaskRequest(object):
         if not hasattr(kwargs, "items"):
             raise InvalidTaskError("Task kwargs must be a dictionary.")
 
-        # Convert any unicode keys in the keyword arguments to ascii.
-        kwargs = dict((key.encode("utf-8"), value)
-                        for key, value in kwargs.items())
-
-        return cls(task_name, task_id, args, kwargs,
-                    retries=retries, on_ack=message.ack,
-                    delivery_info=delivery_info,
-                    logger=logger, eventer=eventer)
+        return cls(task_name, task_id, args, kwdict(kwargs),
+                   retries=retries, on_ack=message.ack,
+                   delivery_info=delivery_info, logger=logger,
+                   eventer=eventer, hostname=hostname)
 
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
@@ -401,7 +399,7 @@ class TaskRequest(object):
                                        traceback=exc_info.traceback)
 
         context = {
-            "hostname": socket.gethostname(),
+            "hostname": self.hostname,
             "id": self.task_id,
             "name": self.task_name,
             "exc": repr(exc_info.exception),

+ 1 - 0
celery/worker/listener.py

@@ -288,6 +288,7 @@ class CarrotListener(object):
             try:
                 task = TaskRequest.from_message(message, message_data,
                                                 logger=self.logger,
+                                                hostname=self.hostname,
                                                 eventer=self.event_dispatcher)
             except NotRegistered, exc:
                 self.logger.error("Unknown task ignored: %s: %s" % (