worker.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery worker` command (previously known as ``celeryd``)
  4. .. program:: celery worker
  5. .. seealso::
  6. See :ref:`preload-options`.
  7. .. cmdoption:: -c, --concurrency
  8. Number of child processes processing the queue. The default
  9. is the number of CPUs available on your system.
  10. .. cmdoption:: -P, --pool
  11. Pool implementation:
  12. prefork (default), eventlet, gevent or solo.
  13. .. cmdoption:: -n, --hostname
  14. Set custom hostname, e.g. 'w1.%h'. Expands: %h (hostname),
  15. %n (name) and %d (domain).
  16. .. cmdoption:: -B, --beat
  17. Also run the `celery beat` periodic task scheduler. Please note that
  18. there must only be one instance of this service.
  19. .. cmdoption:: -Q, --queues
  20. List of queues to enable for this worker, separated by comma.
  21. By default all configured queues are enabled.
  22. Example: `-Q video,image`
  23. .. cmdoption:: -X, --exclude-queues
  24. List of queues to disable for this worker, separated by comma.
  25. By default all configured queues are enabled.
  26. Example: `-X video,image`.
  27. .. cmdoption:: -I, --include
  28. Comma separated list of additional modules to import.
  29. Example: -I foo.tasks,bar.tasks
  30. .. cmdoption:: -s, --schedule
  31. Path to the schedule database if running with the `-B` option.
  32. Defaults to `celerybeat-schedule`. The extension ".db" may be
  33. appended to the filename.
  34. .. cmdoption:: -O
  35. Apply optimization profile. Supported: default, fair
  36. .. cmdoption:: --prefetch-multiplier
  37. Set custom prefetch multiplier value for this worker instance.
  38. .. cmdoption:: --scheduler
  39. Scheduler class to use. Default is
  40. :class:`celery.beat.PersistentScheduler`
  41. .. cmdoption:: -S, --statedb
  42. Path to the state database. The extension '.db' may
  43. be appended to the filename. Default: {default}
  44. .. cmdoption:: -E, --events
  45. Send task-related events that can be captured by monitors like
  46. :program:`celery events`, `celerymon`, and others.
  47. .. cmdoption:: --without-gossip
  48. Do not subscribe to other workers events.
  49. .. cmdoption:: --without-mingle
  50. Do not synchronize with other workers at start-up.
  51. .. cmdoption:: --without-heartbeat
  52. Do not send event heartbeats.
  53. .. cmdoption:: --heartbeat-interval
  54. Interval in seconds at which to send worker heartbeat.
  55. .. cmdoption:: --purge
  56. Purges all waiting tasks before the daemon is started.
  57. **WARNING**: This is unrecoverable, and the tasks will be
  58. deleted from the messaging server.
  59. .. cmdoption:: --time-limit
  60. Enables a hard time limit (in seconds int/float) for tasks.
  61. .. cmdoption:: --soft-time-limit
  62. Enables a soft time limit (in seconds int/float) for tasks.
  63. .. cmdoption:: --maxtasksperchild
  64. Maximum number of tasks a pool worker can execute before it's
  65. terminated and replaced by a new worker.
  66. .. cmdoption:: --maxmemperchild
  67. Maximum amount of resident memory, in KiB, that may be consumed by a
  68. child process before it will be replaced by a new one. If a single
  69. task causes a child process to exceed this limit, the task will be
  70. completed and the child process will be replaced afterwards.
  71. Default: no limit.
  72. .. cmdoption:: --detach
  73. Start worker as a background process.
  74. .. cmdoption:: -f, --logfile
  75. Path to log file. If no logfile is specified, `stderr` is used.
  76. .. cmdoption:: -l, --loglevel
  77. Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
  78. `ERROR`, `CRITICAL`, or `FATAL`.
  79. .. cmdoption:: --pidfile
  80. Optional file used to store the process pid.
  81. The program will not start if this file already exists
  82. and the pid is still alive.
  83. .. cmdoption:: --uid
  84. User id, or user name of the user to run as after detaching.
  85. .. cmdoption:: --gid
  86. Group id, or group name of the main group to change to after
  87. detaching.
  88. .. cmdoption:: --umask
  89. Effective :manpage:`umask(1)` (in octal) of the process after detaching.
  90. Inherits the :manpage:`umask(1)` of the parent process by default.
  91. .. cmdoption:: --workdir
  92. Optional directory to change to after detaching.
  93. .. cmdoption:: --executable
  94. Executable to use for the detached process.
  95. """
  96. from __future__ import absolute_import, unicode_literals
  97. import sys
  98. from optparse import OptionGroup
  99. from celery import concurrency
  100. from celery.bin.base import Command, daemon_options
  101. from celery.bin.celeryd_detach import detached_celeryd
  102. from celery.five import string_t
  103. from celery.platforms import maybe_drop_privileges
  104. from celery.utils.log import LOG_LEVELS, mlevel
  105. from celery.utils.nodenames import default_nodename
  106. __all__ = ['worker', 'main']
  107. __MODULE_DOC__ = __doc__
  108. class worker(Command):
  109. """Start worker instance.
  110. Examples::
  111. celery worker --app=proj -l info
  112. celery worker -A proj -l info -Q hipri,lopri
  113. celery worker -A proj --concurrency=4
  114. celery worker -A proj --concurrency=1000 -P eventlet
  115. """
  116. doc = __MODULE_DOC__ # parse help from this too
  117. namespace = 'worker'
  118. enable_config_from_cmdline = True
  119. supports_args = False
  120. def run_from_argv(self, prog_name, argv=None, command=None):
  121. command = sys.argv[0] if command is None else command
  122. argv = sys.argv[1:] if argv is None else argv
  123. # parse options before detaching so errors can be handled.
  124. options, args = self.prepare_args(
  125. *self.parse_options(prog_name, argv, command))
  126. self.maybe_detach([command] + argv)
  127. return self(*args, **options)
  128. def maybe_detach(self, argv, dopts=['-D', '--detach']):
  129. if any(arg in argv for arg in dopts):
  130. argv = [v for v in argv if v not in dopts]
  131. # will never return
  132. detached_celeryd(self.app).execute_from_commandline(argv)
  133. raise SystemExit(0)
  134. def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
  135. loglevel=None, logfile=None, pidfile=None, state_db=None,
  136. **kwargs):
  137. maybe_drop_privileges(uid=uid, gid=gid)
  138. # Pools like eventlet/gevent needs to patch libs as early
  139. # as possible.
  140. pool_cls = (concurrency.get_implementation(pool_cls) or
  141. self.app.conf.worker_pool)
  142. if self.app.IS_WINDOWS and kwargs.get('beat'):
  143. self.die('-B option does not work on Windows. '
  144. 'Please run celery beat as a separate service.')
  145. hostname = self.host_format(default_nodename(hostname))
  146. if loglevel:
  147. try:
  148. loglevel = mlevel(loglevel)
  149. except KeyError: # pragma: no cover
  150. self.die('Unknown level {0!r}. Please use one of {1}.'.format(
  151. loglevel, '|'.join(
  152. l for l in LOG_LEVELS if isinstance(l, string_t))))
  153. worker = self.app.Worker(
  154. hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
  155. logfile=logfile, # node format handled by celery.app.log.setup
  156. pidfile=self.node_format(pidfile, hostname),
  157. state_db=self.node_format(state_db, hostname), **kwargs
  158. )
  159. worker.start()
  160. return worker.exitcode
  161. def with_pool_option(self, argv):
  162. # this command support custom pools
  163. # that may have to be loaded as early as possible.
  164. return (['-P'], ['--pool'])
  165. def prepare_arguments(self, parser):
  166. conf = self.app.conf
  167. wopts = OptionGroup(parser, 'Worker Options')
  168. wopts.add_option('-n', '--hostname')
  169. wopts.add_option('-D', '--detach', action='store_true')
  170. wopts.add_option(
  171. '-S', '--statedb',
  172. default=conf.worker_state_db, dest='state_db',
  173. )
  174. wopts.add_option('-l', '--loglevel', default='WARN')
  175. wopts.add_option('-O', dest='optimization')
  176. wopts.add_option(
  177. '--prefetch-multiplier',
  178. dest='prefetch_multiplier', type='int',
  179. default=conf.worker_prefetch_multiplier,
  180. )
  181. parser.add_option_group(wopts)
  182. topts = OptionGroup(parser, 'Pool Options')
  183. topts.add_option(
  184. '-c', '--concurrency',
  185. default=conf.worker_concurrency, type='int',
  186. )
  187. topts.add_option(
  188. '-P', '--pool',
  189. default=conf.worker_pool, dest='pool_cls',
  190. )
  191. topts.add_option(
  192. '-E', '--events',
  193. default=conf.worker_send_task_events,
  194. action='store_true', dest='send_events',
  195. )
  196. topts.add_option(
  197. '--time-limit',
  198. type='float', dest='task_time_limit',
  199. default=conf.task_time_limit,
  200. )
  201. topts.add_option(
  202. '--soft-time-limit',
  203. dest='task_soft_time_limit', type='float',
  204. default=conf.task_soft_time_limit,
  205. )
  206. topts.add_option(
  207. '--maxtasksperchild',
  208. dest='max_tasks_per_child', type='int',
  209. default=conf.worker_max_tasks_per_child,
  210. )
  211. topts.add_option(
  212. '--maxmemperchild',
  213. dest='max_memory_per_child', type='int',
  214. default=conf.worker_max_memory_per_child,
  215. )
  216. parser.add_option_group(topts)
  217. qopts = OptionGroup(parser, 'Queue Options')
  218. qopts.add_option(
  219. '--purge', '--discard',
  220. default=False, action='store_true',
  221. )
  222. qopts.add_option('--queues', '-Q', default=[])
  223. qopts.add_option('--exclude-queues', '-X', default=[])
  224. qopts.add_option('--include', '-I', default=[])
  225. parser.add_option_group(qopts)
  226. fopts = OptionGroup(parser, 'Features')
  227. fopts.add_option(
  228. '--without-gossip', action='store_true', default=False,
  229. )
  230. fopts.add_option(
  231. '--without-mingle', action='store_true', default=False,
  232. )
  233. fopts.add_option(
  234. '--without-heartbeat', action='store_true', default=False,
  235. )
  236. fopts.add_option('--heartbeat-interval', type='int')
  237. parser.add_option_group(fopts)
  238. daemon_options(parser)
  239. bopts = OptionGroup(parser, 'Embedded Beat Options')
  240. bopts.add_option('-B', '--beat', action='store_true')
  241. bopts.add_option(
  242. '-s', '--schedule', dest='schedule_filename',
  243. default=conf.beat_schedule_filename,
  244. )
  245. bopts.add_option('--scheduler', dest='scheduler_cls')
  246. parser.add_option_group(bopts)
  247. user_options = self.app.user_options['worker']
  248. if user_options:
  249. uopts = OptionGroup(parser, 'User Options')
  250. uopts.option_list.extend(user_options)
  251. parser.add_option_group(uopts)
  252. def main(app=None):
  253. # Fix for setuptools generated scripts, so that it will
  254. # work with multiprocessing fork emulation.
  255. # (see multiprocessing.forking.get_preparation_data())
  256. if __name__ != '__main__': # pragma: no cover
  257. sys.modules['__main__'] = sys.modules[__name__]
  258. from billiard import freeze_support
  259. freeze_support()
  260. worker(app=app).execute_from_commandline()
  261. if __name__ == '__main__': # pragma: no cover
  262. main()