Browse Source

Sync subtask option (#3696)

* Added disable_sync_subtasks option

* Added some documentation for  *disable_sync_subtasks*  option

* Added myself to contributors

* Added unit test for sync subtask option
shalev67 8 years ago
parent
commit
b9a84ca49d
4 changed files with 52 additions and 3 deletions
  1. 2 1
      CONTRIBUTORS.txt
  2. 7 2
      celery/result.py
  3. 25 0
      docs/userguide/tasks.rst
  4. 18 0
      t/unit/tasks/test_result.py

+ 2 - 1
CONTRIBUTORS.txt

@@ -226,4 +226,5 @@ zhengxiaowai, 2016/12/07
 Michael Howitz, 2016/12/08
 Andreas Pelme, 2016/12/13
 Mike Chen, 2016/12/20
-Alejandro Pernin, 2016/12/23
+Alejandro Pernin, 2016/12/23
+Yuval Shalev, 2016/12/27

+ 7 - 2
celery/result.py

@@ -136,7 +136,8 @@ class AsyncResult(ResultBase):
 
     def get(self, timeout=None, propagate=True, interval=0.5,
             no_ack=True, follow_parents=True, callback=None, on_message=None,
-            on_interval=None, EXCEPTION_STATES=states.EXCEPTION_STATES,
+            on_interval=None, disable_sync_subtasks=True,
+            EXCEPTION_STATES=states.EXCEPTION_STATES,
             PROPAGATE_STATES=states.PROPAGATE_STATES):
         """Wait until task is ready, and return its result.
 
@@ -157,6 +158,9 @@ class AsyncResult(ResultBase):
                 **not be acked**.
             follow_parents (bool): Re-raise any exception raised by
                 parent tasks.
+            disable_sync_subtasks (bool): Disable tasks to wait for sub tasks
+                this is the default configuration. CAUTION do not enable this
+                unless you must.
 
         Raises:
             celery.exceptions.TimeoutError: if `timeout` isn't
@@ -165,7 +169,8 @@ class AsyncResult(ResultBase):
             Exception: If the remote call raised an exception then that
                 exception will be re-raised in the caller process.
         """
-        assert_will_not_block()
+        if disable_sync_subtasks:
+            assert_will_not_block()
         _on_interval = promise()
         if follow_parents and propagate and self.parent:
             on_interval = promise(self._maybe_reraise_parent_error, weak=True)

+ 25 - 0
docs/userguide/tasks.rst

@@ -1565,6 +1565,31 @@ different :func:`~celery.signature`'s.
 You can read about chains and other powerful constructs
 at :ref:`designing-workflows`.
 
+By default celery will not enable you to run tasks within task synchronously
+in rare or extreme cases you might have to do so.
+**WARNING**:
+enabling subtasks run synchronously is not recommended!
+.. code-block:: python
+
+    @app.task
+    def update_page_info(url):
+        page = fetch_page.delay(url).get(disable_sync_subtasks=False)
+        info = parse_page.delay(url, page).get(disable_sync_subtasks=False)
+        store_page_info.delay(url, info)
+
+    @app.task
+    def fetch_page(url):
+        return myhttplib.get(url)
+
+    @app.task
+    def parse_page(url, page):
+        return myparser.parse_document(page)
+
+    @app.task
+    def store_page_info(url, info):
+        return PageInfo.objects.create(url, info)
+
+
 .. _task-performance-and-strategies:
 
 Performance and Strategies

+ 18 - 0
t/unit/tasks/test_result.py

@@ -62,6 +62,14 @@ def make_mock_group(app, size=10):
     return [app.AsyncResult(task['id']) for task in tasks]
 
 
+class _MockBackend:
+    def add_pending_result(self, *args, **kwargs):
+        return True
+
+    def wait_for_pending(self, *args, **kwargs):
+        return True
+
+
 class test_AsyncResult:
 
     def setup(self):
@@ -91,6 +99,16 @@ class test_AsyncResult:
         task_join_will_block.return_value = False
         assert_will_not_block()
 
+    @patch('celery.result.task_join_will_block')
+    def test_get_sync_subtask_option(self, task_join_will_block):
+        task_join_will_block.return_value = True
+        tid = uuid()
+        backend = _MockBackend()
+        res_subtask_async = AsyncResult(tid, backend=backend)
+        with pytest.raises(RuntimeError):
+            res_subtask_async.get()
+        res_subtask_async.get(disable_sync_subtasks=False)
+
     def test_without_id(self):
         with pytest.raises(ValueError):
             AsyncResult(None, app=self.app)