diff options
Diffstat (limited to 'tools/testing/selftests/drivers/net/hw/devmem.py')
-rwxr-xr-x | tools/testing/selftests/drivers/net/hw/devmem.py | 45 |
1 files changed, 39 insertions, 6 deletions
diff --git a/tools/testing/selftests/drivers/net/hw/devmem.py b/tools/testing/selftests/drivers/net/hw/devmem.py index 1223f0f5c10c..7947650210a0 100755 --- a/tools/testing/selftests/drivers/net/hw/devmem.py +++ b/tools/testing/selftests/drivers/net/hw/devmem.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # SPDX-License-Identifier: GPL-2.0 +from os import path from lib.py import ksft_run, ksft_exit from lib.py import ksft_eq, KsftSkipEx from lib.py import NetDrvEpEnv @@ -10,8 +11,7 @@ from lib.py import ksft_disruptive def require_devmem(cfg): if not hasattr(cfg, "_devmem_probed"): - port = rand_port() - probe_command = f"./ncdevmem -f {cfg.ifname}" + probe_command = f"{cfg.bin_local} -f {cfg.ifname}" cfg._devmem_supported = cmd(probe_command, fail=False, shell=True).ret == 0 cfg._devmem_probed = True @@ -21,22 +21,55 @@ def require_devmem(cfg): @ksft_disruptive def check_rx(cfg) -> None: - cfg.require_v6() require_devmem(cfg) port = rand_port() - listen_cmd = f"./ncdevmem -l -f {cfg.ifname} -s {cfg.v6} -p {port}" + socat = f"socat -u - TCP{cfg.addr_ipver}:{cfg.addr}:{port},bind={cfg.remote_addr}:{port}" + listen_cmd = f"{cfg.bin_local} -l -f {cfg.ifname} -s {cfg.addr} -p {port} -c {cfg.remote_addr} -v 7" + + with bkg(listen_cmd, exit_wait=True) as ncdevmem: + wait_port_listen(port) + cmd(f"yes $(echo -e \x01\x02\x03\x04\x05\x06) | \ + head -c 1K | {socat}", host=cfg.remote, shell=True) + + ksft_eq(ncdevmem.ret, 0) + + +@ksft_disruptive +def check_tx(cfg) -> None: + require_devmem(cfg) + + port = rand_port() + listen_cmd = f"socat -U - TCP{cfg.addr_ipver}-LISTEN:{port}" with bkg(listen_cmd) as socat: wait_port_listen(port) - cmd(f"echo -e \"hello\\nworld\"| socat -u - TCP6:[{cfg.v6}]:{port}", host=cfg.remote, shell=True) + cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr} -p {port}", host=cfg.remote, shell=True) + + ksft_eq(socat.stdout.strip(), "hello\nworld") + + +@ksft_disruptive +def check_tx_chunks(cfg) -> None: + cfg.require_ipver("6") + require_devmem(cfg) + + port = rand_port() + listen_cmd = f"socat -U - TCP6-LISTEN:{port}" + + with bkg(listen_cmd, exit_wait=True) as socat: + wait_port_listen(port) + cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port} -z 3", host=cfg.remote, shell=True) ksft_eq(socat.stdout.strip(), "hello\nworld") def main() -> None: with NetDrvEpEnv(__file__) as cfg: - ksft_run([check_rx], + cfg.bin_local = path.abspath(path.dirname(__file__) + "/ncdevmem") + cfg.bin_remote = cfg.remote.deploy(cfg.bin_local) + + ksft_run([check_rx, check_tx, check_tx_chunks], args=(cfg, )) ksft_exit() |