imports.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # -*- coding: utf-8 -*-
  2. """Utilities related to importing modules and symbols by name."""
  3. from __future__ import absolute_import, unicode_literals
  4. import imp as _imp
  5. import importlib
  6. import os
  7. import sys
  8. from contextlib import contextmanager
  9. from imp import reload
  10. from kombu.utils import symbol_by_name
  11. #: Billiard sets this when execv is enabled.
  12. #: We use it to find out the name of the original ``__main__``
  13. #: module, so that we can properly rewrite the name of the
  14. #: task to be that of ``App.main``.
  15. MP_MAIN_FILE = os.environ.get('MP_MAIN_FILE')
  16. __all__ = [
  17. 'NotAPackage', 'qualname', 'instantiate', 'symbol_by_name',
  18. 'cwd_in_path', 'find_module', 'import_from_cwd',
  19. 'reload_from_cwd', 'module_file', 'gen_task_name',
  20. ]
  21. class NotAPackage(Exception):
  22. pass
  23. if sys.version_info > (3, 3): # pragma: no cover
  24. def qualname(obj):
  25. if not hasattr(obj, '__name__') and hasattr(obj, '__class__'):
  26. obj = obj.__class__
  27. q = getattr(obj, '__qualname__', None)
  28. if '.' not in q:
  29. q = '.'.join((obj.__module__, q))
  30. return q
  31. else:
  32. def qualname(obj): # noqa
  33. if not hasattr(obj, '__name__') and hasattr(obj, '__class__'):
  34. obj = obj.__class__
  35. return '.'.join((obj.__module__, obj.__name__))
  36. def instantiate(name, *args, **kwargs):
  37. """Instantiate class by name.
  38. See :func:`symbol_by_name`.
  39. """
  40. return symbol_by_name(name)(*args, **kwargs)
  41. @contextmanager
  42. def cwd_in_path():
  43. cwd = os.getcwd()
  44. if cwd in sys.path:
  45. yield
  46. else:
  47. sys.path.insert(0, cwd)
  48. try:
  49. yield cwd
  50. finally:
  51. try:
  52. sys.path.remove(cwd)
  53. except ValueError: # pragma: no cover
  54. pass
  55. def find_module(module, path=None, imp=None):
  56. """Version of :func:`imp.find_module` supporting dots."""
  57. if imp is None:
  58. imp = importlib.import_module
  59. with cwd_in_path():
  60. if '.' in module:
  61. last = None
  62. parts = module.split('.')
  63. for i, part in enumerate(parts[:-1]):
  64. mpart = imp('.'.join(parts[:i + 1]))
  65. try:
  66. path = mpart.__path__
  67. except AttributeError:
  68. raise NotAPackage(module)
  69. last = _imp.find_module(parts[i + 1], path)
  70. return last
  71. return _imp.find_module(module)
  72. def import_from_cwd(module, imp=None, package=None):
  73. """Import module, but make sure it finds modules
  74. located in the current directory.
  75. Modules located in the current directory has
  76. precedence over modules located in `sys.path`.
  77. """
  78. if imp is None:
  79. imp = importlib.import_module
  80. with cwd_in_path():
  81. return imp(module, package=package)
  82. def reload_from_cwd(module, reloader=None):
  83. if reloader is None:
  84. reloader = reload
  85. with cwd_in_path():
  86. return reloader(module)
  87. def module_file(module):
  88. """Return the correct original file name of a module."""
  89. name = module.__file__
  90. return name[:-1] if name.endswith('.pyc') else name
  91. def gen_task_name(app, name, module_name):
  92. """Generate task name from name/module pair."""
  93. module_name = module_name or '__main__'
  94. try:
  95. module = sys.modules[module_name]
  96. except KeyError:
  97. # Fix for manage.py shell_plus (Issue #366)
  98. module = None
  99. if module is not None:
  100. module_name = module.__name__
  101. # - If the task module is used as the __main__ script
  102. # - we need to rewrite the module part of the task name
  103. # - to match App.main.
  104. if MP_MAIN_FILE and module.__file__ == MP_MAIN_FILE:
  105. # - see comment about :envvar:`MP_MAIN_FILE` above.
  106. module_name = '__main__'
  107. if module_name == '__main__' and app.main:
  108. return '.'.join([app.main, name])
  109. return '.'.join(p for p in (module_name, name) if p)