utils.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. # -*- coding: utf-8 -*-
  2. """App utilities: Compat settings, bug-report tool, pickling apps."""
  3. from __future__ import absolute_import, unicode_literals
  4. import os
  5. import platform as _platform
  6. import re
  7. from collections import Mapping, namedtuple
  8. from copy import deepcopy
  9. from types import ModuleType
  10. from kombu.utils.url import maybe_sanitize_url
  11. from celery.exceptions import ImproperlyConfigured
  12. from celery.five import items, keys, string_t, values
  13. from celery.platforms import pyimplementation
  14. from celery.utils.collections import ConfigurationView
  15. from celery.utils.text import pretty
  16. from celery.utils.imports import import_from_cwd, symbol_by_name, qualname
  17. from .defaults import (
  18. _TO_NEW_KEY, _TO_OLD_KEY, _OLD_DEFAULTS, _OLD_SETTING_KEYS,
  19. DEFAULTS, SETTING_KEYS, find,
  20. )
  21. __all__ = [
  22. 'Settings', 'appstr', 'bugreport',
  23. 'filter_hidden_settings', 'find_app',
  24. ]
  25. #: Format used to generate bug-report information.
  26. BUGREPORT_INFO = """
  27. software -> celery:{celery_v} kombu:{kombu_v} py:{py_v}
  28. billiard:{billiard_v} {driver_v}
  29. platform -> system:{system} arch:{arch} imp:{py_i}
  30. loader -> {loader}
  31. settings -> transport:{transport} results:{results}
  32. {human_settings}
  33. """
  34. HIDDEN_SETTINGS = re.compile(
  35. 'API|TOKEN|KEY|SECRET|PASS|PROFANITIES_LIST|SIGNATURE|DATABASE',
  36. re.IGNORECASE,
  37. )
  38. E_MIX_OLD_INTO_NEW = """
  39. Cannot mix new and old setting keys, please rename the
  40. following settings to the new format:
  41. {renames}
  42. """
  43. E_MIX_NEW_INTO_OLD = """
  44. Cannot mix new setting names with old setting names, please
  45. rename the following settings to use the old format:
  46. {renames}
  47. Or change all of the settings to use the new format :)
  48. """
  49. FMT_REPLACE_SETTING = '{replace:<36} -> {with_}'
  50. def appstr(app):
  51. """String used in __repr__ etc, to id app instances."""
  52. return '{0}:{1:#x}'.format(app.main or '__main__', id(app))
  53. class Settings(ConfigurationView):
  54. """Celery settings object.
  55. .. seealso:
  56. :ref:`configuration` for a full list of configuration keys.
  57. """
  58. @property
  59. def broker_read_url(self):
  60. return (
  61. os.environ.get('CELERY_BROKER_READ_URL') or
  62. self.get('broker_read_url') or
  63. self.broker_url
  64. )
  65. @property
  66. def broker_write_url(self):
  67. return (
  68. os.environ.get('CELERY_BROKER_WRITE_URL') or
  69. self.get('broker_write_url') or
  70. self.broker_url
  71. )
  72. @property
  73. def broker_url(self):
  74. return (
  75. os.environ.get('CELERY_BROKER_URL') or
  76. self.first('broker_url', 'broker_host')
  77. )
  78. @property
  79. def task_default_exchange(self):
  80. return self.first(
  81. 'task_default_exchange',
  82. 'task_default_queue',
  83. )
  84. @property
  85. def task_default_routing_key(self):
  86. return self.first(
  87. 'task_default_routing_key',
  88. 'task_default_queue',
  89. )
  90. @property
  91. def timezone(self):
  92. # this way we also support django's time zone.
  93. return self.first('timezone', 'time_zone')
  94. def without_defaults(self):
  95. """Return the current configuration, but without defaults."""
  96. # the last stash is the default settings, so just skip that
  97. return Settings({}, self.maps[:-1])
  98. def value_set_for(self, key):
  99. return key in self.without_defaults()
  100. def find_option(self, name, namespace=''):
  101. """Search for option by name.
  102. Example:
  103. >>> from proj.celery import app
  104. >>> app.conf.find_option('disable_rate_limits')
  105. ('worker', 'prefetch_multiplier',
  106. <Option: type->bool default->False>))
  107. Arguments:
  108. name (str): Name of option, cannot be partial.
  109. namespace (str): Preferred name-space (``None`` by default).
  110. Returns:
  111. Tuple: of ``(namespace, key, type)``.
  112. """
  113. return find(name, namespace)
  114. def find_value_for_key(self, name, namespace='celery'):
  115. """Shortcut to ``get_by_parts(*find_option(name)[:-1])``."""
  116. return self.get_by_parts(*self.find_option(name, namespace)[:-1])
  117. def get_by_parts(self, *parts):
  118. """Return the current value for setting specified as a path.
  119. Example:
  120. >>> from proj.celery import app
  121. >>> app.conf.get_by_parts('worker', 'disable_rate_limits')
  122. False
  123. """
  124. return self['_'.join(part for part in parts if part)]
  125. def table(self, with_defaults=False, censored=True):
  126. filt = filter_hidden_settings if censored else lambda v: v
  127. dict_members = dir(dict)
  128. return filt({
  129. k: v for k, v in items(
  130. self if with_defaults else self.without_defaults())
  131. if not k.startswith('_') and k not in dict_members
  132. })
  133. def humanize(self, with_defaults=False, censored=True):
  134. """Return a human readable text showing configuration changes."""
  135. return '\n'.join(
  136. '{0}: {1}'.format(key, pretty(value, width=50))
  137. for key, value in items(self.table(with_defaults, censored)))
  138. def _new_key_to_old(key, convert=_TO_OLD_KEY.get):
  139. return convert(key, key)
  140. def _old_key_to_new(key, convert=_TO_NEW_KEY.get):
  141. return convert(key, key)
  142. _settings_info_t = namedtuple('settings_info_t', (
  143. 'defaults', 'convert', 'key_t', 'mix_error',
  144. ))
  145. _settings_info = _settings_info_t(
  146. DEFAULTS, _TO_NEW_KEY, _old_key_to_new, E_MIX_OLD_INTO_NEW,
  147. )
  148. _old_settings_info = _settings_info_t(
  149. _OLD_DEFAULTS, _TO_OLD_KEY, _new_key_to_old, E_MIX_NEW_INTO_OLD,
  150. )
  151. def detect_settings(conf, preconf={}, ignore_keys=set(), prefix=None,
  152. all_keys=SETTING_KEYS, old_keys=_OLD_SETTING_KEYS):
  153. source = conf
  154. if conf is None:
  155. source, conf = preconf, {}
  156. have = set(keys(source)) - ignore_keys
  157. is_in_new = have.intersection(all_keys)
  158. is_in_old = have.intersection(old_keys)
  159. info = None
  160. if is_in_new:
  161. # have new setting names
  162. info, left = _settings_info, is_in_old
  163. if is_in_old and len(is_in_old) > len(is_in_new):
  164. # Majority of the settings are old.
  165. info, left = _old_settings_info, is_in_new
  166. if is_in_old:
  167. # have old setting names, or a majority of the names are old.
  168. if not info:
  169. info, left = _old_settings_info, is_in_new
  170. if is_in_new and len(is_in_new) > len(is_in_old):
  171. # Majority of the settings are new
  172. info, left = _settings_info, is_in_old
  173. else:
  174. # no settings, just use new format.
  175. info, left = _settings_info, is_in_old
  176. if prefix:
  177. # always use new format if prefix is used.
  178. info, left = _settings_info, set()
  179. # only raise error for keys that the user didn't provide two keys
  180. # for (e.g., both ``result_expires`` and ``CELERY_TASK_RESULT_EXPIRES``).
  181. really_left = {key for key in left if info.convert[key] not in have}
  182. if really_left:
  183. # user is mixing old/new, or new/old settings, give renaming
  184. # suggestions.
  185. raise ImproperlyConfigured(info.mix_error.format(renames='\n'.join(
  186. FMT_REPLACE_SETTING.format(replace=key, with_=info.convert[key])
  187. for key in sorted(really_left)
  188. )))
  189. preconf = {info.convert.get(k, k): v for k, v in items(preconf)}
  190. defaults = dict(deepcopy(info.defaults), **preconf)
  191. return Settings(
  192. preconf, [conf, defaults],
  193. (_old_key_to_new, _new_key_to_old),
  194. prefix=prefix,
  195. )
  196. class AppPickler(object):
  197. """Old application pickler/unpickler (< 3.1)."""
  198. def __call__(self, cls, *args):
  199. kwargs = self.build_kwargs(*args)
  200. app = self.construct(cls, **kwargs)
  201. self.prepare(app, **kwargs)
  202. return app
  203. def prepare(self, app, **kwargs):
  204. app.conf.update(kwargs['changes'])
  205. def build_kwargs(self, *args):
  206. return self.build_standard_kwargs(*args)
  207. def build_standard_kwargs(self, main, changes, loader, backend, amqp,
  208. events, log, control, accept_magic_kwargs,
  209. config_source=None):
  210. return dict(main=main, loader=loader, backend=backend, amqp=amqp,
  211. changes=changes, events=events, log=log, control=control,
  212. set_as_current=False,
  213. config_source=config_source)
  214. def construct(self, cls, **kwargs):
  215. return cls(**kwargs)
  216. def _unpickle_app(cls, pickler, *args):
  217. """Rebuild app for versions 2.5+."""
  218. return pickler()(cls, *args)
  219. def _unpickle_app_v2(cls, kwargs):
  220. """Rebuild app for versions 3.1+."""
  221. kwargs['set_as_current'] = False
  222. return cls(**kwargs)
  223. def filter_hidden_settings(conf):
  224. """Filter sensitive settings."""
  225. def maybe_censor(key, value, mask='*' * 8):
  226. if isinstance(value, Mapping):
  227. return filter_hidden_settings(value)
  228. if isinstance(key, string_t):
  229. if HIDDEN_SETTINGS.search(key):
  230. return mask
  231. elif 'broker_url' in key.lower():
  232. from kombu import Connection
  233. return Connection(value).as_uri(mask=mask)
  234. elif 'backend' in key.lower():
  235. return maybe_sanitize_url(value, mask=mask)
  236. return value
  237. return {k: maybe_censor(k, v) for k, v in items(conf)}
  238. def bugreport(app):
  239. """Return a string containing information useful in bug-reports."""
  240. import billiard
  241. import celery
  242. import kombu
  243. try:
  244. conn = app.connection()
  245. driver_v = '{0}:{1}'.format(conn.transport.driver_name,
  246. conn.transport.driver_version())
  247. transport = conn.transport_cls
  248. except Exception: # pylint: disable=broad-except
  249. transport = driver_v = ''
  250. return BUGREPORT_INFO.format(
  251. system=_platform.system(),
  252. arch=', '.join(x for x in _platform.architecture() if x),
  253. py_i=pyimplementation(),
  254. celery_v=celery.VERSION_BANNER,
  255. kombu_v=kombu.__version__,
  256. billiard_v=billiard.__version__,
  257. py_v=_platform.python_version(),
  258. driver_v=driver_v,
  259. transport=transport,
  260. results=maybe_sanitize_url(app.conf.result_backend or 'disabled'),
  261. human_settings=app.conf.humanize(),
  262. loader=qualname(app.loader.__class__),
  263. )
  264. def find_app(app, symbol_by_name=symbol_by_name, imp=import_from_cwd):
  265. """Find app by name."""
  266. from .base import Celery
  267. try:
  268. sym = symbol_by_name(app, imp=imp)
  269. except AttributeError:
  270. # last part was not an attribute, but a module
  271. sym = imp(app)
  272. if isinstance(sym, ModuleType) and ':' not in app:
  273. try:
  274. found = sym.app
  275. if isinstance(found, ModuleType):
  276. raise AttributeError()
  277. except AttributeError:
  278. try:
  279. found = sym.celery
  280. if isinstance(found, ModuleType):
  281. raise AttributeError()
  282. except AttributeError:
  283. if getattr(sym, '__path__', None):
  284. try:
  285. return find_app(
  286. '{0}.celery'.format(app),
  287. symbol_by_name=symbol_by_name, imp=imp,
  288. )
  289. except ImportError:
  290. pass
  291. for suspect in values(vars(sym)):
  292. if isinstance(suspect, Celery):
  293. return suspect
  294. raise
  295. else:
  296. return found
  297. else:
  298. return found
  299. return sym