# celeryd.py 7.9 KB
  1. # -*- coding: utf-8 -*-
  2. """celeryd
  3. .. program:: celeryd
  4. .. cmdoption:: -c, --concurrency
  5. Number of child processes processing the queue. The default
  6. is the number of CPUs available on your system.
  7. .. cmdoption:: -f, --logfile
  8. Path to log file. If no logfile is specified, `stderr` is used.
  9. .. cmdoption:: -l, --loglevel
  10. Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
  11. `ERROR`, `CRITICAL`, or `FATAL`.
  12. .. cmdoption:: -n, --hostname
  13. Set custom hostname.
  14. .. cmdoption:: -B, --beat
  15. Also run the `celerybeat` periodic task scheduler. Please note that
  16. there must only be one instance of this service.
  17. .. cmdoption:: -Q, --queues
  18. List of queues to enable for this worker, separated by comma.
  19. By default all configured queues are enabled.
  20. Example: `-Q video,image`
  21. .. cmdoption:: -I, --include
  22. Comma separated list of additional modules to import.
  23. Example: -I foo.tasks,bar.tasks
  24. .. cmdoption:: -s, --schedule
  25. Path to the schedule database if running with the `-B` option.
  26. Defaults to `celerybeat-schedule`. The extension ".db" will be
  27. appended to the filename.
  28. .. cmdoption:: --scheduler
  29. Scheduler class to use. Default is celery.beat.PersistentScheduler
  30. .. cmdoption:: -E, --events
  31. Send events that can be captured by monitors like `celerymon`.
  32. .. cmdoption:: --purge, --discard
  33. Discard all waiting tasks before the daemon is started.
  34. **WARNING**: This is unrecoverable, and the tasks will be
  35. deleted from the messaging server.
  36. .. cmdoption:: --time-limit
  37. Enables a hard time limit (in seconds) for tasks.
  38. .. cmdoption:: --soft-time-limit
  39. Enables a soft time limit (in seconds) for tasks.
  40. .. cmdoption:: --maxtasksperchild
  41. Maximum number of tasks a pool worker can execute before it's
  42. terminated and replaced by a new worker.
  43. """
  44. from __future__ import absolute_import
  45. if __name__ == "__main__" and __package__ is None:
  46. __package__ = "celery.bin.celeryd"
  47. import sys
  48. try:
  49. from celery.concurrency.processes.forking import freeze_support
  50. except ImportError: # pragma: no cover
  51. freeze_support = lambda: True # noqa
  52. from .base import Command, Option
  class WorkerCommand(Command):
      """Command-line front end that configures and starts a worker.

      :meth:`get_options` declares the command-line options and
      :meth:`run` hands the parsed values to ``self.app.Worker``.
      """

      # NOTE(review): these three attributes are consumed by the ``Command``
      # base class, which lives in another module -- confirm their exact
      # semantics there.
      namespace = "celeryd"
      enable_config_from_cmdline = True
      supports_args = False

      def run(self, *args, **kwargs):
          """Resolve the pool implementation and run the worker.

          :param args: Accepted but not used by this method.
          :param kwargs: Keyword arguments forwarded to ``self.app.Worker``.
              An ``app`` entry is discarded, and ``pool_cls`` is replaced
              by the result of ``concurrency.get_implementation``, falling
              back to ``self.app.conf.CELERYD_POOL`` when it is missing
              or falsy.
          :returns: Whatever ``self.app.Worker(**kwargs).run()`` returns.
          """
          kwargs.pop("app", None)
          # Pools like eventlet/gevent need to patch libs as early
          # as possible.
          from celery import concurrency
          kwargs["pool_cls"] = concurrency.get_implementation(
              kwargs.get("pool_cls") or self.app.conf.CELERYD_POOL)
          return self.app.Worker(**kwargs).run()

      def get_options(self):
          """Return the tuple of ``Option`` objects accepted by the command.

          Defaults that mirror configuration settings are read from
          ``self.app.conf`` each time this method is called.
          """
          conf = self.app.conf
          # Adjacent string literals are joined without any separator, so
          # every help fragment that is continued on the next line must end
          # with its own trailing space.
          return (
              Option('-c', '--concurrency',
                  default=conf.CELERYD_CONCURRENCY,
                  action="store", dest="concurrency", type="int",
                  help="Number of worker threads/processes"),
              Option('-P', '--pool',
                  default=conf.CELERYD_POOL,
                  action="store", dest="pool_cls", type="str",
                  help="Pool implementation: "
                       "processes (default), eventlet, gevent, "
                       "solo or threads."),
              Option('--purge', '--discard', default=False,
                  action="store_true", dest="discard",
                  help="Discard all waiting tasks before the server is "
                       "started. WARNING: There is no undo operation "
                       "and the tasks will be deleted."),
              Option('-f', '--logfile', default=conf.CELERYD_LOG_FILE,
                  action="store", dest="logfile",
                  help="Path to log file."),
              Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL,
                  action="store", dest="loglevel",
                  help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL"),
              Option('-n', '--hostname', default=None,
                  action="store", dest="hostname",
                  help="Set custom host name. E.g. 'foo.example.com'."),
              Option('-B', '--beat', default=False,
                  action="store_true", dest="embed_clockservice",
                  help="Also run the celerybeat periodic task scheduler. "
                       "NOTE: Only one instance of celerybeat must be "
                       "running at any one time."),
              Option('-s', '--schedule',
                  default=conf.CELERYBEAT_SCHEDULE_FILENAME,
                  action="store", dest="schedule_filename",
                  help="Path to the schedule database if running with the -B "
                       "option. The extension '.db' will be appended to the "
                       "filename. Default: %s" % (
                           conf.CELERYBEAT_SCHEDULE_FILENAME, )),
              Option('--scheduler',
                  default=None,
                  action="store", dest="scheduler_cls",
                  help="Scheduler class. Default is "
                       "celery.beat:PersistentScheduler"),
              Option('-S', '--statedb', default=conf.CELERYD_STATE_DB,
                  action="store", dest="state_db",
                  help="Path to the state database. The extension '.db' will "
                       "be appended to the filename. Default: %s" % (
                           conf.CELERYD_STATE_DB, )),
              Option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
                  action="store_true", dest="send_events",
                  help="Send events so the worker can be monitored by "
                       "celeryev, celerymon and other monitors."),
              Option('--time-limit',
                  default=conf.CELERYD_TASK_TIME_LIMIT,
                  action="store", type="int", dest="task_time_limit",
                  help="Enables a hard time limit (in seconds) for tasks."),
              Option('--soft-time-limit',
                  default=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
                  action="store", type="int", dest="task_soft_time_limit",
                  help="Enables a soft time limit (in seconds) for tasks."),
              Option('--maxtasksperchild',
                  default=conf.CELERYD_MAX_TASKS_PER_CHILD,
                  action="store", type="int", dest="max_tasks_per_child",
                  help="Maximum number of tasks a pool worker can execute "
                       "before it's terminated and replaced by a new worker."),
              Option('--queues', '-Q', default=[],
                  action="store", dest="queues",
                  help="Comma separated list of queues to consume from. "
                       "By default all configured queues are used. "
                       "Example: -Q video,image"),
              Option('--include', '-I', default=[],
                  action="store", dest="include",
                  help="Comma separated list of additional modules to import. "
                       "Example: -I foo.tasks,bar.tasks"),
              Option('--pidfile', dest="pidfile", default=None,
                  help="Optional file used to store the workers pid. "
                       "The worker will not start if this file already exists "
                       "and the pid is still alive."),
              Option('--autoscale', dest="autoscale", default=None,
                  help="Enable autoscaling by providing "
                       "max_concurrency,min_concurrency. Example: "
                       "--autoscale=10,3 (always keep 3 processes, "
                       "but grow to 10 if necessary)."),
              Option('--autoreload', dest="autoreload",
                  action="store_true", default=False,
                  help="Enable autoreloading."),
          )
  def main():
      """Entry point: build a ``WorkerCommand`` and run it from the command line."""
      imported_as_module = __name__ != "__main__"
      if imported_as_module:
          # Fix for setuptools generated scripts, so that it will
          # work with multiprocessing fork emulation.
          # (see multiprocessing.forking.get_preparation_data())
          sys.modules["__main__"] = sys.modules[__name__]
      freeze_support()
      WorkerCommand().execute_from_commandline()
  if __name__ == "__main__":  # pragma: no cover
      # Executed directly as a script rather than imported as a module.
      main()