celeryd.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery worker` command (previously known as ``celeryd``)
  4. .. program:: celery worker
  5. .. seealso::
  6. See :ref:`preload-options`.
  7. .. cmdoption:: -c, --concurrency
  8. Number of child processes processing the queue. The default
  9. is the number of CPUs available on your system.
  10. .. cmdoption:: -P, --pool
  11. Pool implementation:
  12. processes (default), eventlet, gevent, solo or threads.
  13. .. cmdoption:: -f, --logfile
  14. Path to log file. If no logfile is specified, `stderr` is used.
  15. .. cmdoption:: -l, --loglevel
  16. Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
  17. `ERROR`, `CRITICAL`, or `FATAL`.
  18. .. cmdoption:: -n, --hostname
  19. Set custom hostname, e.g. 'foo.example.com'.
  20. .. cmdoption:: -B, --beat
  21. Also run the `celerybeat` periodic task scheduler. Please note that
  22. there must only be one instance of this service.
  23. .. cmdoption:: -Q, --queues
  24. List of queues to enable for this worker, separated by comma.
  25. By default all configured queues are enabled.
  26. Example: `-Q video,image`
  27. .. cmdoption:: -I, --include
  28. Comma separated list of additional modules to import.
  29. Example: -I foo.tasks,bar.tasks
  30. .. cmdoption:: -s, --schedule
  31. Path to the schedule database if running with the `-B` option.
  32. Defaults to `celerybeat-schedule`. The extension ".db" may be
  33. appended to the filename.
  34. .. cmdoption:: --scheduler
  35. Scheduler class to use. Default is celery.beat.PersistentScheduler
  36. .. cmdoption:: -S, --statedb
  37. Path to the state database. The extension '.db' may
  38. be appended to the filename. Default: %(default)s
  39. .. cmdoption:: -E, --events
  40. Send events that can be captured by monitors like :program:`celeryev`,
  41. `celerymon`, and others.
  42. .. cmdoption:: --purge
  43. Purges all waiting tasks before the daemon is started.
  44. **WARNING**: This is unrecoverable, and the tasks will be
  45. deleted from the messaging server.
  46. .. cmdoption:: --time-limit
  47. Enables a hard time limit (in seconds int/float) for tasks.
  48. .. cmdoption:: --soft-time-limit
  49. Enables a soft time limit (in seconds int/float) for tasks.
  50. .. cmdoption:: --maxtasksperchild
  51. Maximum number of tasks a pool worker can execute before it's
  52. terminated and replaced by a new worker.
  53. .. cmdoption:: --pidfile
  54. Optional file used to store the worker's pid.
  55. The worker will not start if this file already exists
  56. and the pid is still alive.
  57. .. cmdoption:: --autoscale
  58. Enable autoscaling by providing
  59. max_concurrency, min_concurrency. Example::
  60. --autoscale=10,3
  61. (always keep 3 processes, but grow to 10 if necessary)
  62. .. cmdoption:: --autoreload
  63. Enable autoreloading.
  64. .. cmdoption:: --no-execv
  65. Don't do execv after multiprocessing child fork.
  66. """
  67. from __future__ import absolute_import
  68. import sys
  69. from celery import concurrency
  70. from celery.bin.base import Command, Option
  71. from celery.utils.log import LOG_LEVELS, mlevel
  72. class WorkerCommand(Command):
  73. doc = __doc__ # parse help from this.
  74. namespace = 'celeryd'
  75. enable_config_from_cmdline = True
  76. supports_args = False
  77. def execute_from_commandline(self, argv=None):
  78. if argv is None:
  79. argv = list(sys.argv)
  80. return super(WorkerCommand, self).execute_from_commandline(argv)
  81. def run(self, *args, **kwargs):
  82. kwargs.pop('app', None)
  83. # Pools like eventlet/gevent needs to patch libs as early
  84. # as possible.
  85. kwargs['pool_cls'] = concurrency.get_implementation(
  86. kwargs.get('pool_cls') or self.app.conf.CELERYD_POOL)
  87. if self.app.IS_WINDOWS and kwargs.get('beat'):
  88. self.die('-B option does not work on Windows. '
  89. 'Please run celerybeat as a separate service.')
  90. loglevel = kwargs.get('loglevel')
  91. if loglevel:
  92. try:
  93. kwargs['loglevel'] = mlevel(loglevel)
  94. except KeyError: # pragma: no cover
  95. self.die('Unknown level %r. Please use one of %s.' % (
  96. loglevel, '|'.join(l for l in LOG_LEVELS
  97. if isinstance(l, basestring))))
  98. return self.app.Worker(**kwargs).run()
  99. def with_pool_option(self, argv):
  100. # this command support custom pools
  101. # that may have to be loaded as early as possible.
  102. return (['-P'], ['--pool'])
  103. def get_options(self):
  104. conf = self.app.conf
  105. return (
  106. Option('-c', '--concurrency',
  107. default=conf.CELERYD_CONCURRENCY, type='int'),
  108. Option('-P', '--pool', default=conf.CELERYD_POOL, dest='pool_cls'),
  109. Option('--purge', '--discard', default=False, action='store_true'),
  110. Option('-f', '--logfile', default=conf.CELERYD_LOG_FILE),
  111. Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL),
  112. Option('-n', '--hostname'),
  113. Option('-B', '--beat', action='store_true'),
  114. Option('-s', '--schedule', dest='schedule_filename',
  115. default=conf.CELERYBEAT_SCHEDULE_FILENAME),
  116. Option('--scheduler', dest='scheduler_cls'),
  117. Option('-S', '--statedb',
  118. default=conf.CELERYD_STATE_DB, dest='state_db'),
  119. Option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
  120. action='store_true', dest='send_events'),
  121. Option('--time-limit', type='float', dest='task_time_limit',
  122. default=conf.CELERYD_TASK_TIME_LIMIT),
  123. Option('--soft-time-limit', dest='task_soft_time_limit',
  124. default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='float'),
  125. Option('--maxtasksperchild', dest='max_tasks_per_child',
  126. default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'),
  127. Option('--queues', '-Q', default=[]),
  128. Option('--include', '-I', default=[]),
  129. Option('--pidfile'),
  130. Option('--autoscale'),
  131. Option('--autoreload', action='store_true'),
  132. Option('--no-execv', action='store_true', default=False),
  133. )
  134. def main():
  135. # Fix for setuptools generated scripts, so that it will
  136. # work with multiprocessing fork emulation.
  137. # (see multiprocessing.forking.get_preparation_data())
  138. if __name__ != '__main__': # pragma: no cover
  139. sys.modules['__main__'] = sys.modules[__name__]
  140. from billiard import freeze_support
  141. freeze_support()
  142. worker = WorkerCommand()
  143. worker.execute_from_commandline()
  144. if __name__ == '__main__': # pragma: no cover
  145. main()