celery.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery` umbrella command.
  4. .. program:: celery
  5. .. _preload-options:
  6. Preload Options
  7. ---------------
  8. These options are supported by all commands,
  9. and usually parsed before command-specific arguments.
  10. .. cmdoption:: -A, --app
  11. app instance to use (e.g. ``module.attr_name``)
  12. .. cmdoption:: -b, --broker
  13. URL to broker. default is ``amqp://guest@localhost//``
  14. .. cmdoption:: --loader
  15. name of custom loader class to use.
  16. .. cmdoption:: --config
  17. Name of the configuration module
  18. .. cmdoption:: -C, --no-color
  19. Disable colors in output.
  20. .. cmdoption:: -q, --quiet
  21. Give less verbose output (behavior depends on the sub command).
  22. .. cmdoption:: --help
  23. Show help and exit.
  24. .. _daemon-options:
  25. Daemon Options
  26. --------------
  27. These options are supported by commands that can detach
  28. into the background (daemon). They will be present
  29. in any command that also has a `--detach` option.
  30. .. cmdoption:: -f, --logfile
  31. Path to log file. If no logfile is specified, `stderr` is used.
  32. .. cmdoption:: --pidfile
  33. Optional file used to store the process pid.
  34. The program will not start if this file already exists
  35. and the pid is still alive.
  36. .. cmdoption:: --uid
  37. User id, or user name of the user to run as after detaching.
  38. .. cmdoption:: --gid
  39. Group id, or group name of the main group to change to after
  40. detaching.
  41. .. cmdoption:: --umask
  42. Effective umask (in octal) of the process after detaching. Inherits
  43. the umask of the parent process by default.
  44. .. cmdoption:: --workdir
  45. Optional directory to change to after detaching.
  46. .. cmdoption:: --executable
  47. Executable to use for the detached process.
  48. ``celery inspect``
  49. ------------------
  50. .. program:: celery inspect
  51. .. cmdoption:: -t, --timeout
  52. Timeout in seconds (float) waiting for reply
  53. .. cmdoption:: -d, --destination
  54. Comma separated list of destination node names.
  55. .. cmdoption:: -j, --json
  56. Use json as output format.
  57. ``celery control``
  58. ------------------
  59. .. program:: celery control
  60. .. cmdoption:: -t, --timeout
  61. Timeout in seconds (float) waiting for reply
  62. .. cmdoption:: -d, --destination
  63. Comma separated list of destination node names.
  64. .. cmdoption:: -j, --json
  65. Use json as output format.
  66. ``celery migrate``
  67. ------------------
  68. .. program:: celery migrate
  69. .. cmdoption:: -n, --limit
  70. Number of tasks to consume (int).
  71. .. cmdoption:: -t, -timeout
  72. Timeout in seconds (float) waiting for tasks.
  73. .. cmdoption:: -a, --ack-messages
  74. Ack messages from source broker.
  75. .. cmdoption:: -T, --tasks
  76. List of task names to filter on.
  77. .. cmdoption:: -Q, --queues
  78. List of queues to migrate.
  79. .. cmdoption:: -F, --forever
  80. Continually migrate tasks until killed.
  81. ``celery upgrade``
  82. ------------------
  83. .. program:: celery upgrade
  84. .. cmdoption:: --django
  85. Upgrade a Django project.
  86. .. cmdoption:: --compat
  87. Maintain backwards compatibility.
  88. .. cmdoption:: --no-backup
  89. Don't backup original files.
  90. ``celery shell``
  91. ----------------
  92. .. program:: celery shell
  93. .. cmdoption:: -I, --ipython
  94. Force :pypi:`iPython` implementation.
  95. .. cmdoption:: -B, --bpython
  96. Force :pypi:`bpython` implementation.
  97. .. cmdoption:: -P, --python
  98. Force default Python shell.
  99. .. cmdoption:: -T, --without-tasks
  100. Don't add tasks to locals.
  101. .. cmdoption:: --eventlet
  102. Use :pypi:`eventlet` monkey patches.
  103. .. cmdoption:: --gevent
  104. Use :pypi:`gevent` monkey patches.
  105. ``celery result``
  106. -----------------
  107. .. program:: celery result
  108. .. cmdoption:: -t, --task
  109. Name of task (if custom backend).
  110. .. cmdoption:: --traceback
  111. Show traceback if any.
  112. ``celery purge``
  113. ----------------
  114. .. program:: celery purge
  115. .. cmdoption:: -f, --force
  116. Don't prompt for verification before deleting messages (DANGEROUS)
  117. ``celery call``
  118. ---------------
  119. .. program:: celery call
  120. .. cmdoption:: -a, --args
  121. Positional arguments (json format).
  122. .. cmdoption:: -k, --kwargs
  123. Keyword arguments (json format).
  124. .. cmdoption:: --eta
  125. Scheduled time in ISO-8601 format.
  126. .. cmdoption:: --countdown
  127. ETA in seconds from now (float/int).
  128. .. cmdoption:: --expires
  129. Expiry time in float/int seconds, or a ISO-8601 date.
  130. .. cmdoption:: --serializer
  131. Specify serializer to use (default is json).
  132. .. cmdoption:: --queue
  133. Destination queue.
  134. .. cmdoption:: --exchange
  135. Destination exchange (defaults to the queue exchange).
  136. .. cmdoption:: --routing-key
  137. Destination routing key (defaults to the queue routing key).
  138. """
  139. from __future__ import absolute_import, unicode_literals, print_function
  140. import codecs
  141. import numbers
  142. import os
  143. import sys
  144. from functools import partial
  145. from importlib import import_module
  146. from kombu.utils import json
  147. from celery.app import defaults
  148. from celery.five import keys, string_t, values
  149. from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
  150. from celery.utils import term
  151. from celery.utils import text
  152. from celery.utils.functional import pass1
  153. from celery.utils.text import str_to_list
  154. from celery.utils.timeutils import maybe_iso8601
  155. # Cannot use relative imports here due to a Windows issue (#1111).
  156. from celery.bin.base import Command, Option, Extensions
  157. # Import commands from other modules
  158. from celery.bin.amqp import amqp
  159. from celery.bin.beat import beat
  160. from celery.bin.events import events
  161. from celery.bin.graph import graph
  162. from celery.bin.logtool import logtool
  163. from celery.bin.worker import worker
  164. __all__ = ['CeleryCommand', 'main']
  165. HELP = """
  166. ---- -- - - ---- Commands- -------------- --- ------------
  167. {commands}
  168. ---- -- - - --------- -- - -------------- --- ------------
  169. Type '{prog_name} <command> --help' for help using a specific command.
  170. """
  171. MIGRATE_PROGRESS_FMT = """\
  172. Migrating task {state.count}/{state.strtotal}: \
  173. {body[task]}[{body[id]}]\
  174. """
  175. command_classes = [
  176. ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
  177. ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
  178. ('Utils',
  179. ['purge', 'list', 'call', 'result', 'migrate', 'graph', 'upgrade'],
  180. None),
  181. ('Debugging', ['report', 'logtool'], 'red'),
  182. ]
  183. def determine_exit_status(ret):
  184. if isinstance(ret, numbers.Integral):
  185. return ret
  186. return EX_OK if ret else EX_FAILURE
  187. def main(argv=None):
  188. # Fix for setuptools generated scripts, so that it will
  189. # work with multiprocessing fork emulation.
  190. # (see multiprocessing.forking.get_preparation_data())
  191. try:
  192. if __name__ != '__main__': # pragma: no cover
  193. sys.modules['__main__'] = sys.modules[__name__]
  194. cmd = CeleryCommand()
  195. cmd.maybe_patch_concurrency()
  196. from billiard import freeze_support
  197. freeze_support()
  198. cmd.execute_from_commandline(argv)
  199. except KeyboardInterrupt:
  200. pass
  201. class multi(Command):
  202. """Start multiple worker instances."""
  203. respects_app_option = False
  204. def get_options(self):
  205. pass
  206. def run_from_argv(self, prog_name, argv, command=None):
  207. from celery.bin.multi import MultiTool
  208. multi = MultiTool(quiet=self.quiet, no_color=self.no_color)
  209. return multi.execute_from_commandline(
  210. [command] + argv, prog_name,
  211. )
  212. class list_(Command):
  213. """Get info from broker.
  214. Examples::
  215. celery list bindings
  216. NOTE: For RabbitMQ the management plugin is required.
  217. """
  218. args = '[bindings]'
  219. def list_bindings(self, management):
  220. try:
  221. bindings = management.get_bindings()
  222. except NotImplementedError:
  223. raise self.Error('Your transport cannot list bindings.')
  224. def fmt(q, e, r):
  225. return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
  226. fmt('Queue', 'Exchange', 'Routing Key')
  227. fmt('-' * 16, '-' * 16, '-' * 16)
  228. for b in bindings:
  229. fmt(b['destination'], b['source'], b['routing_key'])
  230. def run(self, what=None, *_, **kw):
  231. topics = {'bindings': self.list_bindings}
  232. available = ', '.join(topics)
  233. if not what:
  234. raise self.UsageError(
  235. 'You must specify one of {0}'.format(available))
  236. if what not in topics:
  237. raise self.UsageError(
  238. 'unknown topic {0!r} (choose one of: {1})'.format(
  239. what, available))
  240. with self.app.connection() as conn:
  241. self.app.amqp.TaskConsumer(conn).declare()
  242. topics[what](conn.manager)
  243. class call(Command):
  244. """Call a task by name.
  245. Examples::
  246. celery call tasks.add --args='[2, 2]'
  247. celery call tasks.add --args='[2, 2]' --countdown=10
  248. """
  249. args = '<task_name>'
  250. option_list = Command.option_list + (
  251. Option('--args', '-a', help='positional arguments (json).'),
  252. Option('--kwargs', '-k', help='keyword arguments (json).'),
  253. Option('--eta', help='scheduled time (ISO-8601).'),
  254. Option('--countdown', type='float',
  255. help='eta in seconds from now (float/int).'),
  256. Option('--expires', help='expiry time (ISO-8601/float/int).'),
  257. Option('--serializer', default='json', help='defaults to json.'),
  258. Option('--queue', help='custom queue name.'),
  259. Option('--exchange', help='custom exchange name.'),
  260. Option('--routing-key', help='custom routing key.'),
  261. )
  262. def run(self, name, *_, **kw):
  263. # Positional args.
  264. args = kw.get('args') or ()
  265. if isinstance(args, string_t):
  266. args = json.loads(args)
  267. # Keyword args.
  268. kwargs = kw.get('kwargs') or {}
  269. if isinstance(kwargs, string_t):
  270. kwargs = json.loads(kwargs)
  271. # Expires can be int/float.
  272. expires = kw.get('expires') or None
  273. try:
  274. expires = float(expires)
  275. except (TypeError, ValueError):
  276. # or a string describing an ISO 8601 datetime.
  277. try:
  278. expires = maybe_iso8601(expires)
  279. except (TypeError, ValueError):
  280. raise
  281. res = self.app.send_task(name, args=args, kwargs=kwargs,
  282. countdown=kw.get('countdown'),
  283. serializer=kw.get('serializer'),
  284. queue=kw.get('queue'),
  285. exchange=kw.get('exchange'),
  286. routing_key=kw.get('routing_key'),
  287. eta=maybe_iso8601(kw.get('eta')),
  288. expires=expires)
  289. self.out(res.id)
  290. class purge(Command):
  291. """Erase all messages from all known task queues.
  292. WARNING: There is no undo operation for this command.
  293. """
  294. warn_prelude = (
  295. '{warning}: This will remove all tasks from {queues}: {names}.\n'
  296. ' There is no undo for this operation!\n\n'
  297. '(to skip this prompt use the -f option)\n'
  298. )
  299. warn_prompt = 'Are you sure you want to delete all tasks'
  300. fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
  301. fmt_empty = 'No messages purged from {qnum} {queues}'
  302. option_list = Command.option_list + (
  303. Option('--force', '-f', action='store_true',
  304. help='Do not prompt for verification'),
  305. Option('--queues', '-Q', default=[],
  306. help='Comma separated list of queue names to purge.'),
  307. Option('--exclude-queues', '-X', default=[],
  308. help='Comma separated list of queues names not to purge.')
  309. )
  310. def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
  311. queues = set(str_to_list(queues or []))
  312. exclude = set(str_to_list(exclude_queues or []))
  313. names = (queues or set(keys(self.app.amqp.queues))) - exclude
  314. qnum = len(names)
  315. messages = None
  316. if names:
  317. if not force:
  318. self.out(self.warn_prelude.format(
  319. warning=self.colored.red('WARNING'),
  320. queues=text.pluralize(qnum, 'queue'),
  321. names=', '.join(sorted(names)),
  322. ))
  323. if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
  324. return
  325. with self.app.connection_for_write() as conn:
  326. messages = sum(self._purge(conn, queue) for queue in names)
  327. fmt = self.fmt_purged if messages else self.fmt_empty
  328. self.out(fmt.format(
  329. mnum=messages, qnum=qnum,
  330. messages=text.pluralize(messages, 'message'),
  331. queues=text.pluralize(qnum, 'queue')))
  332. def _purge(self, conn, queue):
  333. try:
  334. return conn.default_channel.queue_purge(queue) or 0
  335. except conn.channel_errors:
  336. return 0
  337. class result(Command):
  338. """Gives the return value for a given task id.
  339. Examples::
  340. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
  341. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
  342. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
  343. """
  344. args = '<task_id>'
  345. option_list = Command.option_list + (
  346. Option('--task', '-t', help='name of task (if custom backend)'),
  347. Option('--traceback', action='store_true',
  348. help='show traceback instead'),
  349. )
  350. def run(self, task_id, *args, **kwargs):
  351. result_cls = self.app.AsyncResult
  352. task = kwargs.get('task')
  353. traceback = kwargs.get('traceback', False)
  354. if task:
  355. result_cls = self.app.tasks[task].AsyncResult
  356. result = result_cls(task_id)
  357. if traceback:
  358. value = result.traceback
  359. else:
  360. value = result.get()
  361. self.out(self.pretty(value)[1])
  362. class _RemoteControl(Command):
  363. name = None
  364. choices = None
  365. leaf = False
  366. option_list = Command.option_list + (
  367. Option('--timeout', '-t', type='float',
  368. help='Timeout in seconds (float) waiting for reply'),
  369. Option('--destination', '-d',
  370. help='Comma separated list of destination node names.'),
  371. Option('--json', '-j', action='store_true',
  372. help='Use json as output format.'),
  373. )
  374. def __init__(self, *args, **kwargs):
  375. self.show_body = kwargs.pop('show_body', True)
  376. self.show_reply = kwargs.pop('show_reply', True)
  377. super(_RemoteControl, self).__init__(*args, **kwargs)
  378. @classmethod
  379. def get_command_info(self, command,
  380. indent=0, prefix='', color=None, help=False):
  381. if help:
  382. help = '|' + text.indent(self.choices[command][1], indent + 4)
  383. else:
  384. help = None
  385. try:
  386. # see if it uses args.
  387. meth = getattr(self, command)
  388. return text.join([
  389. '|' + text.indent('{0}{1} {2}'.format(
  390. prefix, color(command), meth.__doc__), indent),
  391. help,
  392. ])
  393. except AttributeError:
  394. return text.join([
  395. '|' + text.indent(prefix + str(color(command)), indent), help,
  396. ])
  397. @classmethod
  398. def list_commands(self, indent=0, prefix='', color=None, help=False):
  399. color = color if color else lambda x: x
  400. prefix = prefix + ' ' if prefix else ''
  401. return '\n'.join(self.get_command_info(c, indent, prefix, color, help)
  402. for c in sorted(self.choices))
  403. @property
  404. def epilog(self):
  405. return '\n'.join([
  406. '[Commands]',
  407. self.list_commands(indent=4, help=True)
  408. ])
  409. def usage(self, command):
  410. return '%prog {0} [options] {1} <command> [arg1 .. argN]'.format(
  411. command, self.args)
  412. def call(self, *args, **kwargs):
  413. raise NotImplementedError('call')
  414. def run(self, *args, **kwargs):
  415. if not args:
  416. raise self.UsageError(
  417. 'Missing {0.name} method. See --help'.format(self))
  418. return self.do_call_method(args, **kwargs)
  419. def do_call_method(self, args, **kwargs):
  420. method = args[0]
  421. if method == 'help':
  422. raise self.Error("Did you mean '{0.name} --help'?".format(self))
  423. if method not in self.choices:
  424. raise self.UsageError(
  425. 'Unknown {0.name} method {1}'.format(self, method))
  426. if self.app.connection_for_write().transport.driver_type == 'sql':
  427. raise self.Error('Broadcast not supported by SQL broker transport')
  428. output_json = kwargs.get('json')
  429. destination = kwargs.get('destination')
  430. timeout = kwargs.get('timeout') or self.choices[method][0]
  431. if destination and isinstance(destination, string_t):
  432. destination = [dest.strip() for dest in destination.split(',')]
  433. handler = getattr(self, method, self.call)
  434. callback = None if output_json else self.say_remote_command_reply
  435. replies = handler(method, *args[1:], timeout=timeout,
  436. destination=destination,
  437. callback=callback)
  438. if not replies:
  439. raise self.Error('No nodes replied within time constraint.',
  440. status=EX_UNAVAILABLE)
  441. if output_json:
  442. self.out(json.dumps(replies))
  443. return replies
  444. class inspect(_RemoteControl):
  445. """Inspect the worker at runtime.
  446. Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
  447. Examples::
  448. celery inspect active --timeout=5
  449. celery inspect scheduled -d worker1@example.com
  450. celery inspect revoked -d w1@e.com,w2@e.com
  451. """
  452. name = 'inspect'
  453. choices = {
  454. 'active': (1.0, 'dump active tasks (being processed)'),
  455. 'active_queues': (1.0, 'dump queues being consumed from'),
  456. 'clock': (1.0, 'get value of logical clock'),
  457. 'conf': (1.0, 'dump worker configuration'),
  458. 'memdump': (1.0, 'dump memory samples (requires psutil)'),
  459. 'memsample': (1.0, 'sample memory (requires psutil)'),
  460. 'objgraph': (60.0, 'create object graph (requires objgraph)'),
  461. 'ping': (0.2, 'ping worker(s)'),
  462. 'query_task': (1.0, 'query for task information by id'),
  463. 'reserved': (1.0, 'dump reserved tasks (waiting to be processed)'),
  464. 'scheduled': (1.0, 'dump scheduled tasks (eta/countdown/retry)'),
  465. 'stats': (1.0, 'dump worker statistics'),
  466. 'registered': (1.0, 'dump of registered tasks'),
  467. 'report': (1.0, 'get bugreport info'),
  468. 'revoked': (1.0, 'dump of revoked task ids'),
  469. }
  470. def call(self, method, *args, **options):
  471. i = self.app.control.inspect(**options)
  472. return getattr(i, method)(*args)
  473. def objgraph(self, type_='Request', *args, **kwargs):
  474. return self.call('objgraph', type_, **kwargs)
  475. def conf(self, with_defaults=False, *args, **kwargs):
  476. return self.call('conf', with_defaults, **kwargs)
  477. def query_task(self, *ids, **options):
  478. return self.call('query_task', ids, **options)
  479. class control(_RemoteControl):
  480. """Workers remote control.
  481. Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
  482. Examples::
  483. celery control enable_events --timeout=5
  484. celery control -d worker1@example.com enable_events
  485. celery control -d w1.e.com,w2.e.com enable_events
  486. celery control -d w1.e.com add_consumer queue_name
  487. celery control -d w1.e.com cancel_consumer queue_name
  488. celery control -d w1.e.com add_consumer queue exchange direct rkey
  489. """
  490. name = 'control'
  491. choices = {
  492. 'enable_events': (1.0, 'tell worker(s) to enable events'),
  493. 'disable_events': (1.0, 'tell worker(s) to disable events'),
  494. 'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
  495. 'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
  496. 'rate_limit': (
  497. 1.0, 'tell worker(s) to modify the rate limit for a task type'),
  498. 'time_limit': (
  499. 1.0, 'tell worker(s) to modify the time limit for a task type.'),
  500. 'autoscale': (1.0, 'change autoscale settings'),
  501. 'pool_grow': (1.0, 'start more pool processes'),
  502. 'pool_shrink': (1.0, 'use less pool processes'),
  503. }
  504. def call(self, method, *args, **options):
  505. return getattr(self.app.control, method)(*args, reply=True, **options)
  506. def pool_grow(self, method, n=1, **kwargs):
  507. """[N=1]"""
  508. return self.call(method, int(n), **kwargs)
  509. def pool_shrink(self, method, n=1, **kwargs):
  510. """[N=1]"""
  511. return self.call(method, int(n), **kwargs)
  512. def autoscale(self, method, max=None, min=None, **kwargs):
  513. """[max] [min]"""
  514. return self.call(method, int(max), int(min), **kwargs)
  515. def rate_limit(self, method, task_name, rate_limit, **kwargs):
  516. """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
  517. return self.call(method, task_name, rate_limit, **kwargs)
  518. def time_limit(self, method, task_name, soft, hard=None, **kwargs):
  519. """<task_name> <soft_secs> [hard_secs]"""
  520. return self.call(method, task_name,
  521. float(soft), float(hard), **kwargs)
  522. def add_consumer(self, method, queue, exchange=None,
  523. exchange_type='direct', routing_key=None, **kwargs):
  524. """<queue> [exchange [type [routing_key]]]"""
  525. return self.call(method, queue, exchange,
  526. exchange_type, routing_key, **kwargs)
  527. def cancel_consumer(self, method, queue, **kwargs):
  528. """<queue>"""
  529. return self.call(method, queue, **kwargs)
  530. class status(Command):
  531. """Show list of workers that are online."""
  532. option_list = inspect.option_list
  533. def run(self, *args, **kwargs):
  534. I = inspect(
  535. app=self.app,
  536. no_color=kwargs.get('no_color', False),
  537. stdout=self.stdout, stderr=self.stderr,
  538. show_reply=False, show_body=False, quiet=True,
  539. )
  540. replies = I.run('ping', **kwargs)
  541. if not replies:
  542. raise self.Error('No nodes replied within time constraint',
  543. status=EX_UNAVAILABLE)
  544. nodecount = len(replies)
  545. if not kwargs.get('quiet', False):
  546. self.out('\n{0} {1} online.'.format(
  547. nodecount, text.pluralize(nodecount, 'node')))
  548. class migrate(Command):
  549. """Migrate tasks from one broker to another.
  550. Examples:
  551. .. code-block:: console
  552. $ celery migrate redis://localhost amqp://guest@localhost//
  553. $ celery migrate django:// redis://localhost
  554. NOTE: This command is experimental, make sure you have
  555. a backup of the tasks before you continue.
  556. """
  557. args = '<source_url> <dest_url>'
  558. option_list = Command.option_list + (
  559. Option('--limit', '-n', type='int',
  560. help='Number of tasks to consume (int)'),
  561. Option('--timeout', '-t', type='float', default=1.0,
  562. help='Timeout in seconds (float) waiting for tasks'),
  563. Option('--ack-messages', '-a', action='store_true',
  564. help='Ack messages from source broker.'),
  565. Option('--tasks', '-T',
  566. help='List of task names to filter on.'),
  567. Option('--queues', '-Q',
  568. help='List of queues to migrate.'),
  569. Option('--forever', '-F', action='store_true',
  570. help='Continually migrate tasks until killed.'),
  571. )
  572. progress_fmt = MIGRATE_PROGRESS_FMT
  573. def on_migrate_task(self, state, body, message):
  574. self.out(self.progress_fmt.format(state=state, body=body))
  575. def run(self, source, destination, **kwargs):
  576. from kombu import Connection
  577. from celery.contrib.migrate import migrate_tasks
  578. migrate_tasks(Connection(source),
  579. Connection(destination),
  580. callback=self.on_migrate_task,
  581. **kwargs)
  582. class shell(Command): # pragma: no cover
  583. """Start shell session with convenient access to celery symbols.
  584. The following symbols will be added to the main globals:
  585. - celery: the current application.
  586. - chord, group, chain, chunks,
  587. xmap, xstarmap subtask, Task
  588. - all registered tasks.
  589. """
  590. option_list = Command.option_list + (
  591. Option('--ipython', '-I',
  592. action='store_true', dest='force_ipython',
  593. help='force iPython.'),
  594. Option('--bpython', '-B',
  595. action='store_true', dest='force_bpython',
  596. help='force bpython.'),
  597. Option('--python', '-P',
  598. action='store_true', dest='force_python',
  599. help='force default Python shell.'),
  600. Option('--without-tasks', '-T', action='store_true',
  601. help="don't add tasks to locals."),
  602. Option('--eventlet', action='store_true',
  603. help='use eventlet.'),
  604. Option('--gevent', action='store_true', help='use gevent.'),
  605. )
  606. def run(self, force_ipython=False, force_bpython=False,
  607. force_python=False, without_tasks=False, eventlet=False,
  608. gevent=False, **kwargs):
  609. sys.path.insert(0, os.getcwd())
  610. if eventlet:
  611. import_module('celery.concurrency.eventlet')
  612. if gevent:
  613. import_module('celery.concurrency.gevent')
  614. import celery
  615. import celery.task.base
  616. self.app.loader.import_default_modules()
  617. self.locals = {'app': self.app,
  618. 'celery': self.app,
  619. 'Task': celery.Task,
  620. 'chord': celery.chord,
  621. 'group': celery.group,
  622. 'chain': celery.chain,
  623. 'chunks': celery.chunks,
  624. 'xmap': celery.xmap,
  625. 'xstarmap': celery.xstarmap,
  626. 'subtask': celery.subtask,
  627. 'signature': celery.signature}
  628. if not without_tasks:
  629. self.locals.update({
  630. task.__name__: task for task in values(self.app.tasks)
  631. if not task.name.startswith('celery.')
  632. })
  633. if force_python:
  634. return self.invoke_fallback_shell()
  635. elif force_bpython:
  636. return self.invoke_bpython_shell()
  637. elif force_ipython:
  638. return self.invoke_ipython_shell()
  639. return self.invoke_default_shell()
  640. def invoke_default_shell(self):
  641. try:
  642. import IPython # noqa
  643. except ImportError:
  644. try:
  645. import bpython # noqa
  646. except ImportError:
  647. return self.invoke_fallback_shell()
  648. else:
  649. return self.invoke_bpython_shell()
  650. else:
  651. return self.invoke_ipython_shell()
  652. def invoke_fallback_shell(self):
  653. import code
  654. try:
  655. import readline
  656. except ImportError:
  657. pass
  658. else:
  659. import rlcompleter
  660. readline.set_completer(
  661. rlcompleter.Completer(self.locals).complete)
  662. readline.parse_and_bind('tab:complete')
  663. code.interact(local=self.locals)
  664. def invoke_ipython_shell(self):
  665. for ip in (self._ipython, self._ipython_pre_10,
  666. self._ipython_terminal, self._ipython_010,
  667. self._no_ipython):
  668. try:
  669. return ip()
  670. except ImportError:
  671. pass
  672. def _ipython(self):
  673. from IPython import start_ipython
  674. start_ipython(argv=[], user_ns=self.locals)
  675. def _ipython_pre_10(self): # pragma: no cover
  676. from IPython.frontend.terminal.ipapp import TerminalIPythonApp
  677. app = TerminalIPythonApp.instance()
  678. app.initialize(argv=[])
  679. app.shell.user_ns.update(self.locals)
  680. app.start()
  681. def _ipython_terminal(self): # pragma: no cover
  682. from IPython.terminal import embed
  683. embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
  684. def _ipython_010(self): # pragma: no cover
  685. from IPython.Shell import IPShell
  686. IPShell(argv=[], user_ns=self.locals).mainloop()
  687. def _no_ipython(self): # pragma: no cover
  688. raise ImportError('no suitable ipython found')
  689. def invoke_bpython_shell(self):
  690. import bpython
  691. bpython.embed(self.locals)
  692. class upgrade(Command):
  693. """Perform upgrade between versions."""
  694. option_list = Command.option_list + (
  695. Option('--django', action='store_true',
  696. help='Upgrade Django project'),
  697. Option('--compat', action='store_true',
  698. help='Maintain backwards compatibility'),
  699. Option('--no-backup', action='store_true',
  700. help='Dont backup original files'),
  701. )
  702. choices = {'settings'}
  703. def usage(self, command):
  704. return '%prog <command> settings [filename] [options]'
  705. def run(self, *args, **kwargs):
  706. try:
  707. command = args[0]
  708. except IndexError:
  709. raise self.UsageError('missing upgrade type')
  710. if command not in self.choices:
  711. raise self.UsageError('unknown upgrade type: {0}'.format(command))
  712. return getattr(self, command)(*args, **kwargs)
  713. def settings(self, command, filename,
  714. no_backup=False, django=False, compat=False, **kwargs):
  715. lines = self._slurp(filename) if no_backup else self._backup(filename)
  716. keyfilter = self._compat_key if django or compat else pass1
  717. print('processing {0}...'.format(filename), file=self.stderr)
  718. with codecs.open(filename, 'w', 'utf-8') as write_fh:
  719. for line in lines:
  720. write_fh.write(self._to_new_key(line, keyfilter))
  721. def _slurp(self, filename):
  722. with codecs.open(filename, 'r', 'utf-8') as read_fh:
  723. return [line for line in read_fh]
  724. def _backup(self, filename, suffix='.orig'):
  725. lines = []
  726. backup_filename = ''.join([filename, suffix])
  727. print('writing backup to {0}...'.format(backup_filename),
  728. file=self.stderr)
  729. with codecs.open(filename, 'r', 'utf-8') as read_fh:
  730. with codecs.open(backup_filename, 'w', 'utf-8') as backup_fh:
  731. for line in read_fh:
  732. backup_fh.write(line)
  733. lines.append(line)
  734. return lines
  735. def _to_new_key(self, line, keyfilter=pass1, source=defaults._TO_NEW_KEY):
  736. # sort by length to avoid e.g. broker_transport overriding
  737. # broker_transport_options.
  738. for old_key in reversed(sorted(source, key=lambda x: len(x))):
  739. new_line = line.replace(old_key, keyfilter(source[old_key]))
  740. if line != new_line:
  741. return new_line # only one match per line.
  742. return line
  743. def _compat_key(self, key, namespace='CELERY'):
  744. key = key.upper()
  745. if not key.startswith(namespace):
  746. key = '_'.join([namespace, key])
  747. return key
  748. class help(Command):
  749. """Show help screen and exit."""
  750. def usage(self, command):
  751. return '%prog <command> [options] {0.args}'.format(self)
  752. def run(self, *args, **kwargs):
  753. self.parser.print_help()
  754. self.out(HELP.format(
  755. prog_name=self.prog_name,
  756. commands=CeleryCommand.list_commands(colored=self.colored),
  757. ))
  758. return EX_USAGE
  759. class report(Command):
  760. """Shows information useful to include in bug-reports."""
  761. def run(self, *args, **kwargs):
  762. self.out(self.app.bugreport())
  763. return EX_OK
  764. class CeleryCommand(Command):
  765. ext_fmt = '{self.namespace}.commands'
  766. commands = {
  767. 'amqp': amqp,
  768. 'beat': beat,
  769. 'call': call,
  770. 'control': control,
  771. 'events': events,
  772. 'graph': graph,
  773. 'help': help,
  774. 'inspect': inspect,
  775. 'list': list_,
  776. 'logtool': logtool,
  777. 'migrate': migrate,
  778. 'multi': multi,
  779. 'purge': purge,
  780. 'report': report,
  781. 'result': result,
  782. 'shell': shell,
  783. 'status': status,
  784. 'upgrade': upgrade,
  785. 'worker': worker,
  786. }
  787. enable_config_from_cmdline = True
  788. prog_name = 'celery'
  789. namespace = 'celery'
  790. @classmethod
  791. def register_command(cls, fun, name=None):
  792. cls.commands[name or fun.__name__] = fun
  793. return fun
  794. def execute(self, command, argv=None):
  795. try:
  796. cls = self.commands[command]
  797. except KeyError:
  798. cls, argv = self.commands['help'], ['help']
  799. cls = self.commands.get(command) or self.commands['help']
  800. try:
  801. return cls(
  802. app=self.app, on_error=self.on_error,
  803. no_color=self.no_color, quiet=self.quiet,
  804. on_usage_error=partial(self.on_usage_error, command=command),
  805. ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  806. except self.UsageError as exc:
  807. self.on_usage_error(exc)
  808. return exc.status
  809. except self.Error as exc:
  810. self.on_error(exc)
  811. return exc.status
  812. def on_usage_error(self, exc, command=None):
  813. if command:
  814. helps = '{self.prog_name} {command} --help'
  815. else:
  816. helps = '{self.prog_name} --help'
  817. self.error(self.colored.magenta('Error: {0}'.format(exc)))
  818. self.error("""Please try '{0}'""".format(helps.format(
  819. self=self, command=command,
  820. )))
  821. def _relocate_args_from_start(self, argv, index=0):
  822. if argv:
  823. rest = []
  824. while index < len(argv):
  825. value = argv[index]
  826. if value.startswith('--'):
  827. rest.append(value)
  828. elif value.startswith('-'):
  829. # we eat the next argument even though we don't know
  830. # if this option takes an argument or not.
  831. # instead we will assume what is the command name in the
  832. # return statements below.
  833. try:
  834. nxt = argv[index + 1]
  835. if nxt.startswith('-'):
  836. # is another option
  837. rest.append(value)
  838. else:
  839. # is (maybe) a value for this option
  840. rest.extend([value, nxt])
  841. index += 1
  842. except IndexError: # pragma: no cover
  843. rest.append(value)
  844. break
  845. else:
  846. break
  847. index += 1
  848. if argv[index:]: # pragma: no cover
  849. # if there are more arguments left then divide and swap
  850. # we assume the first argument in argv[i:] is the command
  851. # name.
  852. return argv[index:] + rest
  853. # if there are no more arguments then the last arg in rest'
  854. # must be the command.
  855. [rest.pop()] + rest
  856. return []
  857. def prepare_prog_name(self, name):
  858. if name == '__main__.py':
  859. return sys.modules['__main__'].__file__
  860. return name
  861. def handle_argv(self, prog_name, argv):
  862. self.prog_name = self.prepare_prog_name(prog_name)
  863. argv = self._relocate_args_from_start(argv)
  864. _, argv = self.prepare_args(None, argv)
  865. try:
  866. command = argv[0]
  867. except IndexError:
  868. command, argv = 'help', ['help']
  869. return self.execute(command, argv)
  870. def execute_from_commandline(self, argv=None):
  871. argv = sys.argv if argv is None else argv
  872. if 'multi' in argv[1:3]: # Issue 1008
  873. self.respects_app_option = False
  874. try:
  875. sys.exit(determine_exit_status(
  876. super(CeleryCommand, self).execute_from_commandline(argv)))
  877. except KeyboardInterrupt:
  878. sys.exit(EX_FAILURE)
  879. @classmethod
  880. def get_command_info(self, command, indent=0, color=None, colored=None):
  881. colored = term.colored() if colored is None else colored
  882. colored = colored.names[color] if color else lambda x: x
  883. obj = self.commands[command]
  884. cmd = 'celery {0}'.format(colored(command))
  885. if obj.leaf:
  886. return '|' + text.indent(cmd, indent)
  887. return text.join([
  888. ' ',
  889. '|' + text.indent('{0} --help'.format(cmd), indent),
  890. obj.list_commands(indent, 'celery {0}'.format(command), colored),
  891. ])
  892. @classmethod
  893. def list_commands(self, indent=0, colored=None):
  894. colored = term.colored() if colored is None else colored
  895. white = colored.white
  896. ret = []
  897. for cls, commands, color in command_classes:
  898. ret.extend([
  899. text.indent('+ {0}: '.format(white(cls)), indent),
  900. '\n'.join(
  901. self.get_command_info(command, indent + 4, color, colored)
  902. for command in commands),
  903. ''
  904. ])
  905. return '\n'.join(ret).strip()
  906. def with_pool_option(self, argv):
  907. if len(argv) > 1 and 'worker' in argv[0:3]:
  908. # this command supports custom pools
  909. # that may have to be loaded as early as possible.
  910. return (['-P'], ['--pool'])
  911. def on_concurrency_setup(self):
  912. self.load_extension_commands()
  913. def load_extension_commands(self):
  914. names = Extensions(self.ext_fmt.format(self=self),
  915. self.register_command).load()
  916. if names:
  917. command_classes.append(('Extensions', names, 'magenta'))
  918. def command(*args, **kwargs):
  919. """Deprecated: Use classmethod :meth:`CeleryCommand.register_command`
  920. instead."""
  921. _register = CeleryCommand.register_command
  922. return _register(args[0]) if args else _register
  923. if __name__ == '__main__': # pragma: no cover
  924. main()