qemu-block
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH 3/3] iotests: check: multiprocessing support


From: John Snow
Subject: Re: [PATCH 3/3] iotests: check: multiprocessing support
Date: Mon, 6 Dec 2021 13:35:00 -0500



On Fri, Dec 3, 2021 at 7:22 AM Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> wrote:
Add a -j <JOBS> parameter to run tests in several jobs simultaneously.
For the implementation, simply use the multiprocessing.Pool class.

Notes:

1. Of course, tests can't run simultaneously in the same TEST_DIR. So,
   use subdirectories TEST_DIR/testname/ and SOCK_DIR/testname/
   instead of simply TEST_DIR and SOCK_DIR.


SOCK_DIR was introduced because TEST_DIR was getting too long, and the length of the filepath was causing problems on some platforms. I hope that we aren't pushing our luck by making the directory longer here. Using the test name is nice because a human operator can quickly correlate directories to the tests that populated them, but if test names get kind of long, I wonder if we'll cause problems again.

Just a stray thought.
 
2. multiprocessing.Pool.starmap function doesn't support passing
   context managers, so we can't simply pass "self". Happily, we need
   self only for read-only access, and it just works if it is defined
   in global space. So, add a temporary link TestRunner.shared_self
   during run_tests().

I'm a little confused on this point; see below.
 
Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
---
 tests/qemu-iotests/check         |  4 +-
 tests/qemu-iotests/testrunner.py | 69 ++++++++++++++++++++++++++++----
 2 files changed, 64 insertions(+), 9 deletions(-)

diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check
index 43a4b694cc..0c27721a41 100755
--- a/tests/qemu-iotests/check
+++ b/tests/qemu-iotests/check
@@ -34,6 +34,8 @@ def make_argparser() -> argparse.ArgumentParser:
                    help='show me, do not run tests')
     p.add_argument('-makecheck', action='store_true',
                    help='pretty print output for make check')
+    p.add_argument('-j', dest='jobs', type=int, default=1,
+                   help='run tests in multiple parallel jobs')

     p.add_argument('-d', dest='debug', action='store_true', help='debug')
     p.add_argument('-p', dest='print', action='store_true',
@@ -165,6 +167,6 @@ if __name__ == '__main__':
         with TestRunner(env, makecheck=args.makecheck,
                         color=args.color) as tr:
             paths = [os.path.join(env.source_iotests, t) for t in tests]
-            ok = tr.run_tests(paths)
+            ok = tr.run_tests(paths, args.jobs)
             if not ok:
                 sys.exit(1)

(OK)
 
diff --git a/tests/qemu-iotests/testrunner.py b/tests/qemu-iotests/testrunner.py
index a9f2feb58c..0feaa396d0 100644
--- a/tests/qemu-iotests/testrunner.py
+++ b/tests/qemu-iotests/testrunner.py
@@ -26,6 +26,7 @@
 import json
 import termios
 import sys
+from multiprocessing import Pool
 from contextlib import contextmanager
 from typing import List, Optional, Iterator, Any, Sequence, Dict, \
         ContextManager
@@ -126,6 +127,31 @@ def __init__(self, status: str, description: str = '',


 class TestRunner(ContextManager['TestRunner']):
+    shared_self = None
+
+    @staticmethod
+    def proc_run_test(test: str, test_field_width: int) -> TestResult:
+        # We are in a subprocess, we can't change the runner object!

*can't*, or shouldn't? I thought changing anything would just result in the child process diverging, but nothing harmful overall. Am I mistaken?
 
+        runner = TestRunner.shared_self
+        assert runner is not None
+        return runner.run_test(test, test_field_width, mp=True)
+
+    def run_tests_pool(self, tests: List[str],
+                       test_field_width: int, jobs: int) -> List[TestResult]:
+
+        # passing self directly to Pool.starmap() just doesn't work, because
+        # it's a context manager.

Why, what happens? (Or what doesn't happen?)

(I believe you that it doesn't work, but it's not immediately obvious to me why.)
 
+        assert TestRunner.shared_self is None
+        TestRunner.shared_self = self
+
+        with Pool(jobs) as p:
+            results = p.starmap(self.proc_run_test,
+                                zip(tests, [test_field_width] * len(tests)))
+
+        TestRunner.shared_self = None
+
+        return results
+
     def __init__(self, env: TestEnv, makecheck: bool = False,
                  color: str = 'auto') -> None:
         self.env = env
@@ -219,11 +245,16 @@ def find_reference(self, test: str) -> str:

         return f'{test}.out'

-    def do_run_test(self, test: str) -> TestResult:
+    def do_run_test(self, test: str, mp: bool) -> TestResult:
         """
         Run one test

         :param test: test file path
+        :param mp: if true, we are in a multiprocessing environment, use
+                   personal subdirectories for test run
+
+        Note: this method may be called from subprocess, so it does not
+        change ``self`` object in any way!
         """

Maybe worth mentioning that it *does* change environment variables, but because this is "mp", it won't affect the parent execution environment.
 

         f_test = Path(test)
@@ -249,6 +280,12 @@ def do_run_test(self, test: str) -> TestResult:

         args = [str(f_test.resolve())]
         env = self.env.prepare_subprocess(args)
+        if mp:
+            # Split test directories, so that tests running in parallel don't
+            # break each other.
+            for d in ['TEST_DIR', 'SOCK_DIR']:
+                env[d] = os.path.join(env[d], f_test.name)
+                Path(env[d]).mkdir(parents=True, exist_ok=True)

         t0 = time.time()
         with f_bad.open('w', encoding="utf-8") as f:
@@ -291,23 +328,32 @@ def do_run_test(self, test: str) -> TestResult:
                               casenotrun=casenotrun)

     def run_test(self, test: str,
-                 test_field_width: Optional[int] = None) -> TestResult:
+                 test_field_width: Optional[int] = None,
+                 mp: bool = False) -> TestResult:
         """
         Run one test and print short status

         :param test: test file path
         :param test_field_width: width for first field of status format
+        :param mp: if true, we are in a multiprocessing environment, don't try
+                   to rewrite things in stdout
+
+        Note: this method may be called from subprocess, so it does not
+        change ``self`` object in any way!
         """

         last_el = self.last_elapsed.get(test)
         start = datetime.datetime.now().strftime('%H:%M:%S')

         if not self.makecheck:
-            self.test_print_one_line(test=test, starttime=start,
-                                     lasttime=last_el, end='\r',
+            self.test_print_one_line(test=test,
+                                     status = 'started' if mp else '...',
+                                     starttime=start,
+                                     lasttime=last_el,
+                                     end = '\n' if mp else '\r',
                                      test_field_width=test_field_width)

-        res = self.do_run_test(test)
+        res = self.do_run_test(test, mp)

         end = datetime.datetime.now().strftime('%H:%M:%S')
         self.test_print_one_line(test=test, status=res.status,
@@ -321,7 +367,7 @@ def run_test(self, test: str,

         return res

-    def run_tests(self, tests: List[str]) -> bool:
+    def run_tests(self, tests: List[str], jobs: int = 1) -> bool:
         n_run = 0
         failed = []
         notrun = []
@@ -332,9 +378,16 @@ def run_tests(self, tests: List[str]) -> bool:

         test_field_width = max(len(os.path.basename(t)) for t in tests) + 2

-        for t in tests:
+        if jobs > 1:
+            results = self.run_tests_pool(tests, test_field_width, jobs)
+
+        for i, t in enumerate(tests):
             name = os.path.basename(t)
-            res = self.run_test(t, test_field_width=test_field_width)
+
+            if jobs > 1:
+                res = results[i]
+            else:
+                res = self.run_test(t, test_field_width)

             assert res.status in ('pass', 'fail', 'not run')


Looks good and surprisingly minimal; I'm just curious about the nature of the workaround here.

Either way, I believe this will probably work as written, so I can give it an ACK at a minimum while I wait for answers.

Acked-by: John Snow <jsnow@redhat.com>


reply via email to

[Prev in Thread] Current Thread [Next in Thread]