celeryd.py

# -*- coding: utf-8 -*-
"""
The :program:`celery worker` command (previously known as ``celeryd``)

.. program:: celery worker

.. seealso::

    See :ref:`preload-options`.

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue. The default
    is the number of CPUs available on your system.

.. cmdoption:: -P, --pool

    Pool implementation:
    processes (default), eventlet, gevent, solo or threads.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, `stderr` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
    `ERROR`, `CRITICAL`, or `FATAL`.

.. cmdoption:: -n, --hostname

    Set custom hostname, e.g. 'foo.example.com'.

.. cmdoption:: -B, --beat

    Also run the `celerybeat` periodic task scheduler. Please note that
    there must only be one instance of this service.

.. cmdoption:: -Q, --queues

    List of queues to enable for this worker, separated by comma.
    By default all configured queues are enabled.
    Example: `-Q video,image`

.. cmdoption:: -I, --include

    Comma-separated list of additional modules to import.
    Example: -I foo.tasks,bar.tasks

.. cmdoption:: -s, --schedule

    Path to the schedule database if running with the `-B` option.
    Defaults to `celerybeat-schedule`. The extension ".db" may be
    appended to the filename.

.. cmdoption:: --scheduler

    Scheduler class to use. Default is `celery.beat.PersistentScheduler`.

.. cmdoption:: -S, --statedb

    Path to the state database. The extension '.db' may
    be appended to the filename. Default: %(default)s

.. cmdoption:: -E, --events

    Send events that can be captured by monitors like :program:`celeryev`,
    `celerymon`, and others.

.. cmdoption:: --purge

    Purges all waiting tasks before the daemon is started.
    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

.. cmdoption:: --time-limit

    Enables a hard time limit (in seconds int/float) for tasks.

.. cmdoption:: --soft-time-limit

    Enables a soft time limit (in seconds int/float) for tasks.

.. cmdoption:: --maxtasksperchild

    Maximum number of tasks a pool worker can execute before it's
    terminated and replaced by a new worker.

.. cmdoption:: --pidfile

    Optional file used to store the worker's pid.

    The worker will not start if this file already exists
    and the pid is still alive.

.. cmdoption:: --autoscale

    Enable autoscaling by providing
    max_concurrency, min_concurrency. Example::

        --autoscale=10,3

    (always keep 3 processes, but grow to 10 if necessary)

.. cmdoption:: --autoreload

    Enable autoreloading.

.. cmdoption:: --no-execv

    Don't do execv after multiprocessing child fork.

"""
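# Example invocations (illustrative sketch, not part of the original file).
# The options mirror the docstring above; ``proj`` is an assumed application
# module passed via the standard ``--app`` preload option:
#
#     celery worker --app=proj --loglevel=INFO --concurrency=4 -Q video,image
#     celery worker --app=proj -B --schedule=celerybeat-schedule
#     celery worker --app=proj --autoscale=10,3 --time-limit=300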
from __future__ import absolute_import

import sys

from billiard import freeze_support

from celery import concurrency
from celery.bin.base import Command, Option
from celery.utils.log import LOG_LEVELS, mlevel


class WorkerCommand(Command):
    doc = __doc__  # parse help from this.
    namespace = 'celeryd'
    enable_config_from_cmdline = True
    supports_args = False

    def execute_from_commandline(self, argv=None):
        if argv is None:
            argv = list(sys.argv)
        return super(WorkerCommand, self).execute_from_commandline(argv)

    def run(self, *args, **kwargs):
        kwargs.pop('app', None)
        # Pools like eventlet/gevent need to patch libs as early
        # as possible.
        kwargs['pool_cls'] = concurrency.get_implementation(
            kwargs.get('pool_cls') or self.app.conf.CELERYD_POOL)
        if self.app.IS_WINDOWS and kwargs.get('beat'):
            self.die('-B option does not work on Windows. '
                     'Please run celerybeat as a separate service.')
        loglevel = kwargs.get('loglevel')
        if loglevel:
            try:
                # Convert a textual level (e.g. 'INFO') to its numeric value.
                kwargs['loglevel'] = mlevel(loglevel)
            except KeyError:  # pragma: no cover
                self.die('Unknown level %r. Please use one of %s.' % (
                    loglevel, '|'.join(l for l in LOG_LEVELS.keys()
                                       if isinstance(l, basestring))))
        return self.app.Worker(**kwargs).run()

    def with_pool_option(self, argv):
        # this command supports custom pools
        # that may have to be loaded as early as possible.
        return (['-P'], ['--pool'])
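    # Note (added for clarity, not in the original source): the option names
    # returned by ``with_pool_option`` let the base command look for
    # ``-P``/``--pool`` in argv before regular option parsing, so pools like
    # eventlet/gevent can monkey-patch the standard library early enough.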
    def get_options(self):
        conf = self.app.conf
        return (
            Option('-c', '--concurrency',
                   default=conf.CELERYD_CONCURRENCY, type='int'),
            Option('-P', '--pool', default=conf.CELERYD_POOL, dest='pool_cls'),
            Option('--purge', '--discard', default=False, action='store_true'),
            Option('-f', '--logfile', default=conf.CELERYD_LOG_FILE),
            Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL),
            Option('-n', '--hostname'),
            Option('-B', '--beat', action='store_true'),
            Option('-s', '--schedule', dest='schedule_filename',
                   default=conf.CELERYBEAT_SCHEDULE_FILENAME),
            Option('--scheduler', dest='scheduler_cls'),
            Option('-S', '--statedb',
                   default=conf.CELERYD_STATE_DB, dest='state_db'),
            Option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
                   action='store_true', dest='send_events'),
            Option('--time-limit', type='float', dest='task_time_limit',
                   default=conf.CELERYD_TASK_TIME_LIMIT),
            Option('--soft-time-limit', dest='task_soft_time_limit',
                   default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='float'),
            Option('--maxtasksperchild', dest='max_tasks_per_child',
                   default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'),
            Option('--queues', '-Q', default=[]),
            Option('--include', '-I', default=[]),
            Option('--pidfile'),
            Option('--autoscale'),
            Option('--autoreload', action='store_true'),
            Option('--no-execv', action='store_true', default=False),
        )
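    # The defaults above are read from the application configuration, so the
    # same values can be set there instead of on the command line (sketch,
    # using the setting names referenced above; values are illustrative):
    #
    #     CELERYD_CONCURRENCY = 4
    #     CELERYD_LOG_LEVEL = 'INFO'
    #     CELERYD_TASK_TIME_LIMIT = 300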


def main():
    # Fix for setuptools generated scripts, so that it will
    # work with multiprocessing fork emulation.
    # (see multiprocessing.forking.get_preparation_data())
    if __name__ != '__main__':  # pragma: no cover
        sys.modules['__main__'] = sys.modules[__name__]
    freeze_support()
    worker = WorkerCommand()
    worker.execute_from_commandline()


if __name__ == '__main__':  # pragma: no cover
    main()
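
# Illustrative usage sketch (not part of the original module): the command can
# also be driven programmatically. ``proj`` is an assumed application module;
# argv[0] is the program name, as ``execute_from_commandline`` expects:
#
#     from celery.bin.celeryd import WorkerCommand
#
#     WorkerCommand().execute_from_commandline([
#         'celeryd', '--app=proj', '--loglevel=INFO', '--concurrency=4',
#     ])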