From 39d7d0140f9efc08fed79f493eb0d232b148ae4b Mon Sep 17 00:00:00 2001 From: shanegcstearns Date: Thu, 28 May 2026 12:08:18 -0700 Subject: [PATCH 1/3] now getting logits, issues with features still --- cocotb/Makefile | 2 + cocotb/chip_top_sim_wrap.sv | 4 +- cocotb/chip_top_tb.py | 195 +++++++++++++-------------- cocotb/sensors/i2c_slave_lis2dw12.sv | 8 +- cocotb/sim/tb/ml/ml_tb.py | 2 + misc/ML/nngen_out/ml_tb.py | 1 + src/chip_core.sv | 5 + src/chip_top.sv | 8 +- src/top.sv | 2 +- src/weight_flash_axi.v | 9 +- 10 files changed, 129 insertions(+), 107 deletions(-) diff --git a/cocotb/Makefile b/cocotb/Makefile index 743cbf2..90ab09b 100644 --- a/cocotb/Makefile +++ b/cocotb/Makefile @@ -263,6 +263,7 @@ FW_C_test_ml_weights := $(FW_TESTS)/test_ml_weights.c FW_C_test_ml_cpu_wake_i2c := $(FW_TESTS)/test_ml_cpu_wake_i2c.c FW_C_test_top_feature_ml_cpu:= $(FW_TESTS)/test_top_feature_ml_cpu.c FW_C_test_top_feature_ml_cpu_spi_flash:= $(FW_TESTS)/test_top_feature_ml_cpu_spi_flash.c +FW_C_test_top_feature_ml_30logits := $(FW_TESTS)/test_top_feature_ml_30logits.c FW_C_test_top_ml_control_unified := $(FW_TESTS)/test_top_ml_control_unified.c FW_C_test_top_sleep_wake_unified := $(FW_TESTS)/test_top_sleep_wake_unified.c FW_C_test_top_ml_golden_vector_unified := $(FW_TESTS)/test_top_ml_golden_vector_unified.c @@ -283,6 +284,7 @@ FW_ARCH_test_ml_cpu_wake_i2c := rv32im FW_ARCH_prod_main := rv32im FW_ARCH_test_top_feature_ml_cpu := rv32im FW_ARCH_test_top_feature_ml_cpu_spi_flash := rv32im +FW_ARCH_test_top_feature_ml_30logits := rv32im FW_ARCH_test_top_ml_control_unified := rv32im FW_ARCH_test_top_sleep_wake_unified := rv32im FW_ARCH_test_top_ml_golden_vector_unified := rv32im diff --git a/cocotb/chip_top_sim_wrap.sv b/cocotb/chip_top_sim_wrap.sv index cf6d049..95298f4 100644 --- a/cocotb/chip_top_sim_wrap.sv +++ b/cocotb/chip_top_sim_wrap.sv @@ -72,7 +72,9 @@ module chip_top_sim_wrap #( .ACC_POLL_PERIOD_TICKS (8), .PPG_POLL_PERIOD_TICKS (2), .PPG_WATERMARK (8), - .PPG_MAX_BURST_SAMPLES (32) + .PPG_MAX_BURST_SAMPLES (32), + .CFG_MOTION_HI_TH (16'hFFFF), + .CFG_MAX_MOTION_HI (16'hFFFF) ) `endif u_chip_top ( diff --git a/cocotb/chip_top_tb.py b/cocotb/chip_top_tb.py index 95bac8d..73d8e21 100644 --- a/cocotb/chip_top_tb.py +++ b/cocotb/chip_top_tb.py @@ -27,7 +27,7 @@ _PROJ = Path(__file__).resolve().parent -_FIRMWARE_NAME = os.getenv("FIRMWARE_NAME", "test_top_feature_ml_cpu_spi_flash") +_FIRMWARE_NAME = os.getenv("FIRMWARE_NAME", "test_top_feature_ml_30logits") _FIRMWARE_HEX = str(_PROJ / "firmware" / "build" / _FIRMWARE_NAME / "firmware.hex") _WEIGHT_HEX = str(_PROJ / "firmware" / "build" / "generated" / "taketwo_params.hex") @@ -178,9 +178,45 @@ async def test_chip_top_boot(dut): #full pipeline test +_N_LOGITS = 30 # number of inferences firmware runs before writing CAFEBABE + + +async def _feat_monitor(u_top): + """Background coroutine: prints features when they change, logits when they change.""" + prev_feat = None + prev_logit = None + while True: + await RisingEdge(u_top.feat_valid_o) + + mot = _s16_or_none(u_top.feat_motion_latched_r) + tim = _s16_or_none(u_top.feat_time_latched_r) + dhr = _s16_or_none(u_top.feat_delta_hr_latched_r) + msd = _s16_or_none(u_top.feat_mssd_latched_r) + curr_feat = (mot, tim, dhr, msd) + if curr_feat != prev_feat: + print( + f"[feat] mot={mot!s:>7} tim={tim!s:>6} " + f"dhr={dhr!s:>7} msd={msd!s:>7}", + flush=True, + ) + prev_feat = curr_feat + + try: + lr = int(u_top.u_weight_ram.logit_reg_0.value) + log0 = _s16(lr & 0xFFFF) + log1 = _s16((lr >> 16) & 0xFFFF) + curr_logit = (log0, log1) + except Exception: + curr_logit = None + if curr_logit is not None and curr_logit != prev_logit: + pred = "non-N3" if log1 > log0 else "N3" + print(f"[logit] log0={log0:6d} log1={log1:6d} → {pred}", flush=True) + prev_logit = curr_logit + + @cocotb.test(skip=(hdl_toplevel != "chip_top_sim_wrap")) async def test_chip_top_feature_inject(dut): - """Full pipeline: sensor → features → ML inference → alarm output.""" + """Full pipeline: sensor → features → 30 ML inferences → print logits live.""" logger = logging.getLogger("chip_top_feature_inject") # ALL mode: features + ML + CPU all active; bypasses SLEEP state in sim. @@ -188,12 +224,10 @@ async def test_chip_top_feature_inject(dut): dut.input_PAD.value = 0b00000101 await start_clock(dut.clk_PAD) await reset(dut.rst_n_PAD) - core = _core(dut) u_top = _top(dut) - BOOT_TIMEOUT = 500_000 - RUNTIME_TIMEOUT = 10_000_000 + RUNTIME_TIMEOUT = 3_000_000 # --- Phase 1: wait for boot --- logger.info("Waiting for boot_done...") @@ -205,32 +239,22 @@ async def test_chip_top_feature_inject(dut): raise AssertionError("Timeout waiting for boot_done") assert core.pico_trap_w.value == 0, "CPU trapped during boot" - logger.info("Boot done. Waiting for firmware to signal pass/fail...") + logger.info("Boot done. Waiting for 30 logits...") + + # Start background monitor — prints features + stale logits on every feat_valid_o + cocotb.start_soon(_feat_monitor(u_top)) + + # --- Phase 2: collect 30 logits as firmware writes them --- + # Firmware writes TEST_STATUS = 1..30 (one per inference), then 0xCAFEBABE. + # TEST_CODE = packed logits: bits[15:0]=log0, bits[31:16]=log1. + last_status = 0 + logit_count = 0 - # --- Phase 2: wait for firmware test_status (CAFE_BABE = pass, DEAD_BEEF = fail) --- for cycle in range(RUNTIME_TIMEOUT): await RisingEdge(dut.clk_PAD) - if cycle % 100_000 == 0: - try: - ts = u_top.test_status.value.to_unsigned() - tc_raw = u_top.test_code.value - tc_str = str(tc_raw) - alarm = dut.alarm_o.value - try: - tc_int = tc_raw.to_unsigned() - tc_display = f"0x{tc_int:08X}" - except ValueError: - tc_display = f"X:{tc_str}" - logger.info( - f" cycle {cycle}: test_status=0x{ts:08X} " - f"test_code={tc_display} alarm={alarm}" - ) - except Exception: - pass - if core.pico_trap_w.value == 1: - raise AssertionError("CPU trapped during runtime") + raise AssertionError(f"CPU trapped at cycle {cycle}") try: status = u_top.test_status.value.to_unsigned() @@ -238,86 +262,59 @@ async def test_chip_top_feature_inject(dut): continue if status == 0xDEAD_BEEF: - tc_raw = u_top.test_code.value try: - code = tc_raw.to_unsigned() - code_str = f"0x{code:08X}" - except ValueError: - code_str = f"X:{tc_raw!s}" + code_str = f"0x{u_top.test_code.value.to_unsigned():08X}" + except Exception: + code_str = str(u_top.test_code.value) raise AssertionError(f"Firmware reported FAIL: test_code={code_str}") - if status == 0xCAFE_BABE: - tc_raw = u_top.test_code.value - try: - code = tc_raw.to_unsigned() - code_str = f"0x{code:08X}" - except ValueError: - code = None - code_str = f"X:{tc_raw!s}" - logger.info( - f"Firmware reported PASS: test_code={code_str} " - f"alarm={dut.alarm_o.value}" - ) - - # Check bits 30 and 29 by position in binary string (MSB first). - # This handles X bits in other positions (e.g., the confidence field). - binstr = str(tc_raw) # 32-char string: '0'/'1'/'X'/'Z', MSB at [0] - bit30 = binstr[1] # bit 30 = outputs_mutated - bit29 = binstr[2] # bit 29 = saw_busy - assert bit30 == "1", \ - f"Firmware: ML output mutation not observed (bit30={bit30}, test_code={code_str})" - assert bit29 == "1", \ - f"Firmware: ML BUSY high not observed (bit29={bit29}, test_code={code_str})" - - if code is not None: - logger.info( - f" predicted_class={code >> 31} " - f"outputs_mutated={(code >> 30) & 1} " - f"saw_busy={(code >> 29) & 1} " - f"confidence={code & 0xFFFF}" - ) - else: - logger.warning( - f"test_code has X bits — confidence field indeterminate. " - f"Raw: {code_str}. Check logit WRAM region for X propagation." - ) - await ClockCycles(dut.clk_PAD, 4) - await ReadOnly() - - dbg_log0 = _s16_or_none(u_top.logit0) - dbg_log1 = _s16_or_none(u_top.logit1) - logit_word0 = _u32_or_none(u_top.u_weight_ram.logit_reg_0) - logit_word1 = _u32_or_none(u_top.u_weight_ram.logit_reg_1) - - if dbg_log0 is not None and dbg_log1 is not None: - logger.info(f" logits_dbg=({dbg_log0}, {dbg_log1})") - else: - logger.warning( - f"top-level dbg logits unresolved: " - f"logit0={u_top.logit0.value} logit1={u_top.logit1.value}" - ) + if status != last_status: + last_status = status - if logit_word0 is not None: - reg_log0 = _s16(logit_word0) - reg_log1 = _s16(logit_word0 >> 16) - aux_word = "X" if logit_word1 is None else f"0x{logit_word1:08X}" - logger.info( - f" logits_reg=({reg_log0}, {reg_log1}) " - f"word0=0x{logit_word0:08X} word1={aux_word}" - ) - else: - logger.warning( - f"logit register window unresolved: " - f"word0={u_top.u_weight_ram.logit_reg_0.value} " - f"word1={u_top.u_weight_ram.logit_reg_1.value}" + # Each value 1..N_LOGITS signals a completed inference + if 1 <= status <= _N_LOGITS: + try: + tc = u_top.test_code.value.to_unsigned() + log0 = _s16(tc & 0xFFFF) + log1 = _s16((tc >> 16) & 0xFFFF) + pred = "N3" if log1 > log0 else "non-N3" + # Read latched feature values directly from RTL for diagnostics + try: + mot = _s16(int(u_top.feat_motion_latched_r.value)) + tim = _s16(int(u_top.feat_time_latched_r.value)) + dhr = _s16(int(u_top.feat_delta_hr_latched_r.value)) + msd = _s16(int(u_top.feat_mssd_latched_r.value)) + feat_str = f" feat=[mot={mot} tim={tim} dhr={dhr} msd={msd}]" + except Exception: + feat_str = "" + logger.info( + f" logit {status:2d}/{_N_LOGITS}: " + f"log0={log0:6d} log1={log1:6d} → {pred}{feat_str}" + ) + logit_count += 1 + except Exception as e: + logger.warning(f" logit {status}: could not read TEST_CODE ({e})") + + if status == 0xCAFE_BABE: + assert logit_count == _N_LOGITS, ( + f"Expected {_N_LOGITS} logits before PASS, got {logit_count}" ) - - logger.info("Full pipeline test passed.") - return + try: + tc = u_top.test_code.value.to_unsigned() + log0 = _s16(tc & 0xFFFF) + log1 = _s16((tc >> 16) & 0xFFFF) + pred_class = (tc >> 31) & 1 + logger.info( + f"Full pipeline PASS — 30 logits collected. " + f"Final: log0={log0} log1={log1} class={pred_class}" + ) + except Exception: + logger.info(f"Full pipeline PASS — 30 logits collected.") + return raise AssertionError( f"Timeout after {RUNTIME_TIMEOUT} cycles — " - f"firmware never reached pass/fail state" + f"only {logit_count}/{_N_LOGITS} logits collected" ) @@ -351,7 +348,7 @@ async def test_chip_top_normal_mode(dut): u_top = _top(dut) BOOT_TIMEOUT = 500_000 - RUNTIME_TIMEOUT = 10_000_000 + RUNTIME_TIMEOUT = 1#10_000_000 logger.info("Normal mode test started. Monitoring for boot completion...") diff --git a/cocotb/sensors/i2c_slave_lis2dw12.sv b/cocotb/sensors/i2c_slave_lis2dw12.sv index 2fb9830..b061609 100644 --- a/cocotb/sensors/i2c_slave_lis2dw12.sv +++ b/cocotb/sensors/i2c_slave_lis2dw12.sv @@ -73,8 +73,12 @@ module i2c_slave_lis2dw12 #( ay16 = {raw_ay[13:0], 2'b00}; az16 = {raw_az[13:0], 2'b00}; end else begin - ax16 = 16'h0; ay16 = 16'h0; az16 = 16'h0; - $display("i2c_slave_lis2dw12: EOF at time %0t", $time); + $display("i2c_slave_lis2dw12: EOF at time %0t — rewinding", $time); + $rewind(fd); + r = $fscanf(fd, "%d,%d,%d\n", raw_ax, raw_ay, raw_az); + ax16 = {raw_ax[13:0], 2'b00}; + ay16 = {raw_ay[13:0], 2'b00}; + az16 = {raw_az[13:0], 2'b00}; end endtask diff --git a/cocotb/sim/tb/ml/ml_tb.py b/cocotb/sim/tb/ml/ml_tb.py index 6cc96ae..906f4b9 100644 --- a/cocotb/sim/tb/ml/ml_tb.py +++ b/cocotb/sim/tb/ml/ml_tb.py @@ -149,3 +149,5 @@ async def load_weights_and_infer_once(dut): elif (pred == False and int(feats[4]) == 0): true_n += 1 print(f"Conf matrix: true_n: {true_n}, false_n: {false_n}, false_p: {false_p}, true_p: {true_p}") + print(f"features: [{feats[0]}, {feats[1]}, {feats[2]}, {feats[3]}]") + print(line) \ No newline at end of file diff --git a/misc/ML/nngen_out/ml_tb.py b/misc/ML/nngen_out/ml_tb.py index f90eae5..ce161c2 100644 --- a/misc/ML/nngen_out/ml_tb.py +++ b/misc/ML/nngen_out/ml_tb.py @@ -116,3 +116,4 @@ async def load_weights_and_infer_once(dut): out_bytes = axi_ram.read(out_addr, 4) log0, log1 = struct.unpack("<2h", out_bytes) print(f"logits int16: [{log0}, {log1}] (raw bytes={out_bytes.hex()})") + print(line) diff --git a/src/chip_core.sv b/src/chip_core.sv index 32c6102..cf40967 100644 --- a/src/chip_core.sv +++ b/src/chip_core.sv @@ -473,6 +473,11 @@ module chip_core #( .sim_len_o (sim_len_w), .sim_write_o (sim_write_w), .sim_wdata_o (sim_wdata_w), + .sim_ack_i (sim_ack_i), + .sim_rdata_i (sim_rdata_i), + .sim_rvalid_i (sim_rvalid_i), + .sim_rlast_i (sim_rlast_i), + .sim_err_i (sim_err_i), `endif .feat_valid_o (feat_valid_w), .time_feat_o (time_feat_top_w), diff --git a/src/chip_top.sv b/src/chip_top.sv index 28609cf..0296727 100644 --- a/src/chip_top.sv +++ b/src/chip_top.sv @@ -25,7 +25,9 @@ module chip_top #( parameter integer ACC_POLL_PERIOD_TICKS = 50_000, parameter integer PPG_POLL_PERIOD_TICKS = 100, parameter integer PPG_WATERMARK = 8, - parameter integer PPG_MAX_BURST_SAMPLES = 32 + parameter integer PPG_MAX_BURST_SAMPLES = 32, + parameter [15:0] CFG_MOTION_HI_TH = 16'd2000, + parameter [15:0] CFG_MAX_MOTION_HI = 16'd3 )( `ifdef USE_POWER_PINS inout wire VDD, @@ -208,7 +210,9 @@ module chip_top #( .ACC_POLL_PERIOD_TICKS (ACC_POLL_PERIOD_TICKS), .PPG_POLL_PERIOD_TICKS (PPG_POLL_PERIOD_TICKS), .PPG_WATERMARK (PPG_WATERMARK), - .PPG_MAX_BURST_SAMPLES (PPG_MAX_BURST_SAMPLES) + .PPG_MAX_BURST_SAMPLES (PPG_MAX_BURST_SAMPLES), + .CFG_MOTION_HI_TH (CFG_MOTION_HI_TH), + .CFG_MAX_MOTION_HI (CFG_MAX_MOTION_HI) ) i_chip_core ( `ifdef USE_POWER_PINS .VDD (VDD), diff --git a/src/top.sv b/src/top.sv index 3e50628..3223f9f 100644 --- a/src/top.sv +++ b/src/top.sv @@ -1032,7 +1032,7 @@ module top #( .BASE_ADDR (WEIGHT_BASE), .CLK_DIV (8'd2), .FLASH_BASE(24'h00_1000) - ) u_weight_ram ( + ) u_weight_flash ( `ifdef USE_POWER_PINS .VDD (VDD), .VSS (VSS), diff --git a/src/weight_flash_axi.v b/src/weight_flash_axi.v index 5695af1..2d1cef1 100644 --- a/src/weight_flash_axi.v +++ b/src/weight_flash_axi.v @@ -331,8 +331,13 @@ module weight_flash_axi #( spi_clk <= ~spi_clk; if (!spi_clk) begin // going high = rising edge: sample MISO if (bit_cnt == 5'd0) begin - // 32nd bit received — word complete - saxi_rdata <= {rx_sr[30:0], spi_miso}; + // 32nd bit received — word complete. + // SPI sends LSB-byte first; left-shift accumulates + // byte0 in [31:24]. Byte-reverse to match AXI LE. + saxi_rdata <= {rx_sr[6:0], spi_miso, + rx_sr[14:7], + rx_sr[22:15], + rx_sr[30:23]}; saxi_rvalid <= 1'b1; end else begin rx_sr <= {rx_sr[30:0], spi_miso}; From 79b91f1991e087245f8053ff5d7bec652d32b535 Mon Sep 17 00:00:00 2001 From: shanegcstearns Date: Thu, 28 May 2026 12:52:58 -0700 Subject: [PATCH 2/3] new test firmware --- .../tests/test_top_feature_ml_30logits.c | 151 ++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 cocotb/firmware/tests/test_top_feature_ml_30logits.c diff --git a/cocotb/firmware/tests/test_top_feature_ml_30logits.c b/cocotb/firmware/tests/test_top_feature_ml_30logits.c new file mode 100644 index 0000000..a83d40e --- /dev/null +++ b/cocotb/firmware/tests/test_top_feature_ml_30logits.c @@ -0,0 +1,151 @@ +#include + +#define FEATURE_BASE 0x03004000u +#define ML_BASE 0x03003000u +#define WEIGHT_BASE 0x03006000u +#define TEST_BASE 0x0300F000u + +#define TEST_STATUS (*(volatile uint32_t*)(TEST_BASE + 0x00u)) +#define TEST_CODE (*(volatile uint32_t*)(TEST_BASE + 0x04u)) +#define ML_SCORE (*(volatile uint32_t*)(TEST_BASE + 0x20u)) + +#define FEATURE_STATUS (*(volatile uint32_t*)(FEATURE_BASE + 0x00u)) +#define FEATURE_TIME (*(volatile uint32_t*)(FEATURE_BASE + 0x04u)) +#define FEATURE_MOTION (*(volatile uint32_t*)(FEATURE_BASE + 0x08u)) +#define FEATURE_DHR (*(volatile uint32_t*)(FEATURE_BASE + 0x0Cu)) +#define FEATURE_RMSSD (*(volatile uint32_t*)(FEATURE_BASE + 0x10u)) + +#define FEATURE_VALID_MASK (1u << 0) + +#define ML_REG(off) (*(volatile uint32_t*)(ML_BASE + (off))) +#define WRAM_U32(off) (*(volatile uint32_t*)(WEIGHT_BASE + (off))) +#define WRAM_I16(off) (*(volatile int16_t*) (WEIGHT_BASE + (off))) + +#define VAR_BASE 128u +#define X_BASE 64u +#define LOGIT_BASE 5504u + +#define ALARM_BASE 0x03000000u +#define ALARM_CTRL (*(volatile uint32_t*)(ALARM_BASE + 0x00u)) + +#define TEST_PASS 0xCAFEBABEu +#define TEST_FAIL 0xDEADBEEFu + +#define OUT0_SENTINEL 0xA5A55A5Au + +#define N_LOGITS 10 + +#define WAKE_CLASS 1u +#define WAKE_STREAK_REQ 1u + +static void fail(uint32_t code) { + TEST_CODE = code; + TEST_STATUS = TEST_FAIL; + for (;;) {} +} + +static int wait_feature_valid(uint32_t timeout, uint32_t *status_out) { + while (timeout--) { + uint32_t s = FEATURE_STATUS; + if (s & FEATURE_VALID_MASK) { *status_out = s; return 1; } + } + return 0; +} + +static int wait_busy_value(uint32_t target, uint32_t timeout) { + while (timeout--) { + if ((ML_REG(0x14u) & 1u) == target) return 1; + } + return 0; +} + +void main(void) { + int i; + uint32_t feature_status; + int16_t time_feat, motion_feat, delta_hr_feat, rmssd_feat; + uint32_t out0_after; + int16_t log0, log1; + uint16_t conf; + uint32_t predicted_class; + uint32_t wake_streak; + + TEST_STATUS = 0u; + TEST_CODE = 0u; + ML_SCORE = 0u; + ALARM_CTRL = 0u; + log0 = 0; log1 = 0; predicted_class = 0u; wake_streak = 0u; + + /* One-time ML register setup */ + ML_REG(0x80u) = WEIGHT_BASE; + if (ML_REG(0x80u) != WEIGHT_BASE) fail(0xF201u); + ML_REG(0x88u) = LOGIT_BASE; + if (ML_REG(0x88u) != LOGIT_BASE) fail(0xF20Au); + ML_REG(0x8Cu) = X_BASE; + if (ML_REG(0x8Cu) != X_BASE) fail(0xF20Bu); + ML_REG(0x90u) = VAR_BASE; + if (ML_REG(0x90u) != VAR_BASE) fail(0xF20Cu); + ML_REG(0x28u) = 1u; + ML_REG(0x2Cu) = 1u; + + for (i = 0; i < N_LOGITS; i++) { + /* Wait for the feature engine to produce a new valid epoch */ + if (!wait_feature_valid(5000000u, &feature_status)) fail(0xF210u | (uint32_t)i); + + /* Latch features then consume (clear valid so next epoch can be produced) */ + time_feat = (int16_t)(FEATURE_TIME & 0xFFFFu); + motion_feat = (int16_t)(FEATURE_MOTION & 0xFFFFu); + delta_hr_feat = (int16_t)(FEATURE_DHR & 0xFFFFu); + rmssd_feat = (int16_t)(FEATURE_RMSSD & 0xFFFFu); + FEATURE_STATUS = 1u; + + /* Write feature vector into X buffer */ + WRAM_I16(X_BASE + 0u) = motion_feat; + WRAM_I16(X_BASE + 2u) = time_feat; + WRAM_I16(X_BASE + 4u) = delta_hr_feat; + WRAM_I16(X_BASE + 6u) = rmssd_feat; + + /* Poison output so we can detect mutation */ + WRAM_U32(LOGIT_BASE + 0u) = OUT0_SENTINEL; + + /* Start inference, wait BUSY 0->1->0, clear START */ + ML_REG(0x10u) = 1u; + if (!wait_busy_value(1u, 200000u)) fail(0xF220u | (uint32_t)i); + if (!wait_busy_value(0u, 2000000u)) fail(0xF230u | (uint32_t)i); + ML_REG(0x10u) = 0u; + + /* Read output and verify mutation */ + out0_after = WRAM_U32(LOGIT_BASE + 0u); + if (out0_after == OUT0_SENTINEL) fail(0xF240u | (uint32_t)i); + + log0 = (int16_t)(out0_after & 0xFFFFu); + log1 = (int16_t)((out0_after >> 16) & 0xFFFFu); + conf = (log1 > log0) ? (uint16_t)(log1 - log0) : (uint16_t)(log0 - log1); + predicted_class = (log1 > log0) ? 1u : 0u; + + /* Streak: 5 consecutive wake predictions trips the alarm */ + if (predicted_class == WAKE_CLASS) + wake_streak++; + else + wake_streak = 0u; + + if (wake_streak >= WAKE_STREAK_REQ) + ALARM_CTRL = 1u; + + ML_SCORE = conf; + + /* Signal testbench: packed logits in TEST_CODE, iteration number in TEST_STATUS */ + TEST_CODE = ((predicted_class & 1u) << 31) | + ((uint32_t)(uint16_t)log1 << 16) | + ((uint32_t)(uint16_t)log0); + TEST_STATUS = (uint32_t)(i + 1); + } + + /* Final: write last logit info + PASS */ + ML_SCORE = (uint32_t)(log1 > log0 ? (log1 - log0) : (log0 - log1)); + TEST_CODE = ((predicted_class & 1u) << 31) | + ((uint32_t)(uint16_t)log1 << 16) | + ((uint32_t)(uint16_t)log0); + TEST_STATUS = TEST_PASS; + + for (;;) {} +} From 31f2052d95ad6d6ea3c78239feca247a0e5451a9 Mon Sep 17 00:00:00 2001 From: rgovindan04 Date: Thu, 28 May 2026 13:06:39 -0700 Subject: [PATCH 3/3] I2C check using cocotb i2c extension --- Makefile | 4 + cocotb/chip_top_tb.py | 13 +- .../tb/sim_chip_top_gl_sensor_bridge_env.sv | 6 +- cocotb/sim/tb/test_chip_top_i2c_pads.py | 487 ++++++++++++++++++ requirements.txt | 3 + .../sensor_models/sensors/accelerometer.py | 19 +- scripts/sensor_models/sensors/ppg.py | 21 +- 7 files changed, 536 insertions(+), 17 deletions(-) create mode 100644 cocotb/sim/tb/test_chip_top_i2c_pads.py diff --git a/Makefile b/Makefile index e0f1292..d9f1214 100644 --- a/Makefile +++ b/Makefile @@ -152,6 +152,10 @@ sim-gl-sensor-bridge-full: ## Run long gate-level sensor I2C debug-feature refer cd cocotb; PYTHONPATH=$(MAKEFILE_DIR)/cocotb/sim/tb GL=1 GL_SENSOR_I2C_FULL_FEATURES=1 CHIP_TOPLEVEL=sim_chip_top_gl_sensor_bridge_env CHIP_NETLIST_TOP=$(TOP) FINAL_DIR=$(FINAL_DIR) PDK_ROOT=${PDK_ROOT} PDK=${PDK} SLOT=${SLOT} COCOTB_TEST_MODULE=test_chip_top_gl_sensor_bridge COCOTB_TEST_FILTER=test_chip_top_gl_sensor_bridge_debug_features_match_python_reference $(PYTHON) chip_top_tb.py .PHONY: sim-gl-sensor-bridge-full +sim-rtl-sensor-i2c-pads: ## Run RTL chip_top sensor I2C pad test with cocotbext-i2c + cd cocotb; PYTHONPATH=$(MAKEFILE_DIR)/cocotb/sim/tb CHIP_TOP_PAD_I2C=1 CHIP_TOPLEVEL=sim_chip_top_gl_sensor_bridge_env PDK_ROOT=${PDK_ROOT} PDK=${PDK} SLOT=${SLOT} COCOTB_TEST_MODULE=test_chip_top_i2c_pads COCOTB_TEST_FILTER=test_chip_top_i2c_pads_reach_cocotbext_sensor_models $(PYTHON) chip_top_tb.py +.PHONY: sim-rtl-sensor-i2c-pads + sim-view: ## View simulation waveforms in GTKWave gtkwave cocotb/sim_build/chip_top.fst diff --git a/cocotb/chip_top_tb.py b/cocotb/chip_top_tb.py index 73d8e21..c17e996 100644 --- a/cocotb/chip_top_tb.py +++ b/cocotb/chip_top_tb.py @@ -20,6 +20,7 @@ gl = os.getenv("GL", False) slot = os.getenv("SLOT", "1x1") test_module = os.getenv("COCOTB_TEST_MODULE", "chip_top_tb") +pad_i2c_rtl = os.getenv("CHIP_TOP_PAD_I2C", "0") == "1" hdl_toplevel = os.getenv("CHIP_TOPLEVEL", "chip_top_sim_wrap") @@ -411,8 +412,11 @@ def chip_top_runner(): proj_path = Path(__file__).resolve().parent sources = [] - # RTL builds use the SIM-only sensor bus; GL builds below intentionally do not. - defines = {f"SLOT_{slot.upper().replace('.', 'P')}": True, "SIM": True} + # Most RTL builds use the SIM-only sensor bus. Pad-level I2C RTL tests + # intentionally compile without SIM so i2c_master drives bidir_PAD[23:24]. + defines = {f"SLOT_{slot.upper().replace('.', 'P')}": True} + if not pad_i2c_rtl: + defines["SIM"] = True includes = [proj_path / "../src/"] if gl: @@ -450,7 +454,7 @@ def chip_top_runner(): defines = {"FUNCTIONAL": True, "functional": True, "USE_POWER_PINS": True} else: src_dir = proj_path / "../src" - pad_level = hdl_toplevel in {"chip_top", "chip_top_sim_wrap"} + pad_level = hdl_toplevel in {"chip_top", "chip_top_sim_wrap", "sim_chip_top_gl_sensor_bridge_env"} skip = {"dummy_top.sv", "soc_top.v"} if pad_level: skip.add("gf180mcu_fd_ip_sram__sram512x8m8wm1.v") @@ -466,9 +470,12 @@ def chip_top_runner(): # Sim wrapper is only needed when it is the selected HDL toplevel. if hdl_toplevel == "chip_top_sim_wrap": sources.append(proj_path / "chip_top_sim_wrap.sv") + elif hdl_toplevel == "sim_chip_top_gl_sensor_bridge_env": + sources.append(proj_path / "sim/tb/sim_chip_top_gl_sensor_bridge_env.sv") # Pad-level builds need GF180 IO models. Direct chip_core RTL DFT does not. if pad_level: + defines["USE_POWER_PINS"] = True sources += [ Path(pdk_root) / pdk / "libs.ref/gf180mcu_fd_io/verilog/gf180mcu_fd_io.v", Path(pdk_root) / pdk / "libs.ref/gf180mcu_fd_io/verilog/gf180mcu_ws_io.v", diff --git a/cocotb/sim/tb/sim_chip_top_gl_sensor_bridge_env.sv b/cocotb/sim/tb/sim_chip_top_gl_sensor_bridge_env.sv index e3b645e..3f5c5e1 100644 --- a/cocotb/sim/tb/sim_chip_top_gl_sensor_bridge_env.sv +++ b/cocotb/sim/tb/sim_chip_top_gl_sensor_bridge_env.sv @@ -18,6 +18,10 @@ module sim_chip_top_gl_sensor_bridge_env; wire sensor_scl_sample; wire sensor_sda_sample; logic sensor_sda_drive_low = 1'b0; + logic accel_sda_o = 1'b1; + logic ppg_sda_o = 1'b1; + logic accel_scl_o = 1'b1; + logic ppg_scl_o = 1'b1; wire [1:0] analog_PAD; wire VDD = 1'b1; @@ -34,7 +38,7 @@ module sim_chip_top_gl_sensor_bridge_env; generate for (i = 0; i < 40; i = i + 1) begin : tb_bidir_drive if (i == SENSOR_SDA_PAD) begin : sensor_sda_drive - assign bidir_PAD[i] = sensor_sda_drive_low ? 1'b0 : + assign bidir_PAD[i] = (sensor_sda_drive_low || !accel_sda_o || !ppg_sda_o) ? 1'b0 : (bidir_oe[i] ? bidir_drv[i] : 1'bz); end else begin : external_drive assign bidir_PAD[i] = bidir_oe[i] ? bidir_drv[i] : 1'bz; diff --git a/cocotb/sim/tb/test_chip_top_i2c_pads.py b/cocotb/sim/tb/test_chip_top_i2c_pads.py new file mode 100644 index 0000000..8e4a81b --- /dev/null +++ b/cocotb/sim/tb/test_chip_top_i2c_pads.py @@ -0,0 +1,487 @@ +"""Pad-level chip_top sensor-I2C smoke test using project Python sensor models. + +This test is for the real chip_top pad path, not the SIM-only sensor bus. +The Makefile target compiles chip_top RTL with CHIP_TOP_PAD_I2C=1, which keeps +`SIM` undefined so `src/i2c_master.sv` drives the physical sensor I2C pads: + + bidir_PAD[23] -> sensor SCL + bidir_PAD[24] -> sensor SDA + +The testbench wrapper exposes those pad nets as `sensor_scl_sample` and +`sensor_sda_sample`. This cocotb test attaches two open-drain I2C slave models +to the SDA pad via cocotbext-i2c-style `sda_o` signals: + + LIS2DW12 accelerometer at 7-bit address 0x19 + ADPD144RI PPG sensor at 7-bit address 0x64 + +The sensor payloads are generated by the repository's Python sensor models +(`scripts/sensor_models/sensors/accelerometer.py` and `ppg.py`) from small, +deterministic synthetic raw inputs. That means the test exercises: + + Python sensor model -> I2C register model -> chip_top pads -> + GF180 IO model -> top/i2c_master -> accel_reader/ppg_fifo_reader + +Pass criteria: + * sensor SCL toggles on bidir_PAD[23] + * the accelerometer init writes are observed over pads + * at least one accelerometer data burst is read over pads + * the PPG FIFO access-enable write is observed over pads + * at least one PPG FIFO burst is read over pads + +This intentionally does not prove full feature correctness or ML/logit behavior. +It proves that real chip_top sensor-pad I2C traffic can reach sensor-like models +and return model-generated accel/PPG samples into the RTL. +""" + +import logging +import os +import sys +import tempfile +from pathlib import Path + +import cocotb +from cocotb.clock import Clock +from cocotb.triggers import FallingEdge, ReadOnly, RisingEdge, Timer +from cocotbext.i2c import I2cDevice + + +SENSOR_SCL_PAD = 23 +SENSOR_SDA_PAD = 24 + +TEST_MODE_MSSD = 0x1 + +ACC_ADDR = 0x19 +PPG_ADDR = 0x64 +ACC_REG_CTRL1 = 0x20 +ACC_REG_RANGE = 0x23 +ACC_REG_STATUS = 0x27 +ACC_REG_OUT_X_L = 0x28 +PPG_REG_STATUS = 0x00 +PPG_REG_FIFO_THRESH = 0x06 +PPG_REG_FIFO_ACCESS_ENA = 0x5F +PPG_REG_FIFO_ACCESS = 0x60 + +ROOT = Path(__file__).resolve().parents[3] +SENSOR_MODEL_DIR = ROOT / "scripts" / "sensor_models" +if str(SENSOR_MODEL_DIR) not in sys.path: + sys.path.insert(0, str(SENSOR_MODEL_DIR)) + +import numpy as np +from sensors import accelerometer as accel_model +from sensors import ppg as ppg_model + + +async def _startup(dut): + """Put chip_top in sensor debug mode and release reset. + + TEST_MODE_MSSD enables the sensor/feature path used by the existing pad + bridge checks. The bidir drive signals are released except for SDA, where + this test's I2C devices may pull the line low through open-drain controls. + """ + + dut.input_drv.value = TEST_MODE_MSSD + dut.bidir_drv.value = 0 + dut.bidir_oe.value = 0 + dut.sensor_sda_drive_low.value = 0 + dut.accel_sda_o.value = 1 + dut.ppg_sda_o.value = 1 + dut.accel_scl_o.value = 1 + dut.ppg_scl_o.value = 1 + dut.rst_n_drv.value = 0 + cocotb.start_soon(Clock(dut.clk_drv, 40, "ns").start()) + await Timer(400, "ns") + dut.rst_n_drv.value = 1 + + +def _build_sensor_model_streams(): + """Generate deterministic digital streams with the project sensor models. + + The production model scripts normally write CSV files for SV simulations. + Here we call the same processing functions directly and keep the returned + arrays in memory, avoiding PhysioNet/network access during the cocotb run. + """ + + sample_count = int(os.getenv("I2C_PAD_SENSOR_MODEL_SAMPLES", "512")) + rng_seed = int(os.getenv("I2C_PAD_SENSOR_MODEL_SEED", "7")) + np.random.seed(rng_seed) + + accel_t = np.arange(sample_count, dtype=float) / 25.0 + raw_g = np.column_stack( + [ + 0.020 * np.sin(2 * np.pi * 0.25 * accel_t), + 0.015 * np.cos(2 * np.pi * 0.18 * accel_t), + 1.000 + 0.010 * np.sin(2 * np.pi * 0.11 * accel_t), + ] + ) + + bpm_t = np.arange(0.0, max(12.0, sample_count / 25.0 + 4.0), 1.0) + bpm = 64.0 + 3.0 * np.sin(2 * np.pi * bpm_t / 20.0) + + with tempfile.TemporaryDirectory(prefix="sensor_model_i2c_") as tmp: + tmp_path = Path(tmp) + accel_model.PLOT_DIR = str(tmp_path / "plots") + ppg_model.PLOT_DIR = str(tmp_path / "plots") + accel_counts = accel_model.process_accelerometer(raw_g, tmp_path) + red_counts, _ir_counts = ppg_model.process_ppg(bpm_t, bpm, tmp_path) + + return accel_counts.astype(np.int16), red_counts.astype(np.uint16) + + +class RegisterSensorDevice(I2cDevice): + """Base class for simple register-addressed I2C sensor devices. + + It subclasses cocotbext-i2c so the device owns/release-drives SDA through + the same open-drain style interface as cocotbext. The byte sampler is kept + tolerant of this chip_top pad simulation's edge ordering; the stock + cocotbext state machine was seeing false stop/repeated-start tokens while + the passive monitor still decoded valid address bytes. + """ + + def __init__(self, *args, addr, **kwargs): + self.addr = addr + self.reg_ptr = 0 + self._first_write = True + self._read_buf = [] + self.transactions = 0 + self.writes = 0 + self.reads = 0 + self.seen = False + self.observed_tokens = [] + self._pending_start = False + super().__init__(*args, **kwargs) + + def _scl_is_high(self): + return str(self.scl.value) == "1" + + def _sda_is_high(self): + return str(self.sda.value) == "1" + + async def _wait_start(self): + if self._pending_start: + self._pending_start = False + return + while True: + await FallingEdge(self.sda) + await ReadOnly() + if self._scl_is_high(): + return + + async def _recv_byte(self, record=True): + value = 0 + for bit in range(7, -1, -1): + await RisingEdge(self.scl) + await ReadOnly() + if self._sda_is_high(): + value |= 1 << bit + if record: + self.observed_tokens.append(value) + return value + + def handle_start(self): + self._first_write = True + self._read_buf = [] + + async def handle_write(self, data): + self.seen = True + if self._first_write: + self.reg_ptr = data + self._first_write = False + else: + self.writes += 1 + self.handle_reg_write(self.reg_ptr, data) + self.reg_ptr = (self.reg_ptr + 1) & 0xFF + + async def handle_read(self): + self.seen = True + if not self._read_buf: + self._read_buf = list(self.make_read_response(self.reg_ptr)) + data = self._read_buf.pop(0) if self._read_buf else 0 + if not self._read_buf: + self.reg_ptr = (self.reg_ptr + 1) & 0xFF + self.reads += 1 + return data + + def handle_stop(self): + self.transactions += 1 + self._first_write = True + self._read_buf = [] + + def handle_reg_write(self, reg, data): + pass + + def make_read_response(self, reg): + return [0] + + def is_data_write_reg(self, reg): + return False + + async def _send_ack(self): + await FallingEdge(self.scl) + self._set_sda(0) + await RisingEdge(self.scl) + await FallingEdge(self.scl) + self._set_sda(1) + + async def _send_byte(self, value): + for bit in range(7, -1, -1): + await FallingEdge(self.scl) + self._set_sda((value >> bit) & 1) + await RisingEdge(self.scl) + await FallingEdge(self.scl) + self._set_sda(1) + await RisingEdge(self.scl) + await ReadOnly() + master_nack = self._sda_is_high() + await FallingEdge(self.scl) + return not master_nack + + async def _send_read_response(self, reg): + response = list(self.make_read_response(reg)) + byte_index = 0 + while True: + byte = response[byte_index] if byte_index < len(response) else 0x00 + byte_index += 1 + if not await self._send_byte(byte): + return + + async def _run(self): + """Handle write-reg/read-data and write-reg/write-data transactions.""" + + self._set_sda(1) + while True: + await self._wait_start() + self.handle_start() + + addr_rw = await self._recv_byte() + addr = (addr_rw >> 1) & 0x7F + read = bool(addr_rw & 1) + if addr != self.addr: + continue + + self.seen = True + await self._send_ack() + + if read: + await self._send_read_response(self.reg_ptr) + self.transactions += 1 + continue + + reg = await self._recv_byte(record=False) + await self.handle_write(reg) + await self._send_ack() + + if self.is_data_write_reg(reg): + data = await self._recv_byte(record=False) + await self.handle_write(data) + await self._send_ack() + self.transactions += 1 + continue + + await self._wait_start() + self.handle_start() + addr_rw = await self._recv_byte() + read_addr = (addr_rw >> 1) & 0x7F + if read_addr != self.addr or not (addr_rw & 1): + raise AssertionError( + f"unexpected repeated-start address 0x{addr_rw:02x} for device 0x{self.addr:02x}" + ) + await self._send_ack() + await self._send_read_response(reg) + self.transactions += 1 + + +class PassiveI2CAddressMonitor: + """Non-driving monitor used to debug what the chip puts on the sensor pads. + + On timeout, the first decoded address bytes help distinguish pad activity + problems from missing/incorrect sensor responses. Expected early bytes are + usually 0x32 for ACC_ADDR write and 0xc8 for PPG_ADDR write. + """ + + def __init__(self, dut): + self.dut = dut + self.addr_bytes = [] + + def _scl_is_high(self): + return str(self.dut.sensor_scl_sample.value) == "1" + + def _sda_is_high(self): + return str(self.dut.sensor_sda_sample.value) == "1" + + async def _wait_start(self): + while True: + await FallingEdge(self.dut.sensor_sda_sample) + await ReadOnly() + if self._scl_is_high(): + return + + async def _recv_byte(self): + value = 0 + for bit in range(7, -1, -1): + await RisingEdge(self.dut.sensor_scl_sample) + await ReadOnly() + if self._sda_is_high(): + value |= 1 << bit + return value + + async def run(self): + while True: + await self._wait_start() + self.addr_bytes.append(await self._recv_byte()) + + +class Lis2dw12Device(RegisterSensorDevice): + """LIS2DW12-like register behavior backed by accelerometer.py samples.""" + + def __init__(self, *args, samples, **kwargs): + self.samples = samples + self.index = 0 + self.ctrl_writes = 0 + self.range_writes = 0 + self.sample_reads = 0 + super().__init__(*args, addr=ACC_ADDR, **kwargs) + + def handle_reg_write(self, reg, data): + if reg == ACC_REG_CTRL1: + self.ctrl_writes += 1 + elif reg == ACC_REG_RANGE: + self.range_writes += 1 + + def is_data_write_reg(self, reg): + return reg in (ACC_REG_CTRL1, ACC_REG_RANGE) + + def make_read_response(self, reg): + # STATUS reports data-ready. OUT_X_L returns XL/XH/YL/YH/ZL/ZH, matching + # the existing SV sensor model's left-justified 14-bit sample format. + if reg == ACC_REG_STATUS: + return [0x01] + if reg == ACC_REG_OUT_X_L: + if self.index >= len(self.samples): + self.index = 0 + sample = self.samples[self.index] + self.index += 1 + self.sample_reads += 1 + ax = (int(sample[0]) << 2) & 0xFFFF + ay = (int(sample[1]) << 2) & 0xFFFF + az = (int(sample[2]) << 2) & 0xFFFF + return [ax & 0xFF, ax >> 8, ay & 0xFF, ay >> 8, az & 0xFF, az >> 8] + return [0x00] + + +class Adpd144riDevice(RegisterSensorDevice): + """ADPD144RI-like register behavior backed by ppg.py red-channel samples.""" + + def __init__(self, *args, samples, **kwargs): + self.samples = samples + self.index = 0 + self.fifo_en_writes = 0 + self.fifo_reads = 0 + self._last_sample = 0 + super().__init__(*args, addr=PPG_ADDR, **kwargs) + + def handle_reg_write(self, reg, data): + if reg == PPG_REG_FIFO_ACCESS_ENA: + self.fifo_en_writes += 1 + + def is_data_write_reg(self, reg): + return reg == PPG_REG_FIFO_ACCESS_ENA + + def _next_fifo_byte(self, byte_index): + if byte_index % 2 == 0: + if self.index >= len(self.samples): + self.index = 0 + sample = int(self.samples[self.index]) & 0x3FFF + self.index += 1 + self._last_sample = sample + return sample & 0xFF + return (self._last_sample >> 8) & 0xFF + + def make_read_response(self, reg): + # The FIFO path streams 16-bit little-endian words from the generated + # PPG red-channel counts, matching what ppg_fifo_reader consumes. + if reg == PPG_REG_STATUS: + return [0x00, 0x10] + if reg == PPG_REG_FIFO_THRESH: + return [0x00, 0x02] + if reg == PPG_REG_FIFO_ACCESS: + self.fifo_reads += 1 + return [self._next_fifo_byte(i) for i in range(16)] + return [0x00] + + +@cocotb.test() +async def test_chip_top_i2c_pads_reach_cocotbext_sensor_models(dut): + """Drive chip_top sensor pads and require both sensor models to be reached. + + The DUT drives SCL/SDA through the real pad-level path. This test only + supplies external open-drain sensor responses; it does not poke internal + sim bus signals or force feature outputs. + """ + + log = logging.getLogger("chip_top_i2c_pads") + accel_samples, ppg_samples = _build_sensor_model_streams() + await _startup(dut) + + accel = Lis2dw12Device( + sda=dut.sensor_sda_sample, + sda_o=dut.accel_sda_o, + scl=dut.sensor_scl_sample, + scl_o=dut.accel_scl_o, + samples=accel_samples, + ) + ppg = Adpd144riDevice( + sda=dut.sensor_sda_sample, + sda_o=dut.ppg_sda_o, + scl=dut.sensor_scl_sample, + scl_o=dut.ppg_scl_o, + samples=ppg_samples, + ) + accel.log.setLevel(logging.WARNING) + ppg.log.setLevel(logging.WARNING) + + # This monitor never drives SDA. It is only here to make failures actionable + # by showing which 8-bit address phases were visible on the physical pads. + addr_monitor = PassiveI2CAddressMonitor(dut) + cocotb.start_soon(addr_monitor.run()) + + saw_scl_toggle = False + last_scl = str(dut.sensor_scl_sample.value) + max_cycles = int(os.getenv("I2C_PAD_MAX_CYCLES", "250000")) + + for cycle in range(max_cycles): + await RisingEdge(dut.clk_drv) + await ReadOnly() + scl = str(dut.sensor_scl_sample.value) + saw_scl_toggle = saw_scl_toggle or (scl != last_scl) + last_scl = scl + + if accel.sample_reads > 0 and ppg.fifo_reads > 0: + log.info( + "pad I2C reached both models at cycle %d: accel_samples=%d ppg_fifo_reads=%d ppg_samples=%d", + cycle, + accel.sample_reads, + ppg.fifo_reads, + ppg.index, + ) + break + else: + # The extra counters separate "no pad clocks", "wrong address", + # "init never completed", and "data FIFO never reached" failures. + raise AssertionError( + "timed out waiting for cocotbext I2C sensor traffic; " + f"scl_toggle={saw_scl_toggle}, " + f"accel_seen={accel.seen}, accel_samples={accel.sample_reads}, " + f"accel_ctrl_writes={accel.ctrl_writes}, accel_range_writes={accel.range_writes}, " + f"ppg_seen={ppg.seen}, ppg_fifo_en_writes={ppg.fifo_en_writes}, " + f"ppg_fifo_reads={ppg.fifo_reads}, " + f"accel_tokens={accel.observed_tokens[:16]}, " + f"ppg_tokens={ppg.observed_tokens[:16]}, " + f"addr_bytes={[hex(b) for b in addr_monitor.addr_bytes[:16]]}" + ) + + assert saw_scl_toggle, "sensor SCL pad did not toggle" + assert accel.ctrl_writes > 0, "accelerometer CTRL1 init write was not observed" + assert accel.range_writes > 0, "accelerometer range init write was not observed" + assert accel.sample_reads > 0, "accelerometer data was not read through pads" + assert ppg.fifo_en_writes > 0, "PPG FIFO access enable write was not observed" + assert ppg.fifo_reads > 0, "PPG FIFO data was not read through pads" diff --git a/requirements.txt b/requirements.txt index 6e3f785..bd867dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,9 @@ cocotb>=2.0,<3.0 cocotb-tools>=0.1 cocotbext-axi>=0.1 +cocotbext-i2c>=0.1 +numpy>=1.24 +scipy>=1.10 pytest>=8.0 Pillow>=10.0 docopt>=0.6 diff --git a/scripts/sensor_models/sensors/accelerometer.py b/scripts/sensor_models/sensors/accelerometer.py index 32ad763..d667ad6 100644 --- a/scripts/sensor_models/sensors/accelerometer.py +++ b/scripts/sensor_models/sensors/accelerometer.py @@ -28,10 +28,14 @@ All sensor model parameters can be overridden at call time for flexibility. """ -import numpy as np -import os -import matplotlib.pyplot as plt -from pathlib import Path +import numpy as np +import os +from pathlib import Path + +try: + import matplotlib.pyplot as plt +except ImportError: + plt = None # Reference defaults — documented here for traceability but not used directly. # Pass overrides to process_accelerometer() as keyword arguments. @@ -68,8 +72,11 @@ def _adc_quantize_lis2dw12(accel_g, sensitivity_mg): # Validation plot -def _save_validation_plot(raw_g, digital, odr_hz, sensitivity_mg): - duration_s = 10 +def _save_validation_plot(raw_g, digital, odr_hz, sensitivity_mg): + if plt is None: + return + + duration_s = 10 n = duration_s * odr_hz raw_plot = raw_g[:n, 0] diff --git a/scripts/sensor_models/sensors/ppg.py b/scripts/sensor_models/sensors/ppg.py index 7c5d713..06ddea3 100644 --- a/scripts/sensor_models/sensors/ppg.py +++ b/scripts/sensor_models/sensors/ppg.py @@ -28,11 +28,15 @@ signal levels for wrist-worn PPG. All parameters can be overridden at call time. """ -import numpy as np -import os -import matplotlib.pyplot as plt -from pathlib import Path -from scipy.interpolate import interp1d +import numpy as np +import os +from pathlib import Path +from scipy.interpolate import interp1d + +try: + import matplotlib.pyplot as plt +except ImportError: + plt = None # Reference defaults -- documented here for traceability but not used directly. # Pass overrides to process_ppg() as keyword arguments. @@ -90,8 +94,11 @@ def _apply_adpd144ri_model(ppg_wave, dc_red, dc_ir, ac_amplitude, gain_error, of # Validation plot -def _save_validation_plot(t_hr, red, ir, bpm_hr, odr_hz): - duration_s = 5 +def _save_validation_plot(t_hr, red, ir, bpm_hr, odr_hz): + if plt is None: + return + + duration_s = 5 n = duration_s * odr_hz t = t_hr[:n]