
Some small optimizations

Ask Solem, 13 years ago
parent commit cf522ee855
5 changed files with 51 additions and 28 deletions
  1. celery/concurrency/base.py  (+3 -3)
  2. celery/platforms.py  (+17 -9)
  3. celery/worker/consumer.py  (+5 -1)
  4. celery/worker/job.py  (+20 -12)
  5. celery/worker/mediator.py  (+6 -3)

celery/concurrency/base.py  (+3 -3)

@@ -16,7 +16,7 @@ from ..utils.encoding import safe_repr
 
 
 def apply_target(target, args=(), kwargs={}, callback=None,
-        accept_callback=None, pid=None):
+        accept_callback=None, pid=None, **_):
     if accept_callback:
         accept_callback(pid or os.getpid(), time.time())
     callback(target(*args, **kwargs))
@@ -41,7 +41,7 @@ class BasePool(object):
         self.putlocks = putlocks
         self.logger = logger or log.get_default_logger()
         self.options = options
-        self.does_debug = self.logger.isEnabledFor(logging.DEBUG)
+        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
 
     def on_start(self):
         pass
@@ -87,7 +87,7 @@ class BasePool(object):
         on_ready = partial(self.on_ready, callback, errback)
         on_worker_error = partial(self.on_worker_error, errback)
 
-        if self.does_debug:
+        if self._does_debug:
             self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
                             target, safe_repr(args), safe_repr(kwargs))
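
Two small things happen here: apply_target() grows a **_ catch-all so keyword arguments it does not recognize are ignored instead of raising TypeError, and the pool caches the result of isEnabledFor(logging.DEBUG) at construction time so the per-task apply path only tests a plain attribute. A minimal, self-contained sketch of both ideas (DemoPool and the other demo names are illustrative, not Celery API):

import logging
import os
import time

logger = logging.getLogger("demo.pool")


def apply_target(target, args=(), kwargs={}, callback=None,
                 accept_callback=None, pid=None, **_):
    # **_ silently absorbs keyword arguments this simple executor does not
    # understand, so callers may pass backend-specific options freely.
    if accept_callback:
        accept_callback(pid or os.getpid(), time.time())
    result = target(*args, **kwargs)
    if callback is not None:
        callback(result)
    return result


class DemoPool(object):
    def __init__(self, logger=logger):
        self.logger = logger
        # Evaluate the DEBUG check once; the hot path below then tests a
        # plain attribute instead of calling isEnabledFor() on every task.
        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)

    def apply_async(self, target, args=(), kwargs={}):
        if self._does_debug:
            # repr() runs only when DEBUG is actually enabled; without the
            # guard it would run for every task and be thrown away.
            self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
                              target, repr(args), repr(kwargs))
        return apply_target(target, args, kwargs)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)   # DEBUG disabled
    pool = DemoPool()
    print(pool.apply_async(sum, ([1, 2, 3],)))   # -> 6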
 

celery/platforms.py  (+17 -9)

@@ -21,11 +21,18 @@ import sys
 
 from .local import try_import
 
+from kombu.utils.limits import TokenBucket
+
 _setproctitle = try_import("setproctitle")
 resource = try_import("resource")
 pwd = try_import("pwd")
 grp = try_import("grp")
 
+try:
+    from multiprocessing.process import current_process
+except ImportError:
+    current_process = None  # noqa
+
 SYSTEM = _platform.system()
 IS_OSX = SYSTEM == "Darwin"
 IS_WINDOWS = SYSTEM == "Windows"
@@ -34,6 +41,8 @@ DAEMON_UMASK = 0
 DAEMON_WORKDIR = "/"
 DAEMON_REDIRECT_TO = getattr(os, "devnull", "/dev/null")
 
+_setps_bucket = TokenBucket(0.5)  # 30/m, every 2 seconds
+
 
 def pyimplementation():
     if hasattr(_platform, "python_implementation"):
@@ -558,15 +567,14 @@ def set_mp_process_title(progname, info=None, hostname=None):
     Only works if :mod:`setproctitle` is installed.
 
     """
-    if hostname:
-        progname = "%s@%s" % (progname, hostname.split(".")[0])
-    try:
-        from multiprocessing.process import current_process
-    except ImportError:
-        return set_process_title(progname, info=info)
-    else:
-        return set_process_title("%s:%s" % (progname,
-                                            current_process().name), info=info)
+    if _setps_bucket.can_consume(1):
+        if hostname:
+            progname = "%s@%s" % (progname, hostname.split(".")[0])
+        if current_process is not None:
+            return set_process_title("%s:%s" % (progname,
+                                                current_process().name), info=info)
+        else:
+            return set_process_title(progname, info=info)
 
 
 def shellsplit(s, posix=True):
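
Besides the cheaper module-level import of current_process (one try/except at import time instead of one per call), the interesting part is the throttle: a kombu TokenBucket filled at 0.5 tokens per second means set_mp_process_title now touches the process title at most about once every two seconds and simply returns when the bucket is empty. A standalone sketch of the idea, assuming kombu is installed; update_title is a hypothetical stand-in for the real function:

import time

from kombu.utils.limits import TokenBucket

# 0.5 tokens per second: roughly one permitted update every two seconds.
_bucket = TokenBucket(0.5)


def update_title(progname, info=None):
    if not _bucket.can_consume(1):
        return                      # bucket empty: skip this update
    title = progname if info is None else "%s %s" % (progname, info)
    # A real implementation would call setproctitle here.
    print("set process title -> %r" % (title,))


if __name__ == "__main__":
    for i in range(6):
        update_title("celeryd", info="tick %d" % i)
        time.sleep(1.0)             # roughly every other call gets through

The trade accepted here is a process title that can lag by a couple of seconds during bursts, in exchange for far fewer setproctitle() system calls.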

celery/worker/consumer.py  (+5 -1)

@@ -76,6 +76,7 @@ up and running.
 from __future__ import absolute_import
 from __future__ import with_statement
 
+import logging
 import socket
 import sys
 import threading
@@ -293,6 +294,8 @@ class Consumer(object):
         self.connection_errors = conninfo.connection_errors
         self.channel_errors = conninfo.channel_errors
 
+        self._does_info = self.logger.isEnabledFor(logging.INFO)
+
     def start(self):
         """Start the consumer.
 
@@ -341,7 +344,8 @@ class Consumer(object):
         if task.revoked():
             return
 
-        self.logger.info("Got task from broker: %s", task.shortinfo())
+        if self._does_info:
+            self.logger.info("Got task from broker: %s", task.shortinfo())
 
         if self.event_dispatcher.enabled:
             self.event_dispatcher.send("task-received", uuid=task.task_id,

celery/worker/job.py  (+20 -12)

@@ -12,6 +12,7 @@
 """
 from __future__ import absolute_import
 
+import logging
 import os
 import sys
 import time
@@ -272,6 +273,10 @@ class TaskRequest(object):
         if self.expires is not None:
             self.expires = timezone.to_local(self.expires, self.tzlocal, tz)
 
+        # shortcuts
+        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
+        self._does_info = self.logger.isEnabledFor(logging.INFO)
+
     @classmethod
     def from_message(cls, message, body, on_ack=noop, **kw):
         """Create request from a task message.
@@ -435,8 +440,9 @@ class TaskRequest(object):
         if not self.task.acks_late:
             self.acknowledge()
         self.send_event("task-started", uuid=self.task_id, pid=pid)
-        self.logger.debug("Task accepted: %s[%s] pid:%r",
-                          self.task_name, self.task_id, pid)
+        if self._does_debug:
+            self.logger.debug("Task accepted: %s[%s] pid:%r",
+                              self.task_name, self.task_id, pid)
         if self._terminate_on_ack is not None:
             _, pool, signal = self._terminate_on_ack
             self.terminate(pool, signal)
@@ -467,11 +473,12 @@ class TaskRequest(object):
         self.send_event("task-succeeded", uuid=self.task_id,
                         result=safe_repr(ret_value), runtime=runtime)
 
-        self.logger.info(self.success_msg.strip(),
-                         {"id": self.task_id,
-                          "name": self.task_name,
-                          "return_value": self.repr_result(ret_value),
-                          "runtime": runtime})
+        if self._does_info:
+            self.logger.info(self.success_msg.strip(),
+                            {"id": self.task_id,
+                             "name": self.task_name,
+                             "return_value": self.repr_result(ret_value),
+                             "runtime": runtime})
 
     def on_retry(self, exc_info):
         """Handler called if the task should be retried."""
@@ -479,11 +486,12 @@ class TaskRequest(object):
                          exception=safe_repr(exc_info.exception.exc),
                          traceback=safe_str(exc_info.traceback))
 
-        self.logger.info(self.retry_msg.strip(),
-                         {"id": self.task_id,
-                         "name": self.task_name,
-                         "exc": safe_repr(exc_info.exception.exc)},
-                         exc_info=exc_info)
+        if self._does_info:
+            self.logger.info(self.retry_msg.strip(),
+                            {"id": self.task_id,
+                             "name": self.task_name,
+                             "exc": safe_repr(exc_info.exception.exc)},
+                            exc_info=exc_info)
 
     def on_failure(self, exc_info):
         """Handler called if the task raised an exception."""

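These guards pay off even though the logging module already filters by level: the costly part is assembling the arguments (safe_repr() of the return value, repr_result(), the exception repr), and that work happens before Logger.info() ever looks at the level. Testing the cached flag first skips all of it when INFO is disabled. A rough micro-benchmark sketch of the effect, stdlib only, exact numbers will vary by machine:

import logging
import timeit

logger = logging.getLogger("demo.job")
logger.addHandler(logging.NullHandler())
logger.setLevel(logging.WARNING)                 # INFO is disabled

big_result = list(range(10000))
_does_info = logger.isEnabledFor(logging.INFO)   # cached once, as in the commit


def unguarded():
    # repr(big_result) is built on every call, only to be discarded by the
    # level check inside Logger.info().
    logger.info("Task succeeded: %s", repr(big_result))


def guarded():
    if _does_info:
        logger.info("Task succeeded: %s", repr(big_result))


if __name__ == "__main__":
    for fn in (unguarded, guarded):
        print(fn.__name__, timeit.timeit(fn, number=1000))
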
celery/worker/mediator.py  (+6 -3)

@@ -18,6 +18,7 @@
 """
 from __future__ import absolute_import
 
+import logging
 import sys
 import traceback
 
@@ -40,6 +41,7 @@ class Mediator(bgThread):
         self.logger = logger or self.app.log.get_default_logger()
         self.ready_queue = ready_queue
         self.callback = callback
+        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
         super(Mediator, self).__init__()
 
     def body(self):
@@ -51,9 +53,10 @@ class Mediator(bgThread):
         if task.revoked():
             return
 
-        self.logger.debug(
-            "Mediator: Running callback for task: %s[%s]" % (
-                task.task_name, task.task_id))
+        if self._does_debug:
+            self.logger.debug(
+                "Mediator: Running callback for task: %s[%s]" % (
+                    task.task_name, task.task_id))
 
         try:
             self.callback(task)
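
The Mediator gets the same treatment, and here the guard also avoids the eager "%" formatting of the message string on every task. The one caveat of the whole pattern is that the cached flags freeze the answer computed at construction time, so they go stale if the logger's level is changed while the worker is running. Not what this commit does, but a sketch of one way to keep the check cheap without freezing it forever, re-reading the level at most once per second (CachedLevelFlag is an illustrative name):

import logging
import time


class CachedLevelFlag(object):
    """Caches logger.isEnabledFor(level), refreshing at most once per second."""

    def __init__(self, logger, level, max_age=1.0):
        self.logger = logger
        self.level = level
        self.max_age = max_age
        self._value = logger.isEnabledFor(level)
        self._checked = time.time()

    def __bool__(self):
        now = time.time()
        if now - self._checked > self.max_age:
            # Re-check occasionally so runtime level changes are picked up.
            self._value = self.logger.isEnabledFor(self.level)
            self._checked = now
        return self._value

    __nonzero__ = __bool__      # Python 2 spelling, for this era of Celery


logger = logging.getLogger("demo.mediator")
does_debug = CachedLevelFlag(logger, logging.DEBUG)

if does_debug:                  # cheap most of the time, refreshed once a second
    logger.debug("Mediator: Running callback for task: %s[%s]",
                 "tasks.add", "fictional-task-id")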