shell.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. """The ``celery shell`` program, used to start a REPL."""
  2. from __future__ import absolute_import, unicode_literals
  3. import os
  4. import sys
  5. from importlib import import_module
  6. from celery.bin.base import Command
  7. from celery.five import values
  8. class shell(Command): # pragma: no cover
  9. """Start shell session with convenient access to celery symbols.
  10. The following symbols will be added to the main globals:
  11. - ``celery``: the current application.
  12. - ``chord``, ``group``, ``chain``, ``chunks``,
  13. ``xmap``, ``xstarmap`` ``subtask``, ``Task``
  14. - all registered tasks.
  15. """
  16. def add_arguments(self, parser):
  17. group = parser.add_argument_group('Shell Options')
  18. group.add_argument(
  19. '--ipython', '-I',
  20. action='store_true', help='force iPython.', default=False,
  21. )
  22. group.add_argument(
  23. '--bpython', '-B',
  24. action='store_true', help='force bpython.', default=False,
  25. )
  26. group.add_argument(
  27. '--python',
  28. action='store_true', default=False,
  29. help='force default Python shell.',
  30. )
  31. group.add_argument(
  32. '--without-tasks', '-T',
  33. action='store_true', default=False,
  34. help="don't add tasks to locals.",
  35. )
  36. group.add_argument(
  37. '--eventlet',
  38. action='store_true', default=False,
  39. help='use eventlet.',
  40. )
  41. group.add_argument(
  42. '--gevent', action='store_true', default=False,
  43. help='use gevent.',
  44. )
  45. def run(self, *args, **kwargs):
  46. if args:
  47. raise self.UsageError(
  48. 'shell command does not take arguments: {0}'.format(args))
  49. return self._run(**kwargs)
  50. def _run(self, ipython=False, bpython=False,
  51. python=False, without_tasks=False, eventlet=False,
  52. gevent=False, **kwargs):
  53. sys.path.insert(0, os.getcwd())
  54. if eventlet:
  55. import_module('celery.concurrency.eventlet')
  56. if gevent:
  57. import_module('celery.concurrency.gevent')
  58. import celery
  59. import celery.task.base
  60. self.app.loader.import_default_modules()
  61. # pylint: disable=attribute-defined-outside-init
  62. self.locals = {
  63. 'app': self.app,
  64. 'celery': self.app,
  65. 'Task': celery.Task,
  66. 'chord': celery.chord,
  67. 'group': celery.group,
  68. 'chain': celery.chain,
  69. 'chunks': celery.chunks,
  70. 'xmap': celery.xmap,
  71. 'xstarmap': celery.xstarmap,
  72. 'subtask': celery.subtask,
  73. 'signature': celery.signature,
  74. }
  75. if not without_tasks:
  76. self.locals.update({
  77. task.__name__: task for task in values(self.app.tasks)
  78. if not task.name.startswith('celery.')
  79. })
  80. if python:
  81. return self.invoke_fallback_shell()
  82. elif bpython:
  83. return self.invoke_bpython_shell()
  84. elif ipython:
  85. return self.invoke_ipython_shell()
  86. return self.invoke_default_shell()
  87. def invoke_default_shell(self):
  88. try:
  89. import IPython # noqa
  90. except ImportError:
  91. try:
  92. import bpython # noqa
  93. except ImportError:
  94. return self.invoke_fallback_shell()
  95. else:
  96. return self.invoke_bpython_shell()
  97. else:
  98. return self.invoke_ipython_shell()
  99. def invoke_fallback_shell(self):
  100. import code
  101. try:
  102. import readline
  103. except ImportError:
  104. pass
  105. else:
  106. import rlcompleter
  107. readline.set_completer(
  108. rlcompleter.Completer(self.locals).complete)
  109. readline.parse_and_bind('tab:complete')
  110. code.interact(local=self.locals)
  111. def invoke_ipython_shell(self):
  112. for ip in (self._ipython, self._ipython_pre_10,
  113. self._ipython_terminal, self._ipython_010,
  114. self._no_ipython):
  115. try:
  116. return ip()
  117. except ImportError:
  118. pass
  119. def _ipython(self):
  120. from IPython import start_ipython
  121. start_ipython(argv=[], user_ns=self.locals)
  122. def _ipython_pre_10(self): # pragma: no cover
  123. from IPython.frontend.terminal.ipapp import TerminalIPythonApp
  124. app = TerminalIPythonApp.instance()
  125. app.initialize(argv=[])
  126. app.shell.user_ns.update(self.locals)
  127. app.start()
  128. def _ipython_terminal(self): # pragma: no cover
  129. from IPython.terminal import embed
  130. embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
  131. def _ipython_010(self): # pragma: no cover
  132. from IPython.Shell import IPShell
  133. IPShell(argv=[], user_ns=self.locals).mainloop()
  134. def _no_ipython(self): # pragma: no cover
  135. raise ImportError('no suitable ipython found')
  136. def invoke_bpython_shell(self):
  137. import bpython
  138. bpython.embed(self.locals)