qemu-block


Re: [PATCH 3/3] iotests: check: multiprocessing support


From: John Snow
Subject: Re: [PATCH 3/3] iotests: check: multiprocessing support
Date: Mon, 6 Dec 2021 16:00:56 -0500



On Mon, Dec 6, 2021 at 3:20 PM Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> wrote:
06.12.2021 21:35, John Snow wrote:
>
>
> On Fri, Dec 3, 2021 at 7:22 AM Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> wrote:
>
>     Add a -j <JOBS> parameter to run tests in several jobs simultaneously.
>     The implementation simply uses the multiprocessing.Pool class.
>
>     Notes:
>
>     1. Of course, tests can't run simultaneously in the same TEST_DIR. So,
>         use the subdirectories TEST_DIR/testname/ and SOCK_DIR/testname/
>         instead of plain TEST_DIR and SOCK_DIR.
>
>
> SOCK_DIR was introduced because TEST_DIR was getting too long, and the length of the filepath was causing problems on some platforms. I hope that we aren't pushing our luck by making the directory longer here. Using the test name is nice because a human operator can quickly correlate directories to the tests that populated them, but if test names get kind of long, I wonder if we'll cause problems again.
>
> Just a stray thought.
>
>     2. The multiprocessing.Pool.starmap function doesn't support passing
>         context managers, so we can't simply pass "self". Happily, we need
>         self only for read-only access, and it just works if it is defined
>         in the global scope. So, add a temporary reference,
>         TestRunner.shared_self, during run_tests().
>
>
> I'm a little confused on this point, see below
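To put the path-length concern in numbers: a pathname UNIX socket must fit in sockaddr_un.sun_path, which is 108 bytes on Linux and 104 on macOS and the BSDs, so every extra SOCK_DIR/<testname>/ component eats into a fixed budget. The sketch below is a hypothetical helper (sock_path_fits() is not part of check or testrunner.py) that reports whether a per-test socket path would still fit; the 108-byte default is an assumption that holds for Linux hosts only.

```python
import os

# Assumption: Linux's sockaddr_un.sun_path is 108 bytes. macOS and the BSDs
# use 104, so treat this as a per-platform tunable rather than a constant.
SUN_PATH_LEN = 108


def sock_path_fits(sock_dir: str, test_name: str, sock_name: str,
                   sun_path_len: int = SUN_PATH_LEN) -> bool:
    """Check whether SOCK_DIR/<test_name>/<sock_name> fits in sun_path.

    Hypothetical helper; nothing like it exists in check or testrunner.py.
    """
    path = os.path.join(sock_dir, test_name, sock_name)
    # Portable code reserves one byte for the terminating NUL.
    return len(os.fsencode(path)) <= sun_path_len - 1
```

For example, sock_path_fits('/tmp/sock', '041', 'qemu-nbd.sock') is comfortably within budget, while long test names under a deep build directory are where trouble would start.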
>
>     Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
>     ---
>       tests/qemu-iotests/check         |  4 +-
>       tests/qemu-iotests/testrunner.py | 69 ++++++++++++++++++++++++++++----
>       2 files changed, 64 insertions(+), 9 deletions(-)
>
>     diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check
>     index 43a4b694cc..0c27721a41 100755
>     --- a/tests/qemu-iotests/check
>     +++ b/tests/qemu-iotests/check
>     @@ -34,6 +34,8 @@ def make_argparser() -> argparse.ArgumentParser:
>                          help='show me, do not run tests')
>           p.add_argument('-makecheck', action='store_true',
>                          help='pretty print output for make check')
>     +    p.add_argument('-j', dest='jobs', type=int, default=1,
>     +                   help='run tests in multiple parallel jobs')
>
>           p.add_argument('-d', dest='debug', action='store_true', help='debug')
>           p.add_argument('-p', dest='print', action='store_true',
>     @@ -165,6 +167,6 @@ if __name__ == '__main__':
>               with TestRunner(env, makecheck=args.makecheck,
>                               color=args.color) as tr:
>                   paths = [os.path.join(env.source_iotests, t) for t in tests]
>     -            ok = tr.run_tests(paths)
>     +            ok = tr.run_tests(paths, args.jobs)
>                   if not ok:
>                       sys.exit(1)
>
>
> (OK)
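As written, run_tests() only takes the pool path when jobs > 1, so "-j 0" or a negative value silently falls back to sequential execution instead of reporting an error (multiprocessing.Pool itself raises ValueError for fewer than one process). If stricter input checking were wanted, a custom argparse type is one option. This is only a sketch: positive_int() and this trimmed-down make_argparser() are hypothetical and not part of the patch, which uses plain type=int.

```python
import argparse


def positive_int(value: str) -> int:
    """argparse type that rejects zero and negative job counts (hypothetical)."""
    n = int(value)  # a non-integer raises ValueError, which argparse reports
    if n < 1:
        raise argparse.ArgumentTypeError(
            f'expected a positive integer, got {value!r}')
    return n


def make_argparser() -> argparse.ArgumentParser:
    # Only the -j option from the patch, plus positional test names.
    p = argparse.ArgumentParser(description='iotests -j sketch')
    p.add_argument('-j', dest='jobs', type=positive_int, default=1,
                   help='run tests in multiple parallel jobs')
    p.add_argument('tests', nargs='*', help='tests to run')
    return p
```

With this type, "-j 0" makes argparse print a usage error and exit with status 2 rather than quietly running one job at a time.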
>
>     diff --git a/tests/qemu-iotests/testrunner.py b/tests/qemu-iotests/testrunner.py
>     index a9f2feb58c..0feaa396d0 100644
>     --- a/tests/qemu-iotests/testrunner.py
>     +++ b/tests/qemu-iotests/testrunner.py
>     @@ -26,6 +26,7 @@
>       import json
>       import termios
>       import sys
>     +from multiprocessing import Pool
>       from contextlib import contextmanager
>       from typing import List, Optional, Iterator, Any, Sequence, Dict, \
>               ContextManager
>     @@ -126,6 +127,31 @@ def __init__(self, status: str, description: str = '',
>
>
>       class TestRunner(ContextManager['TestRunner']):
>     +    shared_self = None
>
>     +
>     +    @staticmethod
>     +    def proc_run_test(test: str, test_field_width: int) -> TestResult:
>     +        # We are in a subprocess, we can't change the runner object!
>
>
> *can't*, or shouldn't? I thought changing anything would just result in the child process diverging, but nothing harmful overall. Am I mistaken?

Yes, you are right. "Shouldn't" is OK.
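John's reading holds for the fork start method (the default on Linux at the time of this thread): a worker starts with a copy of the parent's memory, so mutating the runner in a child only changes the child's copy and the parent never sees it. A minimal sketch, with a hypothetical Counter object standing in for TestRunner and the fork context requested explicitly:

```python
import multiprocessing


class Counter:
    """Hypothetical stand-in for a runner object with mutable state."""

    def __init__(self) -> None:
        self.hits = 0


counter = Counter()


def bump() -> int:
    # Runs in a worker and mutates that worker's copy of `counter` only.
    counter.hits += 1
    return counter.hits


def bump_in_child() -> int:
    # 'fork' is POSIX-only; it copies the parent's memory into the worker.
    with multiprocessing.get_context('fork').Pool(1) as pool:
        return pool.apply(bump)
```

Each call sees a fresh copy of the parent's state, so the child diverges harmlessly, which is why "shouldn't" is the more accurate word.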

>
>     +        runner = TestRunner.shared_self
>     +        assert runner is not None
>     +        return runner.run_test(test, test_field_width, mp=True)
>     +
>     +    def run_tests_pool(self, tests: List[str],
>     +                       test_field_width: int, jobs: int) -> List[TestResult]:
>     +
>     +        # passing self directly to Pool.starmap() just doesn't work, because
>     +        # it's a context manager.
>
>
> Why, what happens? (Or what doesn't happen?)
>
> (I believe you that it doesn't work, but it's not immediately obvious to me why.)

Simple check:

diff --git a/tests/qemu-iotests/testrunner.py b/tests/qemu-iotests/testrunner.py
index 0feaa396d0..49c1780697 100644
--- a/tests/qemu-iotests/testrunner.py
+++ b/tests/qemu-iotests/testrunner.py
@@ -130,7 +130,7 @@ class TestRunner(ContextManager['TestRunner']):
      shared_self = None

      @staticmethod
-    def proc_run_test(test: str, test_field_width: int) -> TestResult:
+    def proc_run_test(x, test: str, test_field_width: int) -> TestResult:
          # We are in a subprocess, we can't change the runner object!
          runner = TestRunner.shared_self
          assert runner is not None
@@ -146,7 +146,7 @@ def run_tests_pool(self, tests: List[str],

          with Pool(jobs) as p:
              results = p.starmap(self.proc_run_test,
-                                zip(tests, [test_field_width] * len(tests)))
+                                [(self, t, test_field_width) for t in tests])

          TestRunner.shared_self = None




Something like this happens:

   File "/work/src/qemu/up/up-iotests-multiprocessing/build/tests/qemu-iotests/./check", line 170, in <module>
     ok = tr.run_tests(paths, args.jobs)
   File "/work/src/qemu/up/up-iotests-multiprocessing/tests/qemu-iotests/testrunner.py", line 383, in run_tests
     results = self.run_tests_pool(tests, test_field_width, jobs)
   File "/work/src/qemu/up/up-iotests-multiprocessing/tests/qemu-iotests/testrunner.py", line 149, in run_tests_pool
     results = p.starmap(self.proc_run_test,
   File "/usr/lib64/python3.9/multiprocessing/pool.py", line 372, in starmap
     return self._map_async(func, iterable, starmapstar, chunksize).get()
   File "/usr/lib64/python3.9/multiprocessing/pool.py", line 771, in get
     raise self._value
   File "/usr/lib64/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks
     put(task)
   File "/usr/lib64/python3.9/multiprocessing/connection.py", line 211, in send
     self._send_bytes(_ForkingPickler.dumps(obj))
   File "/usr/lib64/python3.9/multiprocessing/reduction.py", line 51, in dumps
     cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'generator' object


Hmm, I remembered it as "cannot pickle context manager"... Probably I remember wrong :) Honestly, I didn't dig into it beyond finding that not passing "self" fixes the problem.
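The 'generator' in the traceback is consistent with the runner owning a @contextmanager-based context manager (testrunner.py does import contextmanager): such an object keeps its generator as an attribute, and generators cannot be pickled. The reproduction below assumes, without checking the real class, that the runner keeps its context managers in a contextlib.ExitStack; Holder and fake_savetty() are hypothetical stand-ins.

```python
import pickle
from contextlib import ExitStack, contextmanager
from typing import Iterator


@contextmanager
def fake_savetty() -> Iterator[None]:
    # Hypothetical generator-based context manager.
    yield


class Holder:
    """Hypothetical object that, like the runner may, owns an ExitStack."""

    def __init__(self) -> None:
        self._stack = ExitStack()
        # The stack stores a bound __exit__ of the context manager, which in
        # turn holds the live generator object.
        self._stack.enter_context(fake_savetty())


def pickle_error(obj: object) -> str:
    """Return the pickling error message for obj, or '' if it pickles."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError) as e:
        return str(e)
    return ''
```

Calling pickle_error(Holder()) yields the same "cannot pickle 'generator' object" message seen in the traceback, so "context manager" and "generator" are two views of the same root cause.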


Oh, I see. Even using a *bound method*, it still wants to pass 'self' as an argument, but it's unable to do so... uh, interesting! But having it as global state somehow works. That's... fascinating. Well, without spending much time on it myself, I think your workaround is probably the best possible thing without really tearing things apart and refactoring. Asserting that shared_self is None will prevent run_tests_pool from being called twice concurrently, so the limitation of the workaround is well-contained.

Good enough.
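Why the class attribute works: with the fork start method, the pool's workers are forked when the Pool is constructed, after shared_self has been set, so each worker inherits the runner through its copied memory. Only the function reference and the per-test argument tuples go through pickle. A minimal sketch of the same pattern, with a hypothetical Runner class standing in for TestRunner and the fork context requested explicitly (under spawn, a worker would re-import the module and see shared_self as None):

```python
from contextlib import ExitStack, contextmanager
from multiprocessing import get_context
from typing import Iterator, List, Optional


@contextmanager
def fake_savetty() -> Iterator[None]:
    # Hypothetical generator-based context manager; makes Runner unpicklable.
    yield


class Runner:
    """Hypothetical, trimmed-down analogue of TestRunner."""

    shared_self: Optional['Runner'] = None

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix
        self._stack = ExitStack()
        self._stack.enter_context(fake_savetty())

    def run_one(self, name: str, width: int) -> str:
        return f'{self.prefix}:{name.ljust(width)}'

    @staticmethod
    def proc_run_one(name: str, width: int) -> str:
        # Runs in a worker: the runner was inherited through fork(), not pickled.
        runner = Runner.shared_self
        assert runner is not None
        return runner.run_one(name, width)

    def run_pool(self, names: List[str], width: int, jobs: int) -> List[str]:
        # Same guard as the patch: no nested or concurrent pool runs.
        assert Runner.shared_self is None
        Runner.shared_self = self
        try:
            # Workers are forked here, after shared_self is set. Only the
            # function reference and the (name, width) tuples are pickled.
            with get_context('fork').Pool(jobs) as pool:
                return pool.starmap(Runner.proc_run_one,
                                    [(n, width) for n in names])
        finally:
            Runner.shared_self = None
```

Unlike the patch, this version clears shared_self in a finally block, so an exception raised inside the pool cannot leave the guard set for a later call.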
 
>
>     +        assert TestRunner.shared_self is None
>     +        TestRunner.shared_self = self
>     +
>     +        with Pool(jobs) as p:
>     +            results = p.starmap(self.proc_run_test,
>     +                                zip(tests, [test_field_width] * len(tests)))
>     +
>     +        TestRunner.shared_self = None
>     +
>     +        return results
>     +
>           def __init__(self, env: TestEnv, makecheck: bool = False,
>                        color: str = 'auto') -> None:
>               self.env = env
>     @@ -219,11 +245,16 @@ def find_reference(self, test: str) -> str:
>
>               return f'{test}.out'
>
>     -    def do_run_test(self, test: str) -> TestResult:
>     +    def do_run_test(self, test: str, mp: bool) -> TestResult:
>               """
>               Run one test
>
>               :param test: test file path
>     +        :param mp: if true, we are in a multiprocessing environment, use
>     +                   personal subdirectories for test run
>     +
>     +        Note: this method may be called from subprocess, so it does not
>     +        change ``self`` object in any way!
>               """
>
>
> Maybe worth mentioning that it *does* change environment variables, but because this is "mp", it won't affect the parent execution environment.


Hmm, actually, it does not change them. And yes, the reason is that we won't change the original object anyway, so any logic that changes the runner object in the hope of having some effect would be wrong.
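The same point in code: the mp branch of do_run_test() edits the dict returned by prepare_subprocess() and hands it to the test subprocess, so neither os.environ nor the runner changes. split_test_dirs() below is a hypothetical standalone version of that branch, assuming (as the patch's env[d] indexing suggests) that prepare_subprocess() returns a fresh str-to-str mapping:

```python
import os
from pathlib import Path
from typing import Dict


def split_test_dirs(base_env: Dict[str, str], test_name: str) -> Dict[str, str]:
    """Return a copy of base_env with per-test TEST_DIR and SOCK_DIR.

    Hypothetical helper mirroring the mp branch of do_run_test(); the
    caller's dict and os.environ are left untouched.
    """
    env = dict(base_env)  # work on a copy, never the caller's mapping
    for d in ('TEST_DIR', 'SOCK_DIR'):
        env[d] = os.path.join(env[d], test_name)
        # Parallel tests each get their own directory; exist_ok allows reruns.
        Path(env[d]).mkdir(parents=True, exist_ok=True)
    return env
```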


>
>
>               f_test = Path(test)
>     @@ -249,6 +280,12 @@ def do_run_test(self, test: str) -> TestResult:
>
>               args = [str(f_test.resolve())]
>               env = self.env.prepare_subprocess(args)
>     +        if mp:
>     +            # Split test directories, so that tests running in parallel don't
>     +            # break each other.
>     +            for d in ['TEST_DIR', 'SOCK_DIR']:
>     +                env[d] = os.path.join(env[d], f_test.name)
>     +                Path(env[d]).mkdir(parents=True, exist_ok=True)
>
>               t0 = time.time()
>               with f_bad.open('w', encoding="utf-8") as f:
>     @@ -291,23 +328,32 @@ def do_run_test(self, test: str) -> TestResult:
>                                     casenotrun=casenotrun)
>
>           def run_test(self, test: str,
>     -                 test_field_width: Optional[int] = None) -> TestResult:
>     +                 test_field_width: Optional[int] = None,
>     +                 mp: bool = False) -> TestResult:
>               """
>               Run one test and print short status
>
>               :param test: test file path
>               :param test_field_width: width for first field of status format
>     +        :param mp: if true, we are in a multiprocessing environment, don't try
>     +                   to rewrite things in stdout
>     +
>     +        Note: this method may be called from subprocess, so it does not
>     +        change ``self`` object in any way!
>               """
>
>               last_el = self.last_elapsed.get(test)
>               start = datetime.datetime.now().strftime('%H:%M:%S')
>
>               if not self.makecheck:
>     -            self.test_print_one_line(test=test, starttime=start,
>     -                                     lasttime=last_el, end='\r',
>     +            self.test_print_one_line(test=test,
>     +                                     status = 'started' if mp else '...',
>     +                                     starttime=start,
>     +                                     lasttime=last_el,
>     +                                     end = '\n' if mp else '\r',
>                                            test_field_width=test_field_width)
>
>     -        res = self.do_run_test(test)
>     +        res = self.do_run_test(test, mp)
>
>               end = datetime.datetime.now().strftime('%H:%M:%S')
>               self.test_print_one_line(test=test, status=res.status,
>
>     @@ -321,7 +367,7 @@ def run_test(self, test: str,
>
>               return res
>
>     -    def run_tests(self, tests: List[str]) -> bool:
>     +    def run_tests(self, tests: List[str], jobs: int = 1) -> bool:
>               n_run = 0
>               failed = []
>               notrun = []
>     @@ -332,9 +378,16 @@ def run_tests(self, tests: List[str]) -> bool:
>
>               test_field_width = max(len(os.path.basename(t)) for t in tests) + 2
>
>     -        for t in tests:
>     +        if jobs > 1:
>     +            results = self.run_tests_pool(tests, test_field_width, jobs)
>     +
>     +        for i, t in enumerate(tests):
>                   name = os.path.basename(t)
>     -            res = self.run_test(t, test_field_width=test_field_width)
>     +
>     +            if jobs > 1:
>     +                res = results[i]
>     +            else:
>     +                res = self.run_test(t, test_field_width)
>
>                   assert res.status in ('pass', 'fail', 'not run')
>
>
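The new loop in run_tests() pairs results[i] with tests[i], which relies on Pool.starmap() returning results in input order even when workers finish out of order; map() and starmap() behave that way, and imap_unordered() is the variant that does not. The sketch below makes the ordering visible with a hypothetical fake_test() whose earlier items sleep longest:

```python
import time
from multiprocessing import get_context
from typing import List, Tuple


def fake_test(name: str, delay: float) -> Tuple[str, str]:
    # Hypothetical stand-in for run_test(): earlier items finish last.
    time.sleep(delay)
    return name, 'pass'


def run_all(names: List[str], jobs: int) -> List[Tuple[str, str]]:
    delays = [0.05 * (len(names) - i) for i in range(len(names))]
    with get_context('fork').Pool(jobs) as pool:
        # starmap() returns results in the order of its input iterable,
        # regardless of which worker finishes first.
        return pool.starmap(fake_test, zip(names, delays))
```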
> Looks good and surprisingly minimal, I just have a curiosity about the nature of the workaround here.
>
> Either way, I believe this will probably work as written, so I can give it an ACK at a minimum while I wait for answers.
>
> Acked-by: John Snow <jsnow@redhat.com>
>

Thanks!

Yes, the workaround is a bit ugly... But it's small, so I think we can live with it.

I agree, I just wanted to make sure I understood what was happening and why.
 
I don't think refactoring TestRunner to move everything needed into some simple structure supported by Pool is a good idea: we don't actually want to copy this data for each subprocess, since read-only access to a shared object is fine for us. And we do call methods on self and on self.env, so the refactoring would not be simple.

As for the shared object, I didn't find any way to pass a reference to a shared object to Pool.map(). Something like Pool.map(..., shared_state=self) would be nice. But where is such an option? Note that this is my first experience with multiprocessing.

The only thing I found is passing it through a global variable. I started with a real global variable, but then thought that hiding it inside the class would be a bit better.

Yeah, don't worry about making it absolutely beautiful.  Thanks for explaining the problem to me, I agree that your workaround is a good compromise.

Reviewed-by: John Snow <jsnow@redhat.com>
