[RFC PATCH v2 08/14] selftests/kcov_dataflow: add trigger-view.py

1 view
Skip to first unread message

Yunseong Kim

unread,
Jun 11, 2026, 12:21:59 PMJun 11
to Ingo Molnar, Peter Zijlstra, Juri Lelli, Vincent Guittot, Dietmar Eggemann, Steven Rostedt, Ben Segall, Mel Gorman, Valentin Schneider, K Prateek Nayak, Andrey Konovalov, Alexander Potapenko, Dmitry Vyukov, Andrew Morton, Miguel Ojeda, Boqun Feng, Gary Guo, Björn Roy Baron, Benno Lossin, Andreas Hindborg, Alice Ryhl, Trevor Gross, Danilo Krummrich, Nathan Chancellor, Nicolas Schier, Nick Desaulniers, Bill Wendling, Justin Stitt, Kees Cook, David Hildenbrand, Lorenzo Stoakes, Liam R. Howlett, Vlastimil Babka, Mike Rapoport, Suren Baghdasaryan, Michal Hocko, Shuah Khan, Jonathan Corbet, Shuah Khan, Yunseong Kim, linux-...@vger.kernel.org, kasa...@googlegroups.com, rust-fo...@vger.kernel.org, linux-...@vger.kernel.org, ll...@lists.linux.dev, linu...@kvack.org, linux-k...@vger.kernel.org, work...@vger.kernel.org, linu...@vger.kernel.org, Yeoreum Yun
Add a Python script that loads a test module, triggers its debugfs
entry with kcov_dataflow recording active, then pretty-prints captured
records as a nested call tree with kallsyms symbol resolution.

Features:
- 8MB ring buffer (1M u64 words) for INSTRUMENT_ALL kernels
- Enable recording after module load, before trigger (avoids VFS noise)
- Variable-length record parsing using header-encoded field count
- Module-only filtering via kallsyms symbol lookup
- --context/-C N: show N records before/after each module function call
- --raw: print raw records instead of call tree
- Architecture-aware syscall numbers (x86_64 and arm64)

Usage:

python3 trigger-view.py eight_args_c \
--ko eight_args_c/eight_args_c.ko

python3 trigger-view.py eight_args_rust \
--ko eight_args_rust/eight_args_rust.ko

python3 trigger-view.py rust_ffi_contract \
--ko rust_ffi_contract/rust_ffi_contract.ko

Cc: Alexander Potapenko <gli...@google.com>
Assisted-by: Claude:claude-opus-4-6 [kiro-chat]
Link: https://github.com/yskzalloc/kcov-dataflow/actions
Signed-off-by: Yunseong Kim <yunseo...@est.tech>
---
.../selftests/kcov_dataflow/trigger-view.py | 377 +++++++++++++++++++++
1 file changed, 377 insertions(+)

diff --git a/tools/testing/selftests/kcov_dataflow/trigger-view.py b/tools/testing/selftests/kcov_dataflow/trigger-view.py
new file mode 100755
index 000000000000..a3274e472dc1
--- /dev/null
+++ b/tools/testing/selftests/kcov_dataflow/trigger-view.py
@@ -0,0 +1,377 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+"""
+trigger-view.py - Load a module with kcov_dataflow
+recording active, then pretty-print captured records.
+
+Usage:
+ python3 trigger-view.py eight_args_c
+ python3 trigger-view.py rust_ffi_contract
+ python3 trigger-view.py eight_args_c --raw
+
+The script:
+ 1. Opens /sys/kernel/debug/kcov_dataflow
+ 2. Inits and mmaps the buffer
+ 3. Enables recording for this process
+ 4. Loads the module via finit_module() -- init runs in our context
+ 5. Disables recording
+ 6. Unloads the module
+ 7. Parses and prints captured records with kallsyms resolution
+"""
+import os
+import sys
+import struct
+import ctypes
+import ctypes.util
+import argparse
+import fcntl
+
+# Constants
+DF_TYPE_ENTRY = 0xE
+DF_TYPE_RET = 0xF
+MAGIC_BAD = 0xBADADD85
+BUF_SIZE = 1048576 # 1M words = 8MB
+
+# Ioctl numbers
+def _IOR(t, nr, size):
+ return (2 << 30) | (ord(t) << 8) | nr | (size << 16)
+
+def _IO(t, nr):
+ return (ord(t) << 8) | nr
+
+KCOV_DF_INIT_TRACK = _IOR('d', 1, 8)
+KCOV_DF_ENABLE = _IO('d', 100)
+KCOV_DF_DISABLE = _IO('d', 101)
+
+# syscall numbers (x86_64)
+import platform
+_machine = platform.machine()
+if _machine == "aarch64":
+ SYS_FINIT_MODULE = 273
+ SYS_DELETE_MODULE = 106
+else: # x86_64
+ SYS_FINIT_MODULE = 313
+ SYS_DELETE_MODULE = 176
+
+SELFTEST_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+def load_kallsyms():
+ """Load kernel symbols for PC resolution."""
+ syms = []
+ try:
+ with open("/proc/kallsyms") as f:
+ for line in f:
+ parts = line.split()
+ if len(parts) >= 3:
+ addr = int(parts[0], 16)
+ name = parts[2]
+ mod = parts[3].strip("[]") if len(parts) > 3 else ""
+ syms.append((addr, name, mod))
+ except (PermissionError, FileNotFoundError):
+ pass
+ syms.sort()
+ return syms
+
+
+def symbolize(pc, syms):
+ """Find nearest symbol <= pc."""
+ if not syms:
+ return f"0x{pc:x}"
+ lo, hi = 0, len(syms) - 1
+ while lo < hi:
+ mid = (lo + hi + 1) // 2
+ if syms[mid][0] <= pc:
+ lo = mid
+ else:
+ hi = mid - 1
+ addr, name, mod = syms[lo]
+ if addr > pc:
+ return f"0x{pc:x}"
+ offset = pc - addr
+ if mod:
+ return f"{name}+0x{offset:x} [{mod}]" if offset else f"{name} [{mod}]"
+ return f"{name}+0x{offset:x}" if offset else name
+
+
+def format_val(v):
+ """Format a captured value."""
+ if v == MAGIC_BAD:
+ return "FAULT"
+ if v == 0:
+ return "0x0"
+ return f"0x{v:x}"
+
+
+def find_module(name):
+ """Find the .ko file for the given test name."""
+ ko_path = os.path.join(SELFTEST_DIR, name, f"{name}_mod.ko")
+ if os.path.exists(ko_path):
+ return ko_path
+ # Try without _mod suffix
+ ko_path = os.path.join(SELFTEST_DIR, name, f"{name}.ko")
+ if os.path.exists(ko_path):
+ return ko_path
+ # Search for any .ko in the directory
+ mod_dir = os.path.join(SELFTEST_DIR, name)
+ if os.path.isdir(mod_dir):
+ for f in os.listdir(mod_dir):
+ if f.endswith(".ko"):
+ return os.path.join(mod_dir, f)
+ return None
+
+
+def finit_module(ko_path):
+ """Load a kernel module via finit_module syscall."""
+ libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+ fd = os.open(ko_path, os.O_RDONLY)
+ ret = libc.syscall(SYS_FINIT_MODULE, fd, b"", 0)
+ os.close(fd)
+ if ret != 0:
+ errno = ctypes.get_errno()
+ raise OSError(errno, f"finit_module({ko_path}): {os.strerror(errno)}")
+
+
+def delete_module(name):
+ """Unload a kernel module."""
+ libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+ ret = libc.syscall(SYS_DELETE_MODULE, name.encode(), 0)
+ if ret != 0:
+ errno = ctypes.get_errno()
+ raise OSError(errno, f"delete_module({name}): {os.strerror(errno)}")
+
+
+def parse_records(buf, total_words):
+ """Parse the ring buffer into a list of records."""
+ records = []
+ pos = 1
+ while pos + 3 <= total_words and pos < BUF_SIZE:
+ hdr = buf[pos]
+
+ # Valid headers fit in 32 bits (upper 32 must be zero)
+ if hdr >> 32:
+ pos += 1
+ continue
+
+ rtype = (hdr >> 28) & 0xF
+
+ if rtype not in (DF_TYPE_ENTRY, DF_TYPE_RET):
+ pos += 1
+ continue
+
+ pc = buf[pos + 1]
+ meta = buf[pos + 2]
+ seq = hdr & 0x00FFFFFF
+ num_vals = (hdr >> 24) & 0xF
+ if num_vals == 0:
+ num_vals = 1
+
+ # Valid records always have a non-zero PC (kernel text address)
+ if pc == 0:
+ pos += 1
+ continue
+
+ val = buf[pos + 3] if pos + 3 < BUF_SIZE else 0
+ records.append({
+ "type": rtype,
+ "seq": seq,
+ "pc": pc,
+ "meta": meta,
+ "val": val,
+ })
+ pos += 3 + num_vals
+ return records
+
+
+def print_raw(records, syms):
+ """Print records in raw format."""
+ for r in records:
+ sym = symbolize(r["pc"], syms)
+ t = "ENTRY" if r["type"] == DF_TYPE_ENTRY else "RET "
+ arg_idx = (r["meta"] >> 56) & 0xFF
+ size = (r["meta"] >> 48) & 0xFF
+ print(f"[{t}] seq={r['seq']:3d} {sym} "
+ f"arg[{arg_idx}]({size}) = {format_val(r['val'])}")
+
+
+def print_tree(records, syms):
+ """Print records as indented call tree matching converted.txt format."""
+ depth = 0
+ # Group consecutive ENTRY records by PC to collect all args
+ i = 0
+ while i < len(records):
+ r = records[i]
+ sym = symbolize(r["pc"], syms)
+
+ if r["type"] == DF_TYPE_ENTRY:
+ # Collect all args for this call (same PC, consecutive entries)
+ args = []
+ pc = r["pc"]
+ while i < len(records) and records[i]["type"] == DF_TYPE_ENTRY \
+ and records[i]["pc"] == pc:
+ args.append(format_val(records[i]["val"]))
+ i += 1
+ indent = " " * depth
+ print(f"{indent}{sym}({', '.join(args)})")
+ depth += 1
+ else:
+ depth = max(0, depth - 1)
+ indent = " " * depth
+ print(f"{indent}{format_val(r['val'])} = {sym}()")
+ i += 1
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Load a test module with kcov_dataflow and view records")
+ parser.add_argument("module", help="Test module name (e.g. eight_args_c)")
+ parser.add_argument("--raw", action="store_true",
+ help="Print raw records instead of tree")
+ parser.add_argument("--ko", help="Explicit path to .ko file")
+ parser.add_argument("--context", "-C", type=int, default=0,
+ help="Show N lines before/after each module record")
+ args = parser.parse_args()
+
+ # Find module
+ if args.ko:
+ ko_path = args.ko
+ else:
+ ko_path = find_module(args.module)
+ if not ko_path or not os.path.exists(ko_path):
+ print(f"Cannot find module for '{args.module}'", file=sys.stderr)
+ print(f"Build it first: make LLVM=1 CC=clang "
+ f"M=tools/testing/selftests/kcov_dataflow/{args.module} modules",
+ file=sys.stderr)
+ sys.exit(1)
+
+ # Open kcov_dataflow
+ # Ensure kallsyms shows real addresses
+ try:
+ with open("/proc/sys/kernel/kptr_restrict", "w") as f:
+ f.write("0")
+ except (PermissionError, FileNotFoundError):
+ pass
+
+ try:
+ df_fd = os.open("/sys/kernel/debug/kcov_dataflow", os.O_RDWR)
+ except OSError as e:
+ print(f"Cannot open kcov_dataflow: {e}", file=sys.stderr)
+ sys.exit(1)
+
+ # Init + mmap
+ fcntl.ioctl(df_fd, KCOV_DF_INIT_TRACK, BUF_SIZE)
+ libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+ libc.mmap.restype = ctypes.c_void_p
+ libc.mmap.argtypes = [
+ ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int,
+ ctypes.c_int, ctypes.c_int, ctypes.c_long
+ ]
+ buf_ptr = libc.mmap(None, BUF_SIZE * 8, 0x3, 0x01, df_fd, 0)
+ if buf_ptr == ctypes.c_void_p(-1).value:
+ print("mmap failed", file=sys.stderr)
+ sys.exit(1)
+ buf = (ctypes.c_uint64 * BUF_SIZE).from_address(buf_ptr)
+
+ # Load module first (generates noise with INSTRUMENT_ALL)
+ mod_name = os.path.basename(ko_path).replace(".ko", "")
+ try:
+ finit_module(ko_path)
+ print(f"# Loaded {mod_name}")
+ except OSError as e:
+ print(f"Failed to load module: {e}", file=sys.stderr)
+ sys.exit(1)
+
+ # Get module .text address for PC filtering
+ mod_text_start = 0
+ try:
+ with open(f"/sys/module/{mod_name}/sections/.text") as f:
+ mod_text_start = int(f.read().strip(), 16)
+ except (FileNotFoundError, ValueError, PermissionError):
+ pass
+
+ # Enable recording AFTER load, BEFORE trigger (avoids VFS/loader noise)
+ fcntl.ioctl(df_fd, KCOV_DF_ENABLE, 0)
+ buf[0] = 0
+
+ # Trigger the module's debugfs file to invoke test functions
+ trigger_paths = [
+ f"/sys/kernel/debug/kcov_dataflow_test/trigger",
+ f"/sys/kernel/debug/kcov_dataflow_test/rust_ffi_trigger",
+ f"/sys/kernel/debug/trigger_rust",
+ f"/sys/kernel/debug/{mod_name}/trigger",
+ ]
+ for tp in trigger_paths:
+ try:
+ with open(tp, "w") as f:
+ f.write("1")
+ break
+ except (FileNotFoundError, PermissionError):
+ continue
+
+ fcntl.ioctl(df_fd, KCOV_DF_DISABLE, 0)
+
+ # Read kallsyms while module is still loaded (symbols available)
+ syms = load_kallsyms()
+
+ # Unload
+ try:
+ delete_module(mod_name)
+ except OSError:
+ pass
+
+ # Parse and display
+ total = int(buf[0])
+ print(f"# Captured {total} words")
+ records = parse_records(buf, total)
+ print(f"# {len(records)} records")
+
+ # Filter to module records using kallsyms
+ # Build set of module symbol addresses for fast lookup
+ mod_syms = set()
+ for addr, name, mod in syms:
+ if mod == mod_name and addr != 0:
+ mod_syms.add(addr)
+
+ def is_module_pc(pc):
+ """Check if PC belongs to mod_name via kallsyms."""
+ if mod_syms:
+ # Binary search: find nearest symbol <= pc, check module
+ lo, hi = 0, len(syms) - 1
+ while lo < hi:
+ mid = (lo + hi + 1) // 2
+ if syms[mid][0] <= pc:
+ lo = mid
+ else:
+ hi = mid - 1
+ return syms[lo][2] == mod_name
+ # Fallback: if no module symbols (kptr_restrict), use .text start
+ return mod_text_start and pc >= mod_text_start
+
+ if syms or mod_text_start:
+ if args.context > 0:
+ module_indices = set()
+ for i, r in enumerate(records):
+ if is_module_pc(r["pc"]):
+ for j in range(max(0, i - args.context),
+ min(len(records), i + args.context + 1)):
+ module_indices.add(j)
+ records = [records[i] for i in sorted(module_indices)]
+ print(f"# showing {len(records)} records with context={args.context} "
+ f"around {mod_name}\n")
+ else:
+ module_records = [r for r in records if is_module_pc(r["pc"])]
+ print(f"# {len(module_records)} from {mod_name}\n")
+ records = module_records
+ else:
+ print("")
+
+ if args.raw:
+ print_raw(records, syms)
+ else:
+ print_tree(records, syms)
+
+ os.close(df_fd)
+
+
+if __name__ == "__main__":
+ main()

--
2.43.0

Reply all
Reply to author
Forward
0 new messages