Explorar o código

celery.apps.worker is now a subclass of WorkController

Ask Solem hai 12 anos
pai
achega
1d05b0d1e7
Modificáronse 4 ficheiros con 133 adicións e 156 borrados
  1. 19 95
      celery/apps/worker.py
  2. 1 1
      celery/bin/celeryd.py
  3. 19 34
      celery/tests/bin/test_celeryd.py
  4. 94 26
      celery/worker/__init__.py

+ 19 - 95
celery/apps/worker.py

@@ -14,31 +14,22 @@ from __future__ import absolute_import, print_function
 
 import logging
 import os
-import socket
 import sys
 import warnings
 
 from functools import partial
 
-from billiard import cpu_count, current_process
+from billiard import current_process
 
 from celery import VERSION_BANNER, platforms, signals
-from celery.app import app_or_default
-from celery.app.abstract import configurated, from_config
-from celery.exceptions import ImproperlyConfigured, SystemTerminate
+from celery.exceptions import SystemTerminate
 from celery.loaders.app import AppLoader
-from celery.utils import cry, isatty, worker_direct
+from celery.utils import cry, isatty
 from celery.utils.imports import qualname
-from celery.utils.log import get_logger, mlevel, set_in_sighandler
+from celery.utils.log import get_logger, set_in_sighandler
 from celery.utils.text import pluralize
 from celery.worker import WorkController
 
-try:
-    from greenlet import GreenletExit
-    IGNORE_ERRORS = (GreenletExit, )
-except ImportError:  # pragma: no cover
-    IGNORE_ERRORS = ()
-
 logger = get_logger(__name__)
 
 
@@ -84,69 +75,27 @@ EXTRA_INFO_FMT = """
 {tasks}
 """
 
-UNKNOWN_QUEUE = """\
-Trying to select queue subset of {0!r}, but queue {1} is not
-defined in the CELERY_QUEUES setting.
-
-If you want to automatically declare unknown queues you can
-enable the CELERY_CREATE_MISSING_QUEUES setting.
-"""
-
 
-class Worker(configurated):
-    WorkController = WorkController
-
-    app = None
-    inherit_confopts = (WorkController, )
-    loglevel = from_config('log_level')
-    redirect_stdouts = from_config()
-    redirect_stdouts_level = from_config()
-
-    def __init__(self, hostname=None, purge=False, beat=False,
-            queues=None, include=None, app=None, pidfile=None,
-            autoscale=None, autoreload=False, no_execv=False, **kwargs):
-        self.app = app = app_or_default(app or self.app)
-        self.hostname = hostname or socket.gethostname()
+class Worker(WorkController):
 
+    def on_before_init(self, purge=False, redirect_stdouts=None,
+            redirect_stdouts_level=None, **kwargs):
         # this signal can be used to set up configuration for
         # workers by name.
+        conf = self.app.conf
         signals.celeryd_init.send(sender=self.hostname, instance=self,
-                                  conf=self.app.conf)
-
-        self.setup_defaults(kwargs, namespace='celeryd')
-        if not self.concurrency:
-            try:
-                self.concurrency = cpu_count()
-            except NotImplementedError:
-                self.concurrency = 2
+                                  conf=conf)
         self.purge = purge
-        self.beat = beat
-        self.use_queues = [] if queues is None else queues
-        self.queues = None
-        self.include = include
-        self.pidfile = pidfile
-        self.autoscale = None
-        self.autoreload = autoreload
-        self.no_execv = no_execv
-        if autoscale:
-            max_c, _, min_c = autoscale.partition(',')
-            self.autoscale = [int(max_c), min_c and int(min_c) or 0]
         self._isatty = isatty(sys.stdout)
-
-        self.colored = app.log.colored(self.logfile)
-
-        if isinstance(self.use_queues, basestring):
-            self.use_queues = self.use_queues.split(',')
-        if self.include:
-            if isinstance(self.include, basestring):
-                self.include = self.include.split(',')
-            app.conf.CELERY_INCLUDE = (
-                tuple(app.conf.CELERY_INCLUDE) + tuple(self.include))
-        self.loglevel = mlevel(self.loglevel)
-
-    def run(self):
-        self.init_queues()
-        self.app.loader.init_worker()
+        self.colored = self.app.log.colored(self.logfile)
+        if redirect_stdouts is None:
+            redirect_stdouts = conf.CELERY_REDIRECT_STDOUTS,
+        if redirect_stdouts_level is None:
+            redirect_stdouts_level = conf.CELERY_REDIRECT_STDOUTS_LEVEL
+        self.redirect_stdouts = redirect_stdouts
+        self.redirect_stdouts_level = redirect_stdouts_level
+
+    def on_start(self):
         # this signal can be used to e.g. change queues after
         # the -Q option has been applied.
         signals.celeryd_after_setup.send(sender=self.hostname, instance=self,
@@ -164,26 +113,13 @@ class Worker(configurated):
         print(str(self.colored.cyan(' \n', self.startup_info())) +
               str(self.colored.reset(self.extra_info() or '')))
         self.set_process_status('-active-')
-
         self.redirect_stdouts_to_logger()
-        try:
-            self.run_worker()
-        except IGNORE_ERRORS:
-            pass
+        self.install_platform_tweaks(self)
 
     def on_consumer_ready(self, consumer):
         signals.worker_ready.send(sender=consumer)
         print('celery@{0.hostname} has started.'.format(self))
 
-    def init_queues(self):
-        try:
-            self.app.select_queues(self.use_queues)
-        except KeyError as exc:
-            raise ImproperlyConfigured(
-                    UNKNOWN_QUEUE.format(self.use_queues, exc))
-        if self.app.conf.CELERY_WORKER_DIRECT:
-            self.app.amqp.queues.select_add(worker_direct(self.hostname))
-
     def redirect_stdouts_to_logger(self):
         self.app.log.setup(self.loglevel, self.logfile,
                            self.redirect_stdouts, self.redirect_stdouts_level)
@@ -243,18 +179,6 @@ class Worker(configurated):
                 banner[i] = ' ' * 16 + banner[i]
         return '\n'.join(banner) + '\n'
 
-    def run_worker(self):
-        worker = self.WorkController(app=self.app,
-                    hostname=self.hostname,
-                    ready_callback=self.on_consumer_ready, beat=self.beat,
-                    autoscale=self.autoscale, autoreload=self.autoreload,
-                    no_execv=self.no_execv,
-                    pidfile=self.pidfile,
-                    **self.confopts_as_dict())
-        self.install_platform_tweaks(worker)
-        signals.worker_init.send(sender=worker)
-        worker.start()
-
     def install_platform_tweaks(self, worker):
         """Install platform specific tweaks and workarounds."""
         if self.app.IS_OSX:

+ 1 - 1
celery/bin/celeryd.py

@@ -155,7 +155,7 @@ class WorkerCommand(Command):
                       if isinstance(l, basestring))))
         return self.app.Worker(
             hostname=hostname, pool_cls=pool_cls, loglevel=loglevel, **kwargs
-        ).run()
+        ).start()
 
     def with_pool_option(self, argv):
         # this command support custom pools

+ 19 - 34
celery/tests/bin/test_celeryd.py

@@ -51,17 +51,10 @@ def disable_stdouts(fun):
     return disable
 
 
-class _WorkController(object):
-
-    def __init__(self, *args, **kwargs):
-        pass
-
-    def start(self):
-        pass
-
-
 class Worker(cd.Worker):
-    WorkController = _WorkController
+
+    def start(self, *args, **kwargs):
+        self.on_start()
 
 
 class test_Worker(AppCase):
@@ -73,15 +66,15 @@ class test_Worker(AppCase):
     @disable_stdouts
     def test_queues_string(self):
         celery = Celery(set_as_current=False)
-        worker = celery.Worker(queues='foo,bar,baz')
-        worker.init_queues()
-        self.assertEqual(worker.use_queues, ['foo', 'bar', 'baz'])
+        worker = celery.Worker()
+        worker.setup_queues('foo,bar,baz')
+        self.assertEqual(worker.queues, ['foo', 'bar', 'baz'])
         self.assertTrue('foo' in celery.amqp.queues)
 
     @disable_stdouts
     def test_cpu_count(self):
         celery = Celery(set_as_current=False)
-        with patch('celery.apps.worker.cpu_count') as cpu_count:
+        with patch('celery.worker.cpu_count') as cpu_count:
             cpu_count.side_effect = NotImplementedError()
             worker = celery.Worker(concurrency=None)
             self.assertEqual(worker.concurrency, 2)
@@ -146,14 +139,14 @@ class test_Worker(AppCase):
         try:
             w = self.Worker()
             w._isatty = False
-            w.run_worker()
+            w.on_start()
             for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
                 self.assertIn(sig, handlers)
 
             handlers.clear()
             w = self.Worker()
             w._isatty = True
-            w.run_worker()
+            w.on_start()
             for sig in 'SIGINT', 'SIGTERM':
                 self.assertIn(sig, handlers)
             self.assertNotIn('SIGHUP', handlers)
@@ -163,7 +156,7 @@ class test_Worker(AppCase):
     @disable_stdouts
     def test_startup_info(self):
         worker = self.Worker()
-        worker.run()
+        worker.on_start()
         self.assertTrue(worker.startup_info())
         worker.loglevel = logging.DEBUG
         self.assertTrue(worker.startup_info())
@@ -206,18 +199,10 @@ class test_Worker(AppCase):
 
     @disable_stdouts
     def test_run(self):
-        self.Worker().run()
-        self.Worker(purge=True).run()
+        self.Worker().on_start()
+        self.Worker(purge=True).on_start()
         worker = self.Worker()
-        worker.run()
-
-        prev, cd.IGNORE_ERRORS = cd.IGNORE_ERRORS, (KeyError, )
-        try:
-            worker.run_worker = Mock()
-            worker.run_worker.side_effect = KeyError()
-            worker.run()
-        finally:
-            cd.IGNORE_ERRORS = prev
+        worker.on_start()
 
     @disable_stdouts
     def test_purge_messages(self):
@@ -233,8 +218,8 @@ class test_Worker(AppCase):
                 'video': {'exchange': 'video',
                            'routing_key': 'video'}})
         try:
-            worker = self.Worker(queues=['video'])
-            worker.init_queues()
+            worker = self.Worker()
+            worker.setup_queues(['video'])
             self.assertIn('video', app.amqp.queues)
             self.assertIn('video', app.amqp.queues.consume_from)
             self.assertIn('celery', app.amqp.queues)
@@ -243,11 +228,11 @@ class test_Worker(AppCase):
             c.CELERY_CREATE_MISSING_QUEUES = False
             del(app.amqp.queues)
             with self.assertRaises(ImproperlyConfigured):
-                self.Worker(queues=['image']).init_queues()
+                self.Worker().setup_queues(['image'])
             del(app.amqp.queues)
             c.CELERY_CREATE_MISSING_QUEUES = True
-            worker = self.Worker(queues=['image'])
-            worker.init_queues()
+            worker = self.Worker()
+            worker.setup_queues(queues=['image'])
             self.assertIn('image', app.amqp.queues.consume_from)
             self.assertEqual(Queue('image', Exchange('image'),
                              routing_key='image'), app.amqp.queues['image'])
@@ -289,7 +274,7 @@ class test_Worker(AppCase):
             with self.assertWarnsRegex(RuntimeWarning,
                     r'superuser privileges is discouraged'):
                 worker = self.Worker()
-                worker.run()
+                worker.on_start()
         finally:
             os.getuid = prev
 

+ 94 - 26
celery/worker/__init__.py

@@ -12,7 +12,6 @@
 from __future__ import absolute_import
 
 import atexit
-import logging
 import socket
 import sys
 import time
@@ -21,20 +20,23 @@ import traceback
 from functools import partial
 from threading import Event
 
-from billiard import forking_enable
+from billiard import cpu_count, forking_enable
 from billiard.exceptions import WorkerLostError
 from kombu.syn import detect_environment
 from kombu.utils.finalize import Finalize
 
 from celery import concurrency as _concurrency
 from celery import platforms
+from celery import signals
 from celery.app import app_or_default, set_default_app
 from celery.app.abstract import configurated, from_config
-from celery.exceptions import SystemTerminate, TaskRevokedError
+from celery.exceptions import (
+    ImproperlyConfigured, SystemTerminate, TaskRevokedError,
+)
 from celery.task import trace
-from celery.utils.functional import noop
+from celery.utils import worker_direct
 from celery.utils.imports import qualname, reload_from_cwd
-from celery.utils.log import get_logger
+from celery.utils.log import get_logger, mlevel
 from celery.utils.timer2 import Schedule
 
 from . import bootsteps
@@ -42,10 +44,24 @@ from . import state
 from .buckets import TaskBucket, FastQueue
 from .hub import Hub, BoundedSemaphore
 
+try:
+    from greenlet import GreenletExit
+    IGNORE_ERRORS = (GreenletExit, )
+except ImportError:  # pragma: no cover
+    IGNORE_ERRORS = ()
+
 RUN = 0x1
 CLOSE = 0x2
 TERMINATE = 0x3
 
+UNKNOWN_QUEUE = """\
+Trying to select queue subset of {0!r}, but queue {1} is not
+defined in the CELERY_QUEUES setting.
+
+If you want to automatically declare unknown queues you can
+enable the CELERY_CREATE_MISSING_QUEUES setting.
+"""
+
 logger = get_logger(__name__)
 
 
@@ -85,6 +101,9 @@ class Pool(bootsteps.StartStopComponent):
     requires = ('queues', )
 
     def __init__(self, w, autoscale=None, no_execv=False, **kwargs):
+        if isinstance(autoscale, basestring):
+            max_c, _, min_c = autoscale.partition(',')
+            autoscale = [int(max_c), min_c and int(min_c) or 0]
         w.autoscale = autoscale
         w.pool = None
         w.max_concurrency = None
@@ -271,7 +290,7 @@ class WorkController(configurated):
 
     app = None
     concurrency = from_config()
-    loglevel = logging.ERROR
+    loglevel = from_config('log_level')
     logfile = from_config('log_file')
     send_events = from_config()
     pool_cls = from_config('pool')
@@ -295,9 +314,9 @@ class WorkController(configurated):
 
     _state = None
     _running = 0
+    pidlock = None
 
-    def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
-            queues=None, app=None, pidfile=None, **kwargs):
+    def __init__(self, app=None, hostname=None, **kwargs):
         self.app = app_or_default(app or self.app)
         # all new threads start without a current app, so if an app is not
         # passed on to the thread it will fall back to the "default app",
@@ -307,36 +326,81 @@ class WorkController(configurated):
         # running in the same process.
         set_default_app(self.app)
         self.app.finalize()
-        trace._tasks = self.app._tasks
+        trace._tasks = self.app._tasks   # optimization
+        self.hostname = hostname or socket.gethostname()
+        self.on_before_init(**kwargs)
 
+        self._finalize = Finalize(self, self.stop, exitpriority=1)
         self._shutdown_complete = Event()
+        self.setup_instance(**self.prepare_args(**kwargs))
+
+    def on_before_init(self, **kwargs):
+        pass
+
+    def on_start(self):
+        pass
+
+    def on_consumer_ready(self, consumer):
+        pass
+
+    def setup_instance(self, queues=None, ready_callback=None,
+            pidfile=None, include=None, **kwargs):
+        self.pidfile = pidfile
+        self.app.loader.init_worker()
         self.setup_defaults(kwargs, namespace='celeryd')
-        self.app.select_queues(queues)  # select queues subset.
+        self.setup_queues(queues)
+        self.setup_includes(include)
+
+        # Set default concurrency
+        if not self.concurrency:
+            try:
+                self.concurrency = cpu_count()
+            except NotImplementedError:
+                self.concurrency = 2
 
         # Options
-        self.loglevel = loglevel or self.loglevel
-        self.hostname = hostname or socket.gethostname()
-        self.ready_callback = ready_callback
-        self._finalize = Finalize(self, self.stop, exitpriority=1)
-        self.pidfile = pidfile
-        self.pidlock = None
+        self.loglevel = mlevel(self.loglevel)
+        self.ready_callback = ready_callback or self.on_consumer_ready
         self.use_eventloop = self.should_use_eventloop()
 
-        # Update celery_include to have all known task modules, so that we
-        # ensure all task modules are imported in case an execv happens.
-        task_modules = set(task.__class__.__module__
-                            for task in self.app.tasks.itervalues())
-        self.app.conf.CELERY_INCLUDE = tuple(
-            set(self.app.conf.CELERY_INCLUDE) | task_modules,
-        )
+        signals.worker_init.send(sender=self)
 
         # Initialize boot steps
         self.pool_cls = _concurrency.get_implementation(self.pool_cls)
         self.components = []
         self.namespace = Namespace(app=self.app).apply(self, **kwargs)
 
+    def setup_queues(self, queues):
+        if isinstance(queues, basestring):
+            queues = queues.split(',')
+        self.queues = queues
+        try:
+            self.app.select_queues(queues)
+        except KeyError as exc:
+            raise ImproperlyConfigured(
+                    UNKNOWN_QUEUE.format(queues, exc))
+        if self.app.conf.CELERY_WORKER_DIRECT:
+            self.app.amqp.queues.select_add(worker_direct(self.hostname))
+
+    def setup_includes(self, includes):
+        # Update celery_include to have all known task modules, so that we
+        # ensure all task modules are imported in case an execv happens.
+        inc = self.app.conf.CELERY_INCLUDE
+        if includes:
+            if isinstance(includes, basestring):
+                includes = includes.split(',')
+            inc = self.app.conf.CELERY_INCLUDE = tuple(inc) + tuple(includes)
+        self.include = includes
+        task_modules = set(task.__class__.__module__
+                            for task in self.app.tasks.itervalues())
+        self.app.conf.CELERY_INCLUDE = tuple(set(inc) | task_modules)
+
+    def prepare_args(self, **kwargs):
+        return kwargs
+
     def start(self):
         """Starts the workers main loop."""
+        self.on_start()
         self._state = self.RUN
         if self.pidfile:
             self.pidlock = platforms.create_pidlock(self.pidfile)
@@ -356,9 +420,13 @@ class WorkController(configurated):
         except (KeyboardInterrupt, SystemExit):
             self.stop()
 
-        # Will only get here if running green,
-        # makes sure all greenthreads have exited.
-        self._shutdown_complete.wait()
+        try:
+            # Will only get here if running green,
+            # makes sure all greenthreads have exited.
+            self._shutdown_complete.wait()
+        except IGNORE_ERRORS:
+            pass
+    run = start   # XXX Compat
 
     def process_task_sem(self, req):
         return self.semaphore.acquire(self.process_task, req)