Prechádzať zdrojové kódy

Stress tests: Adds configuration templates (-t redis,acks_late,etc,etc)

Ask Solem 11 rokov pred
rodič
commit
edf5e2ae2f

+ 54 - 1
funtests/stress/README.rst

@@ -56,7 +56,7 @@ Ideas include:
 
 
 ::
 ::
 
 
-    $ celery -A stress worker -c1 --maxtasksperchild=1 -- celery.acks_late=1
+    $ celery -A stress worker -c1 --maxtasksperchild=1 -t acks_late
 
 
 
 
 8) Worker using eventlet pool.
 8) Worker using eventlet pool.
@@ -77,6 +77,59 @@ previous runs.
 Note that the stress client will probably hang if the test fails, so this
 Note that the stress client will probably hang if the test fails, so this
 test suite is currently not suited for automatic runs.
 test suite is currently not suited for automatic runs.
 
 
+Configuration Templates
+-----------------------
+
+You can select a configuration template using the `-t` command-line argument
+to any :program:`celery -A stress` command or the :program:`python -m stress`
+command when running the test suite itself.
+
+The templates available are:
+
+* default
+
+    Using amqp as a broker and rpc as a result backend,
+    and also using json for task and result messages.
+
+* redis
+
+    Using redis as a broker and result backend
+
+* acks_late
+
+    Enables late ack globally.
+
+* pickle
+
+    Using pickle as the serializer for tasks and results
+    (also allowing the worker to receive and process pickled messages)
+
+
+You can see the resulting configuration from any template by running
+the command::
+
+    $ celery -A stress report -t redis
+
+
+Example running the stress test using the ``redis`` configuration template::
+
+    $ python -m stress -t redis
+
+Example running the worker using the ``redis`` configuration template::
+
+    $ celery -A stress worker -t redis
+
+
+You can also mix several templates by listing them separated by commas::
+
+    $ celery -A stress worker -t redis,acks_late
+
+In this example (``redis,acks_late``) the ``redis`` template will be used
+as a configuration, and then additional keys from the ``acks_late`` template
+will be added on top as changes::
+
+    $ celery -A stress report -t redis,acks_late,pickle
+
 Running the client
 Running the client
 ------------------
 ------------------
 
 

+ 42 - 20
funtests/stress/stress/app.py

@@ -5,31 +5,53 @@ import os
 import sys
 import sys
 import signal
 import signal
 
 
-from kombu import Exchange, Queue
 from time import sleep
 from time import sleep
 
 
 from celery import Celery
 from celery import Celery
+from celery import signals
+from celery.bin.base import Option
 from celery.exceptions import SoftTimeLimitExceeded
 from celery.exceptions import SoftTimeLimitExceeded
 
 
-CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', 'c.stress')
-CSTRESS_BROKER = os.environ.get('CSTRESS_BROKER', 'amqp://')
-CSTRESS_BACKEND = os.environ.get('CSTRESS_BACKEND', 'redis://127.0.0.1')
-CSTRESS_PREFETCH = int(os.environ.get('CSTRESS_PREFETCH', 1))
-
-app = Celery(
-    'stress', broker=CSTRESS_BROKER, backend=CSTRESS_BACKEND,
-    set_as_current=False,
-)
-app.conf.update(
-    CELERY_ACCEPT_CONTENT=['pickle', 'json'],
-    CELERYD_PREFETCH_MULTIPLIER=CSTRESS_PREFETCH,
-    CELERY_DEFAULT_QUEUE=CSTRESS_QUEUE,
-    CELERY_QUEUES=(
-        Queue(CSTRESS_QUEUE,
-              exchange=Exchange(CSTRESS_QUEUE),
-              routing_key=CSTRESS_QUEUE),
-    ),
-)
+from .templates import use_template, template_names
+
+
+class App(Celery):
+    template_selected = False
+
+    def __init__(self, *args, **kwargs):
+        self.template = kwargs.pop('template', None)
+        super(App, self).__init__(*args, **kwargs)
+        self.user_options['preload'].add(
+            Option(
+                '-t', '--template', default='default',
+                help='Configuration template to use: {0}'.format(
+                    template_names(),
+                ),
+            )
+        )
+        signals.user_preload_options.connect(self.on_preload_parsed)
+        self.after_configure = None
+
+    def on_preload_parsed(self, options=None, **kwargs):
+        self.use_template(options['template'])
+
+    def use_template(self, name='default'):
+        if self.template_selected:
+            raise RuntimeError('App already configured')
+        use_template(self, name)
+        self.template_selected = True
+
+    def _get_config(self):
+        ret = super(App, self)._get_config()
+        if self.after_configure:
+            self.after_configure(ret)
+        return ret
+
+    def on_configure(self):
+        if not self.template_selected:
+            self.use_template('default')
+
+app = App('stress', set_as_current=False)
 
 
 
 
 @app.task
 @app.task

+ 77 - 0
funtests/stress/stress/templates.py

@@ -0,0 +1,77 @@
+from __future__ import absolute_import
+
+import os
+
+from functools import partial
+
+from celery.five import items
+from kombu import Exchange, Queue
+from kombu.utils import symbol_by_name
+
+CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', 'c.stress')
+
+templates = {}
+
+
+def template(name=None):
+
+    def _register(cls):
+        templates[name or cls.__name__] = '.'.join([__name__, cls.__name__])
+        return cls
+    return _register
+
+
+def use_template(app, template='default'):
+    template = template.split(',')
+    app.after_configure = partial(mixin_templates, template[1:])
+    app.config_from_object(templates[template[0]])
+
+
+def mixin_templates(templates, conf):
+    return [mixin_template(template, conf) for template in templates]
+
+
+def mixin_template(template, conf):
+    cls = symbol_by_name(templates[template])
+    conf.update(dict(
+        (k, v) for k, v in items(vars(cls))
+        if k.isupper() and not k.startswith('_')
+    ))
+
+
+def template_names():
+    return ', '.join(templates)
+
+
+@template()
+class default(object):
+    CELERY_ACCEPT_CONTENT = ['json']
+    CELERY_DEFAULT_QUEUE = CSTRESS_QUEUE
+    CELERY_TASK_SERIALIZER = 'json'
+    CELERY_RESULT_SERIALIZER = 'json'
+    CELERY_QUEUES = [
+        Queue(CSTRESS_QUEUE,
+              exchange=Exchange(CSTRESS_QUEUE),
+              routing_key=CSTRESS_QUEUE),
+    ]
+    BROKER_URL = os.environ.get('CSTRESS_BROKER', 'amqp://')
+    CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_BACKEND', 'rpc://')
+    CELERYD_PREFETCH_MULTIPLIER = int(os.environ.get('CSTRESS_PREFETCH', 1))
+
+
+@template()
+class redis(default):
+    BROKER_URL = os.environ.get('CSTRESS_BROKER', 'redis://')
+    CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_bACKEND', 'redis://')
+
+
+@template()
+class acks_late(default):
+    CELERY_ACKS_LATE = True
+
+
+@template()
+class pickle(default):
+    CELERY_ACCEPT_CONTENT = ['pickle', 'json']
+    CELERY_TASK_SERIALIZER = 'pickle'
+    CELERY_RESULT_SERIALIZER = 'pickle'