Преглед на файлове

Ability to use one log file per child process using format keys %i/%I

Starting from 3.2 there will be one log file per process, but that would be
useful even in this version and is very hard to accomplish by configuration,
so this patch introduces a new format specifier for the logfile name.

Note that the numbers will stay within the process limit even if processes exit or
if autoscale/maxtasksperchild/time limits are used.  I.e. the number is the
*process index*, not the process count or pid.

The new format specifiers are:

* ``%i`` -  Pool process index or 0 if MainProcess.

    With ``-n worker1@example.com -c2 -f %n-%i.log`` this will result in
    three logfiles:

        - ``worker1-0.log`` (main process)
        - ``worker1-1.log`` (pool process 1)
        - ``worker1-2.log`` (pool process 2)

* ``%I`` -  Pool process index with separator.

    With ``-n worker1@example.com -c2 -f %n%I.log`` this will result in
    three logfiles:

        - ``worker1.log`` (main process)
        - ``worker1-1.log`` (pool process 1)
        - ``worker1-2.log`` (pool process 2)
Ask Solem преди 11 години
родител
ревизия
3eafba5a49
променени са 9 файла, в които са добавени 83 реда и са изтрити 36 реда
  1. 12 7
      celery/app/log.py
  2. 1 1
      celery/apps/worker.py
  3. 5 17
      celery/bin/base.py
  4. 2 2
      celery/bin/worker.py
  5. 7 2
      celery/concurrency/prefork.py
  6. 5 5
      celery/tests/bin/test_base.py
  7. 40 0
      celery/utils/__init__.py
  8. 6 0
      celery/utils/log.py
  9. 5 2
      funtests/stress/stress/app.py

+ 12 - 7
celery/app/log.py

@@ -24,7 +24,7 @@ from kombu.utils.encoding import set_default_encoding_file
 from celery import signals
 from celery._state import get_current_task
 from celery.five import class_property, string_t
-from celery.utils import isatty
+from celery.utils import isatty, node_format
 from celery.utils.log import (
     get_logger, mlevel,
     ColorFormatter, ensure_process_aware_logger,
@@ -65,9 +65,9 @@ class Logging(object):
         self.colorize = self.app.conf.CELERYD_LOG_COLOR
 
     def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
-              redirect_level='WARNING', colorize=None):
+              redirect_level='WARNING', colorize=None, hostname=None):
         handled = self.setup_logging_subsystem(
-            loglevel, logfile, colorize=colorize,
+            loglevel, logfile, colorize=colorize, hostname=hostname,
         )
         if not handled:
             if redirect_stdouts:
@@ -87,10 +87,12 @@ class Logging(object):
             CELERY_LOG_REDIRECT_LEVEL=str(loglevel or ''),
         )
 
-    def setup_logging_subsystem(self, loglevel=None, logfile=None,
-                                format=None, colorize=None, **kwargs):
+    def setup_logging_subsystem(self, loglevel=None, logfile=None, format=None,
+                                colorize=None, hostname=None, **kwargs):
         if self.already_setup:
             return
+        if logfile and hostname:
+            logfile = node_format(logfile, hostname)
         self.already_setup = True
         loglevel = mlevel(loglevel or self.loglevel)
         format = format or self.format
@@ -107,6 +109,9 @@ class Logging(object):
 
             if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
                 root.handlers = []
+                get_logger('celery').handlers = []
+                get_logger('celery.task').handlers = []
+                get_logger('celery.redirected').handlers = []
 
             # Configure root logger
             self._configure_logger(
@@ -228,8 +233,8 @@ class Logging(object):
         return WatchedFileHandler(logfile)
 
     def _has_handler(self, logger):
-        return (logger.handlers and
-                not isinstance(logger.handlers[0], NullHandler))
+        if logger.handlers:
+            return any(not isinstance(h, NullHandler) for h in logger.handlers)
 
     def _is_configured(self, logger):
         return self._has_handler(logger) and not getattr(

+ 1 - 1
celery/apps/worker.py

@@ -181,7 +181,7 @@ class Worker(WorkController):
             colorize = not self.no_color
         return self.app.log.setup(
             self.loglevel, self.logfile,
-            redirect_stdouts=False, colorize=colorize,
+            redirect_stdouts=False, colorize=colorize, hostname=self.hostname,
         )
 
     def purge_messages(self):

+ 5 - 17
celery/bin/base.py

@@ -68,7 +68,6 @@ from __future__ import absolute_import, print_function, unicode_literals
 import os
 import random
 import re
-import socket
 import sys
 import warnings
 import json
@@ -86,7 +85,7 @@ from celery.five import items, string, string_t
 from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE
 from celery.utils import term
 from celery.utils import text
-from celery.utils import NODENAME_DEFAULT, nodesplit
+from celery.utils import node_format, host_format
 from celery.utils.imports import symbol_by_name, import_from_cwd
 
 try:
@@ -106,7 +105,6 @@ Try --help?
 
 find_long_opt = re.compile(r'.+?(--.+?)(?:\s|,|$)')
 find_rst_ref = re.compile(r':\w+:`(.+?)`')
-find_sformat = re.compile(r'%(\w)')
 
 __all__ = ['Error', 'UsageError', 'Extensions', 'HelpFormatter',
            'Command', 'Option', 'daemon_options']
@@ -566,20 +564,10 @@ class Command(object):
         pass
 
     def node_format(self, s, nodename, **extra):
-        name, host = nodesplit(nodename)
-        return self._simple_format(
-            s, host, n=name or NODENAME_DEFAULT, **extra)
-
-    def simple_format(self, s, **extra):
-        return self._simple_format(s, socket.gethostname(), **extra)
-
-    def _simple_format(self, s, host,
-                       match=find_sformat, expand=r'\1', **keys):
-        if s:
-            name, _, domain = host.partition('.')
-            keys = dict({'%': '%', 'h': host, 'n': name, 'd': domain}, **keys)
-            return match.sub(lambda m: keys[m.expand(expand)], s)
-        return s
+        return node_format(s, nodename, **extra)
+
+    def host_format(self, s, **extra):
+        return host_format(s, **extra)
 
     def _get_default_app(self, *args, **kwargs):
         from celery._state import get_current_app

+ 2 - 2
celery/bin/worker.py

@@ -192,7 +192,7 @@ class worker(Command):
         if self.app.IS_WINDOWS and kwargs.get('beat'):
             self.die('-B option does not work on Windows.  '
                      'Please run celery beat as a separate service.')
-        hostname = self.simple_format(default_nodename(hostname))
+        hostname = self.host_format(default_nodename(hostname))
         if loglevel:
             try:
                 loglevel = mlevel(loglevel)
@@ -203,7 +203,7 @@ class worker(Command):
 
         return self.app.Worker(
             hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
-            logfile=self.node_format(logfile, hostname),
+            logfile=logfile,  # node format handled by celery.app.log.setup
             pidfile=self.node_format(pidfile, hostname),
             state_db=self.node_format(state_db, hostname), **kwargs
         ).start()

+ 7 - 2
celery/concurrency/prefork.py

@@ -57,10 +57,15 @@ def process_initializer(app, hostname):
     # run once per process.
     app.loader.init_worker()
     app.loader.init_worker_process()
+    logfile = os.environ.get('CELERY_LOG_FILE') or None
+    if logfile and '%i' in logfile.lower():
+        # logfile path will differ so need to set up logging again.
+        app.log.already_setup = False
     app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0) or 0),
-                  os.environ.get('CELERY_LOG_FILE') or None,
+                  logfile,
                   bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
-                  str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')))
+                  str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')),
+                  hostname=hostname)
     if os.environ.get('FORKED_BY_MULTIPROCESSING'):
         # pool did execv after fork
         trace.setup_worker_optimizations(app)

+ 5 - 5
celery/tests/bin/test_base.py

@@ -241,21 +241,21 @@ class test_Command(AppCase):
         with self.assertRaises(AttributeError):
             cmd.find_app(__name__)
 
-    def test_simple_format(self):
+    def test_host_format(self):
         cmd = MockCommand(app=self.app)
         with patch('socket.gethostname') as hn:
             hn.return_value = 'blacktron.example.com'
-            self.assertEqual(cmd.simple_format(''), '')
+            self.assertEqual(cmd.host_format(''), '')
             self.assertEqual(
-                cmd.simple_format('celery@%h'),
+                cmd.host_format('celery@%h'),
                 'celery@blacktron.example.com',
             )
             self.assertEqual(
-                cmd.simple_format('celery@%d'),
+                cmd.host_format('celery@%d'),
                 'celery@example.com',
             )
             self.assertEqual(
-                cmd.simple_format('celery@%n'),
+                cmd.host_format('celery@%n'),
                 'celery@blacktron',
             )
 

+ 40 - 0
celery/utils/__init__.py

@@ -10,12 +10,14 @@ from __future__ import absolute_import, print_function
 
 import numbers
 import os
+import re
 import socket
 import sys
 import traceback
 import warnings
 import datetime
 
+from collections import Callable
 from functools import partial, wraps
 from inspect import getargspec
 from pprint import pprint
@@ -60,6 +62,7 @@ WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq'
 NODENAME_SEP = '@'
 
 NODENAME_DEFAULT = 'celery'
+RE_FORMAT = re.compile(r'%(\w)')
 
 
 def worker_direct(hostname):
@@ -345,6 +348,43 @@ def default_nodename(hostname):
     return nodename(name or NODENAME_DEFAULT, host or socket.gethostname())
 
 
+def node_format(s, nodename, **extra):
+    name, host = nodesplit(nodename)
+    return host_format(
+        s, host, n=name or NODENAME_DEFAULT, **extra)
+
+
+def _fmt_process_index(prefix='', default='0'):
+    from .log import current_process_index
+    index = current_process_index()
+    return '{0}{1}'.format(prefix, index) if index else default
+_fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')
+
+
+def host_format(s, host=None, **extra):
+    host = host or socket.gethostname()
+    name, _, domain = host.partition('.')
+    keys = dict({
+        'h': host, 'n': name, 'd': domain,
+        'i': _fmt_process_index, 'I': _fmt_process_index_with_prefix,
+    }, **extra)
+    return simple_format(s, keys)
+
+
+def simple_format(s, keys, pattern=RE_FORMAT, expand=r'\1'):
+    if s:
+        keys.setdefault('%', '%')
+
+        def resolve(match):
+            resolver = keys[match.expand(expand)]
+            if isinstance(resolver, Callable):
+                return resolver()
+            return resolver
+
+        return pattern.sub(resolve, s)
+    return s
+
+
 # ------------------------------------------------------------------------ #
 # > XXX Compat
 from .log import LOG_LEVELS     # noqa

+ 6 - 0
celery/utils/log.py

@@ -282,4 +282,10 @@ def get_multiprocessing_logger():
 def reset_multiprocessing_logger():
     if mputil and hasattr(mputil, '_logger'):
         mputil._logger = None
+
+
+def current_process_index(base=1):
+    if current_process:
+        index = getattr(current_process(), 'index', None)
+        return index + base if index is not None else index
 ensure_process_aware_logger()

+ 5 - 2
funtests/stress/stress/app.py

@@ -11,9 +11,12 @@ from celery import Celery
 from celery import signals
 from celery.bin.base import Option
 from celery.exceptions import SoftTimeLimitExceeded
+from celery.utils.log import get_task_logger
 
 from .templates import use_template, template_names
 
+logger = get_task_logger(__name__)
+
 
 class App(Celery):
     template_selected = False
@@ -130,8 +133,8 @@ def raising(exc=KeyError()):
 
 
 @app.task
-def logs(msg):
-    print(msg)
+def logs(msg, p=False):
+    print(msg) if p else logger.info(msg)
 
 
 def marker(s, sep='-'):