Browse Source

Add `celery_worker_parameters` test fixture. (#3668)

This enables customized worker init parameters.
Michael Howitz 8 years ago
parent
commit
b59e911a06
3 changed files with 51 additions and 5 deletions
  1. 1 0
      CONTRIBUTORS.txt
  2. 28 5
      celery/contrib/pytest.py
  3. 22 0
      docs/userguide/testing.rst

+ 1 - 0
CONTRIBUTORS.txt

@@ -223,3 +223,4 @@ Marat Sharafutdinov, 2016/11/04
 Viktor Holmqvist, 2016/12/02
 Rick Wargo, 2016/12/02
 zhengxiaowai, 2016/12/07
+Michael Howitz, 2016/12/08

+ 28 - 5
celery/contrib/pytest.py

@@ -74,15 +74,19 @@ def celery_session_app(request,
 
 
 @pytest.fixture(scope='session')
-def celery_session_worker(request, celery_session_app,
-                          celery_includes, celery_worker_pool):
+def celery_session_worker(request,
+                          celery_session_app,
+                          celery_includes,
+                          celery_worker_pool,
+                          celery_worker_parameters):
     # type: (Any, Celery, Sequence[str], str) -> WorkController
     """Session Fixture: Start worker that lives throughout test suite."""
     if not NO_WORKER:
         for module in celery_includes:
             celery_session_app.loader.import_task_module(module)
         with worker.start_worker(celery_session_app,
-                                 pool=celery_worker_pool) as w:
+                                 pool=celery_worker_pool,
+                                 **celery_worker_parameters) as w:
             yield w
 
 
@@ -137,6 +141,19 @@ def celery_parameters():
     return {}
 
 
+@pytest.fixture(scope='session')
+def celery_worker_parameters():
+    # type: () -> Mapping[str, Any]
+    """Redefine this fixture to change the init parameters of Celery workers.
+
+    This can be used e. g. to define queues the worker will consume tasks from.
+
+    The dict returned by your fixture will then be used
+    as parameters when instantiating :class:`~celery.worker.WorkController`.
+    """
+    return {}
+
+
 @pytest.fixture()
 def celery_app(request,
                celery_config,
@@ -155,13 +172,19 @@ def celery_app(request,
 
 
 @pytest.fixture()
-def celery_worker(request, celery_app, celery_includes, celery_worker_pool):
+def celery_worker(request,
+                  celery_app,
+                  celery_includes,
+                  celery_worker_pool,
+                  celery_worker_parameters):
     # type: (Any, Celery, Sequence[str], str) -> WorkController
     """Fixture: Start worker in a thread, stop it when the test returns."""
     if not NO_WORKER:
         for module in celery_includes:
             celery_app.loader.import_task_module(module)
-        with worker.start_worker(celery_app, pool=celery_worker_pool) as w:
+        with worker.start_worker(celery_app,
+                                 pool=celery_worker_pool,
+                                 **celery_worker_parameters) as w:
             yield w
 
 

+ 22 - 0
docs/userguide/testing.rst

@@ -215,6 +215,28 @@ Example:
             'strict_typing': False,
         }
 
+``celery_worker_parameters`` - Override to setup Celery worker parameters.
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can redefine this fixture to change the ``__init__`` parameters of test
+Celery workers. These are directly passed to
+:class:`~celery.worker.WorkController` when it is instantiated.
+
+The config returned by your fixture will then be used
+to configure the :func:`celery_worker`, and :func:`celery_session_worker`
+fixtures.
+
+Example:
+
+.. code-block:: python
+
+    @pytest.fixture(scope='session')
+    def celery_worker_parameters():
+        return {
+            'queues':  ('high-prio', 'low-prio'),
+            'exclude_queues': ('celery'),
+        }
+
 
 ``celery_enable_logging`` - Override to enable logging in embedded workers.
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~