worker.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery worker` command (previously known as ``celeryd``)
  4. .. program:: celery worker
  5. .. seealso::
  6. See :ref:`preload-options`.
  7. .. cmdoption:: -c, --concurrency
  8. Number of child processes processing the queue. The default
  9. is the number of CPUs available on your system.
  10. .. cmdoption:: -P, --pool
  11. Pool implementation:
  12. prefork (default), eventlet, gevent, solo or threads.
  13. .. cmdoption:: -n, --hostname
  14. Set custom hostname, e.g. 'w1.%h'. Expands: %h (hostname),
  15. %n (name) and %d, (domain).
  16. .. cmdoption:: -B, --beat
  17. Also run the `celery beat` periodic task scheduler. Please note that
  18. there must only be one instance of this service.
  19. .. cmdoption:: -Q, --queues
  20. List of queues to enable for this worker, separated by comma.
  21. By default all configured queues are enabled.
  22. Example: `-Q video,image`
  23. .. cmdoption:: -X, --exclude-queues
  24. List of queues to disable for this worker, separated by comma.
  25. By default all configured queues are enabled.
  26. Example: `-X video,image`.
  27. .. cmdoption:: -I, --include
  28. Comma separated list of additional modules to import.
  29. Example: -I foo.tasks,bar.tasks
  30. .. cmdoption:: -s, --schedule
  31. Path to the schedule database if running with the `-B` option.
  32. Defaults to `celerybeat-schedule`. The extension ".db" may be
  33. appended to the filename.
  34. .. cmdoption:: -O
  35. Apply optimization profile. Supported: default, fair
  36. .. cmdoption:: --prefetch-multiplier
  37. Set custom prefetch multiplier value for this worker instance.
  38. .. cmdoption:: --scheduler
  39. Scheduler class to use. Default is celery.beat.PersistentScheduler
  40. .. cmdoption:: -S, --statedb
  41. Path to the state database. The extension '.db' may
  42. be appended to the filename. Default: {default}
  43. .. cmdoption:: -E, --events
  44. Send task-related events that can be captured by monitors like
  45. :program:`celery events`, `celerymon`, and others.
  46. .. cmdoption:: --without-gossip
  47. Do not subscribe to other workers events.
  48. .. cmdoption:: --without-mingle
  49. Do not synchronize with other workers at startup.
  50. .. cmdoption:: --without-heartbeat
  51. Do not send event heartbeats.
  52. .. cmdoption:: --heartbeat-interval
  53. Interval in seconds at which to send worker heartbeat
  54. .. cmdoption:: --purge
  55. Purges all waiting tasks before the daemon is started.
  56. **WARNING**: This is unrecoverable, and the tasks will be
  57. deleted from the messaging server.
  58. .. cmdoption:: --time-limit
  59. Enables a hard time limit (in seconds int/float) for tasks.
  60. .. cmdoption:: --soft-time-limit
  61. Enables a soft time limit (in seconds int/float) for tasks.
  62. .. cmdoption:: --maxtasksperchild
  63. Maximum number of tasks a pool worker can execute before it's
  64. terminated and replaced by a new worker.
  65. .. cmdoption:: --maxmemperchild
  66. Maximum amount of resident memory, in KiB, that may be consumed by a
  67. child process before it will be replaced by a new one. If a single
  68. task causes a child process to exceed this limit, the task will be
  69. completed and the child process will be replaced afterwards.
  70. Default: no limit.
  71. .. cmdoption:: --autoscale
  72. Enable autoscaling by providing
  73. max_concurrency, min_concurrency. Example::
  74. --autoscale=10,3
  75. (always keep 3 processes, but grow to 10 if necessary)
  76. .. cmdoption:: --autoreload
  77. Enable autoreloading.
  78. .. cmdoption:: --no-execv
  79. Don't do execv after multiprocessing child fork.
  80. .. cmdoption:: --detach
  81. Start worker as a background process.
  82. .. cmdoption:: -f, --logfile
  83. Path to log file. If no logfile is specified, `stderr` is used.
  84. .. cmdoption:: -l, --loglevel
  85. Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
  86. `ERROR`, `CRITICAL`, or `FATAL`.
  87. .. cmdoption:: --pidfile
  88. Optional file used to store the process pid.
  89. The program will not start if this file already exists
  90. and the pid is still alive.
  91. .. cmdoption:: --uid
  92. User id, or user name of the user to run as after detaching.
  93. .. cmdoption:: --gid
  94. Group id, or group name of the main group to change to after
  95. detaching.
  96. .. cmdoption:: --umask
  97. Effective umask (in octal) of the process after detaching. Inherits
  98. the umask of the parent process by default.
  99. .. cmdoption:: --workdir
  100. Optional directory to change to after detaching.
  101. .. cmdoption:: --executable
  102. Executable to use for the detached process.
  103. """
  104. from __future__ import absolute_import, unicode_literals
  105. import sys
  106. from optparse import OptionGroup
  107. from celery import concurrency
  108. from celery.bin.base import Command, daemon_options
  109. from celery.bin.celeryd_detach import detached_celeryd
  110. from celery.five import string_t
  111. from celery.platforms import maybe_drop_privileges
  112. from celery.utils import default_nodename
  113. from celery.utils.log import LOG_LEVELS, mlevel
  # Names exported by a star-import of this module.
  __all__ = [
      'worker',
      'main',
  ]

  # Keep a reference to the module docstring under a distinct name so the
  # ``worker`` class can reuse it as its ``doc`` attribute (inside the class
  # body ``__doc__`` would refer to the class's own docstring instead).
  __MODULE_DOC__ = __doc__
  class worker(Command):
      """Start worker instance.

      Examples::

          celery worker --app=proj -l info
          celery worker -A proj -l info -Q hipri,lopri

          celery worker -A proj --concurrency=4
          celery worker -A proj --concurrency=1000 -P eventlet

          celery worker --autoscale=10,0
      """

      doc = __MODULE_DOC__  # parse help from this too
      namespace = 'worker'
      enable_config_from_cmdline = True
      supports_args = False

      def run_from_argv(self, prog_name, argv=None, command=None):
          """Parse the command line, detach if requested, then run.

          :param prog_name: program name forwarded to ``parse_options``.
          :param argv: argument list; defaults to ``sys.argv[1:]``.
          :param command: command name; defaults to ``sys.argv[0]``.
          :returns: whatever calling this command instance with the parsed
              arguments and options returns.
          """
          command = sys.argv[0] if command is None else command
          argv = sys.argv[1:] if argv is None else argv
          # parse options before detaching so errors can be handled.
          options, args = self.prepare_args(
              *self.parse_options(prog_name, argv, command))
          # list() so that a tuple ``argv`` can be concatenated too.
          self.maybe_detach([command] + list(argv))
          return self(*args, **options)

      def maybe_detach(self, argv, dopts=None):
          """Hand over to the detached worker if a detach flag is in ``argv``.

          :param argv: full argument vector (command followed by arguments).
          :param dopts: flags that request detaching; defaults to
              ``('-D', '--detach')``.
          :raises SystemExit: with status 0 after delegating to
              ``detached_celeryd``, should that call ever return.
          """
          # ``None`` sentinel instead of a mutable list default, so the
          # default can never be shared or mutated between calls.
          dopts = ('-D', '--detach') if dopts is None else dopts
          if any(arg in argv for arg in dopts):
              # The detached process must not see the detach flags again.
              argv = [v for v in argv if v not in dopts]
              # will never return
              detached_celeryd(self.app).execute_from_commandline(argv)
              raise SystemExit(0)

      def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
              loglevel=None, logfile=None, pidfile=None, state_db=None,
              **kwargs):
          """Create the worker from the parsed options and start it.

          :param hostname: node name; passed through ``default_nodename`` and
              ``self.host_format`` before use.
          :param pool_cls: pool implementation name; falls back to
              ``self.app.conf.worker_pool`` when it resolves to a false value.
          :param app: not used in this method body (``self.app`` is used).
          :param uid: forwarded to ``maybe_drop_privileges``.
          :param gid: forwarded to ``maybe_drop_privileges``.
          :param loglevel: level name or number, converted with ``mlevel``.
          :param logfile: passed to the worker unchanged.
          :param pidfile: expanded with ``self.node_format`` for this hostname.
          :param state_db: expanded with ``self.node_format`` for this hostname.
          :param kwargs: remaining options, forwarded to ``self.app.Worker``.
          :returns: the ``exitcode`` attribute of the worker once ``start()``
              has returned.
          """
          maybe_drop_privileges(uid=uid, gid=gid)
          # Pools like eventlet/gevent needs to patch libs as early
          # as possible.
          pool_cls = (concurrency.get_implementation(pool_cls) or
                      self.app.conf.worker_pool)
          if self.app.IS_WINDOWS and kwargs.get('beat'):
              self.die('-B option does not work on Windows. '
                       'Please run celery beat as a separate service.')
          hostname = self.host_format(default_nodename(hostname))
          if loglevel:
              try:
                  loglevel = mlevel(loglevel)
              except KeyError:  # pragma: no cover
                  # ``loglevel`` still holds the unconverted value here; only
                  # the string keys of LOG_LEVELS are offered as choices.
                  self.die('Unknown level {0!r}. Please use one of {1}.'.format(
                      loglevel, '|'.join(
                          level for level in LOG_LEVELS
                          if isinstance(level, string_t))))

          # Named ``instance`` so the local does not shadow this class's name.
          instance = self.app.Worker(
              hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
              logfile=logfile,  # node format handled by celery.app.log.setup
              pidfile=self.node_format(pidfile, hostname),
              state_db=self.node_format(state_db, hostname), **kwargs
          )
          instance.start()
          return instance.exitcode

      def with_pool_option(self, argv):
          """Return the short and long option names that select the pool.

          ``argv`` is accepted but not inspected.
          """
          # this command support custom pools
          # that may have to be loaded as early as possible.
          return (['-P'], ['--pool'])

      def prepare_arguments(self, parser):
          """Register the worker's command-line options on ``parser``.

          Options are added in groups (worker, pool, queue, features, daemon,
          embedded beat and, when any are registered on the app, user
          options).  Several defaults are read from ``self.app.conf``.

          :param parser: the option parser that receives the option groups.
          """
          conf = self.app.conf

          wopts = OptionGroup(parser, 'Worker Options')
          wopts.add_option('-n', '--hostname')
          wopts.add_option('-D', '--detach', action='store_true')
          wopts.add_option(
              '-S', '--statedb',
              default=conf.worker_state_db, dest='state_db',
          )
          wopts.add_option('-l', '--loglevel', default='WARN')
          wopts.add_option('-O', dest='optimization')
          wopts.add_option(
              '--prefetch-multiplier',
              dest='prefetch_multiplier', type='int',
              default=conf.worker_prefetch_multiplier,
          )
          parser.add_option_group(wopts)

          topts = OptionGroup(parser, 'Pool Options')
          topts.add_option(
              '-c', '--concurrency',
              default=conf.worker_concurrency, type='int',
          )
          topts.add_option(
              '-P', '--pool',
              default=conf.worker_pool, dest='pool_cls',
          )
          topts.add_option(
              '-E', '--events',
              default=conf.worker_send_task_events,
              action='store_true', dest='send_events',
          )
          topts.add_option(
              '--time-limit',
              type='float', dest='task_time_limit',
              default=conf.task_time_limit,
          )
          topts.add_option(
              '--soft-time-limit',
              dest='task_soft_time_limit', type='float',
              default=conf.task_soft_time_limit,
          )
          topts.add_option(
              '--maxtasksperchild',
              dest='max_tasks_per_child', type='int',
              default=conf.worker_max_tasks_per_child,
          )
          topts.add_option(
              '--maxmemperchild',
              dest='max_memory_per_child', type='int',
              default=conf.worker_max_memory_per_child,
          )
          parser.add_option_group(topts)

          qopts = OptionGroup(parser, 'Queue Options')
          qopts.add_option(
              '--purge', '--discard',
              default=False, action='store_true',
          )
          qopts.add_option('--queues', '-Q', default=[])
          qopts.add_option('--exclude-queues', '-X', default=[])
          qopts.add_option('--include', '-I', default=[])
          parser.add_option_group(qopts)

          fopts = OptionGroup(parser, 'Features')
          fopts.add_option('--autoscale')
          fopts.add_option('--autoreload', action='store_true')
          fopts.add_option(
              '--without-gossip', action='store_true', default=False,
          )
          fopts.add_option(
              '--without-mingle', action='store_true', default=False,
          )
          fopts.add_option(
              '--without-heartbeat', action='store_true', default=False,
          )
          fopts.add_option('--heartbeat-interval', type='int')
          parser.add_option_group(fopts)

          daemon_options(parser)

          bopts = OptionGroup(parser, 'Embedded Beat Options')
          bopts.add_option('-B', '--beat', action='store_true')
          bopts.add_option(
              '-s', '--schedule', dest='schedule_filename',
              default=conf.beat_schedule_filename,
          )
          bopts.add_option('--scheduler', dest='scheduler_cls')
          parser.add_option_group(bopts)

          # Options registered on the app for the 'worker' command, if any.
          user_options = self.app.user_options['worker']
          if user_options:
              uopts = OptionGroup(parser, 'User Options')
              uopts.options_list.extend(user_options)
              parser.add_option_group(uopts)
  def main(app=None):
      """Run the ``worker`` command from the process command line.

      :param app: passed to the ``worker`` command constructor.
      """
      # Setuptools-generated scripts import this module under its real name;
      # alias it as ``__main__`` so multiprocessing fork emulation keeps
      # working (see multiprocessing.forking.get_preparation_data()).
      if __name__ != '__main__':  # pragma: no cover
          this_module = sys.modules[__name__]
          sys.modules['__main__'] = this_module

      from billiard import freeze_support
      freeze_support()

      command = worker(app=app)
      command.execute_from_commandline()
  # Script entry point: only start the worker when this file is executed
  # directly, not when it is imported.
  if __name__ == '__main__':  # pragma: no cover
      main()