# -*- coding: utf-8 -*-
"""The :program:`celery worker` command (previously known as ``celeryd``).

.. program:: celery worker

.. seealso::

    See :ref:`preload-options`.

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue. The default
    is the number of CPUs available on your system.

.. cmdoption:: -P, --pool

    Pool implementation:
    prefork (default), eventlet, gevent, solo or threads.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, `stderr` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
    `ERROR`, `CRITICAL`, or `FATAL`.

.. cmdoption:: -n, --hostname

    Set custom hostname, e.g. 'w1.%h'. Expands: %h (hostname),
    %n (name) and %d (domain).
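    Example: `-n worker1.%h` on a machine whose hostname is
    `george.example.com` results in the node name
    `worker1.george.example.com`.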
.. cmdoption:: -B, --beat

    Also run the `celery beat` periodic task scheduler. Please note that
    there must only be one instance of this service.

.. cmdoption:: -Q, --queues

    List of queues to enable for this worker, separated by commas.
    By default all configured queues are enabled.
    Example: `-Q video,image`

.. cmdoption:: -I, --include

    Comma-separated list of additional modules to import.
    Example: -I foo.tasks,bar.tasks

.. cmdoption:: -s, --schedule

    Path to the schedule database if running with the `-B` option.
    Defaults to `celerybeat-schedule`. The extension ".db" may be
    appended to the filename.

.. cmdoption:: -O

    Apply optimization profile. Supported: default, fair.

.. cmdoption:: --scheduler

    Scheduler class to use. Default is `celery.beat.PersistentScheduler`.

.. cmdoption:: -S, --statedb

    Path to the state database. The extension '.db' may
    be appended to the filename. Default: {default}

.. cmdoption:: -E, --events

    Send events that can be captured by monitors like :program:`celery events`,
    `celerymon`, and others.

.. cmdoption:: --without-gossip

    Do not subscribe to other workers' events.

.. cmdoption:: --without-mingle

    Do not synchronize with other workers at startup.

.. cmdoption:: --without-heartbeat

    Do not send event heartbeats.

.. cmdoption:: --heartbeat-interval

    Interval in seconds at which to send worker heartbeat.

.. cmdoption:: --purge

    Purges all waiting tasks before the daemon is started.

    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

.. cmdoption:: --time-limit

    Enables a hard time limit (in seconds int/float) for tasks.

.. cmdoption:: --soft-time-limit

    Enables a soft time limit (in seconds int/float) for tasks.

.. cmdoption:: --maxtasksperchild

    Maximum number of tasks a pool worker can execute before it's
    terminated and replaced by a new worker.

.. cmdoption:: --pidfile

    Optional file used to store the worker's pid.

    The worker will not start if this file already exists
    and the pid is still alive.

.. cmdoption:: --autoscale

    Enable autoscaling by providing
    max_concurrency, min_concurrency. Example::

        --autoscale=10,3

    (always keep 3 processes, but grow to 10 if necessary)

.. cmdoption:: --autoreload

    Enable autoreloading.

.. cmdoption:: --no-execv

    Don't do execv after multiprocessing child fork.

"""
from __future__ import absolute_import, unicode_literals

import sys

from celery import concurrency
from celery.bin.base import Command, Option, daemon_options
from celery.bin.celeryd_detach import detached_celeryd
from celery.five import string_t
from celery.platforms import maybe_drop_privileges
from celery.utils import default_nodename
from celery.utils.log import LOG_LEVELS, mlevel

__all__ = ['worker', 'main']

__MODULE_DOC__ = __doc__


class worker(Command):
    """Start worker instance.

    Examples::

        celery worker --app=proj -l info
        celery worker -A proj -l info -Q hipri,lopri

        celery worker -A proj --concurrency=4
        celery worker -A proj --concurrency=1000 -P eventlet
        celery worker --autoscale=10,0
    """
    doc = __MODULE_DOC__  # parse help from this too
    namespace = 'celeryd'
    enable_config_from_cmdline = True
    supports_args = False

    def run_from_argv(self, prog_name, argv=None, command=None):
        command = sys.argv[0] if command is None else command
        argv = sys.argv[1:] if argv is None else argv
        # parse options before detaching so errors can be handled.
        options, args = self.prepare_args(
            *self.parse_options(prog_name, argv, command))
        self.maybe_detach([command] + argv)
        return self(*args, **options)
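
    # Call flow for run_from_argv (illustrative): given argv like
    # ['-l', 'info', '--detach'], options are parsed and validated first,
    # so a bad flag fails loudly with a usable stderr instead of failing
    # inside the daemonized child, whose stderr may already be redirected;
    # only then does maybe_detach() re-execute the worker in the background.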

    def maybe_detach(self, argv, dopts=['-D', '--detach']):
        if any(arg in argv for arg in dopts):
            argv = [v for v in argv if v not in dopts]
            # will never return
            detached_celeryd(self.app).execute_from_commandline(argv)
            raise SystemExit(0)
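
    # Example (illustrative argv): maybe_detach(['worker', '-D', '-l', 'info'])
    # strips the detach flags and hands ['worker', '-l', 'info'] to
    # detached_celeryd, so the daemonized child doesn't try to detach again.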

    def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
            loglevel=None, logfile=None, pidfile=None, state_db=None,
            **kwargs):
        maybe_drop_privileges(uid=uid, gid=gid)
        # Pools like eventlet/gevent need to patch libs as early
        # as possible.
        pool_cls = (concurrency.get_implementation(pool_cls) or
                    self.app.conf.CELERYD_POOL)
        if self.app.IS_WINDOWS and kwargs.get('beat'):
            self.die('-B option does not work on Windows. '
                     'Please run celery beat as a separate service.')
        hostname = self.host_format(default_nodename(hostname))
        if loglevel:
            try:
                loglevel = mlevel(loglevel)
            except KeyError:  # pragma: no cover
                self.die('Unknown level {0!r}. Please use one of {1}.'.format(
                    loglevel, '|'.join(
                        l for l in LOG_LEVELS if isinstance(l, string_t))))
        return self.app.Worker(
            hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
            logfile=logfile,  # node format handled by celery.app.log.setup
            pidfile=self.node_format(pidfile, hostname),
            state_db=self.node_format(state_db, hostname), **kwargs
        ).start()
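
    # Node-name expansion (illustrative): with nodename
    # 'worker1@example.com', node_format() turns a pidfile such as
    # '/var/run/celery/%n.pid' into '/var/run/celery/worker1.pid'
    # (%n being the name part of the nodename).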

    def with_pool_option(self, argv):
        # this command supports custom pools
        # that may have to be loaded as early as possible.
        return (['-P'], ['--pool'])
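
    # Why argv is scanned for -P/--pool this early (illustrative): green
    # pools must monkey-patch the standard library before other modules
    # import sockets/threads, e.g. via eventlet.monkey_patch() or
    # gevent.monkey.patch_all(), so the pool choice cannot wait for
    # normal option parsing.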

    def get_options(self):
        conf = self.app.conf
        return (
            Option('-c', '--concurrency',
                   default=conf.CELERYD_CONCURRENCY, type='int'),
            Option('-P', '--pool', default=conf.CELERYD_POOL, dest='pool_cls'),
            Option('--purge', '--discard', default=False, action='store_true'),
            Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL),
            Option('-n', '--hostname'),
            Option('-B', '--beat', action='store_true'),
            Option('-s', '--schedule', dest='schedule_filename',
                   default=conf.CELERYBEAT_SCHEDULE_FILENAME),
            Option('--scheduler', dest='scheduler_cls'),
            Option('-S', '--statedb',
                   default=conf.CELERYD_STATE_DB, dest='state_db'),
            Option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
                   action='store_true', dest='send_events'),
            Option('--time-limit', type='float', dest='task_time_limit',
                   default=conf.CELERYD_TASK_TIME_LIMIT),
            Option('--soft-time-limit', dest='task_soft_time_limit',
                   default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='float'),
            Option('--maxtasksperchild', dest='max_tasks_per_child',
                   default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'),
            Option('--queues', '-Q', default=[]),
            Option('--exclude-queues', '-X', default=[]),
            Option('--include', '-I', default=[]),
            Option('--autoscale'),
            Option('--autoreload', action='store_true'),
            Option('--no-execv', action='store_true', default=False),
            Option('--without-gossip', action='store_true', default=False),
            Option('--without-mingle', action='store_true', default=False),
            Option('--without-heartbeat', action='store_true', default=False),
            Option('--heartbeat-interval', type='int'),
            Option('-O', dest='optimization'),
            Option('-D', '--detach', action='store_true'),
        ) + daemon_options() + tuple(self.app.user_options['worker'])
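
    # Extending these options (sketch, using the documented
    # ``app.user_options`` hook; the flag name below is hypothetical)::
    #
    #     from celery import Celery
    #     from celery.bin import Option
    #
    #     app = Celery(__name__)
    #     app.user_options['worker'].add(
    #         Option('--my-flag', action='store_true', default=False,
    #                help='Hypothetical custom worker flag.'),
    #     )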


def main(app=None):
    # Fix for setuptools generated scripts, so that it will
    # work with multiprocessing fork emulation.
    # (see multiprocessing.forking.get_preparation_data())
    if __name__ != '__main__':  # pragma: no cover
        sys.modules['__main__'] = sys.modules[__name__]
    from billiard import freeze_support
    freeze_support()
    worker(app=app).execute_from_commandline()


if __name__ == '__main__':  # pragma: no cover
    main()