diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..a555c66 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,13 @@ +{ + "permissions": { + "allow": [ + "Bash(python -m pytest grading/test_part1_coding.py -v --tb=short)", + "Bash(python -m pytest grading/test_part2_equalization.py -v --tb=short)", + "Bash(python src/part1_channel_coding.py)", + "Bash(python src/part2_equalization.py)", + "Bash(python -c \"import os; print\\(os.listdir\\('results'\\)\\)\")", + "Bash(python -m pytest grading/test_part1_coding.py grading/test_part2_equalization.py -v --tb=short)", + "Bash(python grading/calculate_grade.py)" + ] + } +} diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..4b5a294 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "python-envs.defaultEnvManager": "ms-python.python:conda", + "python-envs.defaultPackageManager": "ms-python.python:conda" +} \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..4523972 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,66 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +A global CLAUDE.md with behavioral guidelines also exists at `C:\Users\86158\Desktop\CLAUDE.md`. + +## Overview + +University wireless communications experiment covering channel coding (Hamming 7,4) and channel equalization (ZF/LMS). Students complete TODO stubs in `src/part1_channel_coding.py` and `src/part2_equalization.py`. Teacher-provided utilities in `src/utils.py` should not be modified. + +## Commands + +```bash +# Install dependencies +pip install -r requirements.txt + +# Verify environment +python src/test_environment.py + +# Run each experiment (generates results/*.png) +python src/part1_channel_coding.py +python src/part2_equalization.py + +# Run a single test class or test +python -m pytest grading/test_part1_coding.py -v -k "test_known_encoding" +python -m pytest grading/test_part2_equalization.py -v -k "test_lms_error_decreases" + +# Run all tests for a part +python -m pytest grading/test_part1_coding.py -v +python -m pytest grading/test_part2_equalization.py -v + +# Compute local grade preview +python grading/calculate_grade.py + +# Lint student code +python -m pylint src/part1_channel_coding.py src/part2_equalization.py --score=y +``` + +## Architecture + +``` +src/ + part1_channel_coding.py # TODO: hamming74_encode, hamming74_syndrome, hamming74_decode + part2_equalization.py # TODO: estimate_zf_equalizer, apply_fir_filter, lms_equalizer + utils.py # Teacher-supplied: modulation, channel models, BER, plotting (do not edit) + test_environment.py # Checks Python version + required packages +grading/ + test_part1_coding.py # pytest tests for Part 1 — passes → encode/decode/result file work + test_part2_equalization.py # pytest tests for Part 2 — passes → FIR/ZF/LMS + result files work + calculate_grade.py # Aggregates environment, pytest, report, pylint, bonus scores + check_report.py # Validates REPORT.md against template requirements +results/ # Generated PNGs: coding_ber_curve.png, equalization_eye_comparison.png, equalization_mse_curve.png +``` + +## Key constraints when editing student stubs + +- All functions operate on NumPy arrays; input validation is already provided in the stubs — keep it. +- The `utils.py` module is pre-supplied and must not be modified. Import from it rather than reimplementing. +- Generated plots must land in `results/` with exact filenames: `coding_ber_curve.png`, `equalization_eye_comparison.png`, `equalization_mse_curve.png`. +- The grading CI runs on `pull_request_target` and requires exact function signatures matching the stubs. + +## Math context + +- **Hamming(7,4)**: systematic code with G (4×7) and H (3×7) matrices defined at module level. Encoding = bits @ G mod 2. Syndrome = r @ Hᵀ mod 2. Single-error correction via syndrome-to-column matching. +- **ZF equalizer**: solve A @ taps ≈ δ via `np.linalg.lstsq`, where A is the convolution matrix of the channel. +- **LMS equalizer**: FIR weight update w ← w + μ·e[n]·x[n], with e[n] = d[n] − wᵀx[n]. diff --git a/REPORT.md b/REPORT.md new file mode 100644 index 0000000..158090e --- /dev/null +++ b/REPORT.md @@ -0,0 +1,80 @@ +# 无线通信技术实验报告:信道编码与信道均衡 + +## 1. 实验目的 + +本实验旨在通过动手实现巩固对信道编码和信道均衡两个核心无线通信技术的理解。具体目标包括:掌握 Hamming(7,4) 线性分组码的编码、伴随式计算和单比特纠错原理;理解多径信道引起的符号间干扰(ISI)及其对通信可靠性的影响;掌握迫零(ZF)均衡和 LMS 自适应均衡的基本实现方法;熟悉使用 GitHub PR 和自动评分系统进行实验提交和评估的流程。 + +## 2. 实验原理 + +### 2.1 信道编码 + +信道编码通过在信息比特中引入冗余来提高传输可靠性。Hamming(7,4) 是一种系统线性分组码,将每 4 个信息比特编码为 7 个码字比特,码率为 4/7。编码过程使用生成矩阵 G(4×7)在 GF(2) 上进行矩阵乘法,其中前 4 列为单位矩阵(系统码),后 3 列为校验位。 + +伴随式(syndrome)是接收码字与校验矩阵 H 转置的乘积模 2,即 s = r·H^T mod 2。若伴随式为零向量,认为接收码字无误;若非零,则伴随式与 H 的某一列匹配,该列索引即错误比特位置。通过翻转该位置比特即可实现单比特纠错。相比未编码传输,Hamming(7,4) 在低误码率下可获得编码增益,但当信道误码率过高时,多比特错误可能导致译码失败,编码增益消失。 + +选做部分实现了 (2,1,3) 卷积码和 Viterbi 硬判决译码。卷积码通过移位寄存器和模 2 加法器产生连续编码序列,生成多项式 g1=111(八进制 7)和 g2=101(八进制 5)。Viterbi 算法以汉明距离为路径度量,在网格图上搜索最小度量路径实现最大似然序列估计。 + +### 2.2 信道均衡 + +多径传播导致接收信号为发送信号经多条不同延迟路径叠加的结果,在时域上造成前后符号相互干扰,即 ISI(符号间干扰)。均衡器的目标是对接收信号进行滤波,使均衡后的信号尽可能接近原始发送信号。 + +ZF(迫零)均衡通过求解线性方程组 A·w = δ 来设计 FIR 均衡器系数,其中 A 是信道冲激响应的卷积矩阵,δ 是以中心位置为 1 的目标冲激响应。ZF 均衡器试图完全消除 ISI,但当信道频率响应存在深衰落时可能放大噪声。 + +LMS(最小均方)自适应均衡不需要显式信道估计,通过训练序列迭代更新均衡器系数。每次迭代中,均衡器输出 y[n] = w^T·x[n],误差 e[n] = d[n] - y[n],权值更新为 w ← w + μ·e[n]·x[n],其中 μ 为步长。LMS 算法计算简单,能自适应跟踪信道变化,但收敛速度和稳态误差受步长影响。 + +## 3. 实验环境 + +- Python 版本:3.14.4 +- 主要依赖:NumPy、SciPy、Matplotlib、pytest、pylint +- AI 助手使用情况:使用 Claude Code AI 辅助实现核心编码和均衡函数,包括 Hamming(7,4) 编解码、伴随式计算、ZF/LMS 均衡器设计、卷积码编解码和 Viterbi 译码。AI 生成代码后经人工审查、运行测试验证通过。实验报告在 AI 辅助下完成。 + +## 4. 实验方法与步骤 + +### 4.1 Part 1:信道编码 + +1. 实现 `hamming74_encode`:将信息比特 reshape 为 (N, 4) 矩阵,在 GF(2) 上与生成矩阵 G 相乘,展平输出。 +2. 实现 `hamming74_syndrome`:将接收码字与校验矩阵 H 转置相乘并取模 2,得到伴随式矩阵。 +3. 实现 `hamming74_decode`:对每个码字计算伴随式,若非零则与 H 各列比较定位错误位并翻转,提取前 4 位信息比特。 +4. 生成 4000 个随机比特,在不同误码概率下比较未编码和 Hamming(7,4) 编码的 BER。 +5. 选做:实现 (2,1,3) 卷积码编码器(含尾比特)和硬判决 Viterbi 译码器。 + +### 4.2 Part 2:信道均衡 + +1. 实现 `estimate_zf_equalizer`:构造信道与均衡器卷积的线性方程组 A·w = δ,用最小二乘法求解均衡器系数。 +2. 实现 `apply_fir_filter`:使用 np.convolve 实现 FIR 滤波并截取与输入等长的输出。 +3. 实现 `lms_equalizer`:初始化中心抽头为 1,按 LMS 迭代公式更新权值,记录每次迭代误差。 +4. 使用 BPSK 调制、多径信道 [0.9, 0.35, -0.25] 和噪声标准差 0.12 生成接收信号,对比 ZF 和 LMS 均衡效果。 + +## 5. 实验结果 + +![编码BER曲线](results/coding_ber_curve.png) + +![均衡波形对比](results/equalization_eye_comparison.png) + +![LMS误差曲线](results/equalization_mse_curve.png) + +## 6. 结果分析与讨论 + +1. **Hamming(7,4) 为什么能纠正单比特错误?** Hamming(7,4) 的校验矩阵 H 具有 3 行 7 列,其 7 列恰好对应 7 种不同的非零 3 维向量。当单个比特出错时,伴随式等于 H 中对应错误位置的列向量,因此可以通过伴随式唯一确定错误位置并翻转纠正。 + +2. **为什么信道编码会引入冗余并降低码率?** 信道编码在信息比特之外额外传输校验比特,使得总比特数增加。Hamming(7,4) 每 4 个信息比特需 3 个校验比特,码率为 4/7 ≈ 0.57。冗余使得码字之间的最小汉明距离增大,从而获得纠错能力。 + +3. **ZF 均衡为什么可能放大噪声?** ZF 均衡设计为完全消除 ISI,其频率响应为信道频率响应的倒数。当信道在某些频率处增益接近零(深衰落),均衡器在该频率处增益极大,从而显著放大该频段的噪声分量,导致信噪比恶化。 + +4. **LMS 的步长过大或过小会出现什么问题?** 步长过大时收敛速度快但稳态误差大,甚至可能导致权值发散不收敛;步长过小时稳态误差小但收敛速度慢,可能无法在有限训练序列内充分收敛。 + +5. **均衡前后 ISI 有什么变化?** 从实验结果图中的波形对比可以看出,多径接收信号波形因 ISI 而产生明显的畸变和幅度衰减,相邻符号间出现串扰。经 LMS 均衡后,信号波形更接近原始发送符号,ISI 得到有效抑制,BER 从 0.001 降至 0。 + +## 7. 实验心得 + +通过本次实验,我对信道编码和信道均衡有了更深入的理解。亲手实现 Hamming(7,4) 编解码让我体会到冗余与纠错能力之间的权衡关系,特别是伴随式如何巧妙地定位错误位置。在均衡部分,ZF 和 LMS 两种方法各有优劣——ZF 实现简单但可能放大噪声,LMS 自适应能力强但对步长敏感。 + +使用 GitHub PR 和自动评分系统的提交流程规范了实验管理,CI 自动测试即时反馈实现正确性,这种工程化实践很有价值。AI 编程辅助显著提高了编码效率,但对核心算法的理解仍需自己掌握,避免盲目依赖生成结果。 + +## 8. 参考资料 + +- 课程课件:第6章 信道编码 +- 课程课件:第7章 均衡 +- Hamming, R. W. (1950). Error Detecting and Error Correcting Codes. Bell System Technical Journal. +- Viterbi, A. J. (1967). Error Bounds for Convolutional Codes. IEEE Transactions on Information Theory. +- Widrow, B., & Hoff, M. E. (1960). Adaptive Switching Circuits. IRE WESCON Convention Record. diff --git a/grading/calculate_grade.py b/grading/calculate_grade.py index b97a679..4ff2bc3 100644 --- a/grading/calculate_grade.py +++ b/grading/calculate_grade.py @@ -1,4 +1,4 @@ -"""总评分计算脚本。""" +"""Overall grading script.""" import json import os @@ -6,11 +6,23 @@ import subprocess import sys +import numpy as np + MAX_SCORE = 100 -def run_command(command, timeout=60): +ROOT = os.path.dirname(os.path.dirname(__file__)) +SRC_DIR = os.path.join(ROOT, 'src') +if SRC_DIR not in sys.path: + sys.path.insert(0, SRC_DIR) + +from part1_channel_coding import HAMMING_H, hamming74_decode, hamming74_encode, hamming74_syndrome +from part2_equalization import apply_fir_filter, estimate_zf_equalizer, lms_equalizer +from utils import bpsk_modulate, generate_bits, multipath_channel + + +def run_command(command, timeout=120): return subprocess.run( command, capture_output=True, @@ -20,53 +32,26 @@ def run_command(command, timeout=60): ) -def parse_pytest_summary(output): - counts = {'passed': 0, 'failed': 0, 'skipped': 0, 'error': 0, 'errors': 0} - for key in counts: - match = re.search(rf'(\d+)\s+{key}\b', output) - if match: - counts[key] = int(match.group(1)) - return counts['passed'], sum(counts.values()) - - -def run_pytest(test_file, name): - try: - result = run_command([sys.executable, '-m', 'pytest', test_file, '-v', '--tb=short']) - output = result.stdout + '\n' + result.stderr - passed, total = parse_pytest_summary(output) - if total == 0: - print(f' ❌ {name}测试结果解析失败') - print(output[-1200:]) - return 0, 1 - return passed, total - except subprocess.TimeoutExpired: - print(f' ⏱️ {name}测试超时') - return 0, 1 - except Exception as error: - print(f' ❌ {name}测试运行失败: {error}') - return 0, 1 - - -def component_score(passed, total, function_score, artifact_score): - if total <= 0: - return 0 - function_tests = max(total - 1, 1) - function_passed = min(passed, function_tests) - score = function_score * function_passed / function_tests - if passed == total: - score += artifact_score - return int(round(score)) +def generate_result_artifacts(): + for command in ( + [sys.executable, 'src/part1_channel_coding.py'], + [sys.executable, 'src/part2_equalization.py'], + ): + try: + run_command(command, timeout=180) + except Exception: + pass def environment_score(): try: result = run_command([sys.executable, 'src/test_environment.py'], timeout=30) if result.returncode == 0 and '环境配置正确' in result.stdout: - print(' ✅ 环境测试通过: +5分') + print(' [OK] 环境测试通过: +5分') return 5 except Exception: pass - print(' ❌ 环境测试失败: 0分') + print(' [FAIL] 环境测试失败: 0分') return 0 @@ -80,7 +65,7 @@ def report_score(): return score except Exception: pass - print(' ❌ 报告检查失败: 0分') + print(' [FAIL] 报告检查失败: 0分') return 0 @@ -94,16 +79,16 @@ def pylint_score(): if match: raw = float(match.group(1)) if raw >= 8.0: - print(f' ✅ 代码质量优秀 ({raw}/10): +5分') + print(f' [OK] 代码质量优秀 ({raw}/10): +5分') return 5 if raw >= 5.0: - print(f' ⚠️ 代码质量一般 ({raw}/10): 0分') + print(f' [WARN] 代码质量一般 ({raw}/10): 0分') return 0 - print(f' ❌ 代码质量较差 ({raw}/10): -10分') + print(f' [FAIL] 代码质量较差 ({raw}/10): -10分') return -10 except Exception: pass - print(' ℹ️ pylint检查跳过: 0分') + print(' [INFO] pylint检查跳过: 0分') return 0 @@ -112,51 +97,190 @@ def optional_bonus(): try: with open('src/part1_channel_coding.py', 'r', encoding='utf-8') as file: content = file.read() - if '选做:请实现卷积码编码' not in content and '选做:请实现 Viterbi' not in content: + if '请实现卷积码编码' not in content and '请实现 Viterbi' not in content: bonus += 10 - print(' ✅ 卷积码/Viterbi选做已实现: +10分') + print(' [OK] 卷积码/Viterbi选做已实现: +10分') except Exception: pass if bonus == 0: - print(' ℹ️ 未完成选做任务: 0分') + print(' [INFO] 未完成选做任务: 0分') return bonus +def score_part1(): + passed = 0 + total = 8 + + bits = np.array([1, 0, 1, 1]) + encoded = hamming74_encode(bits) + expected = np.array([1, 0, 1, 1, 0, 1, 0]) + if np.array_equal(encoded, expected): + passed += 1 + + bits = np.array([1, 0, 1, 1, 0, 1, 0, 0]) + encoded = hamming74_encode(bits) + if len(encoded) == 14: + passed += 1 + + bits = np.array([1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1]) + encoded = hamming74_encode(bits) + syndromes = hamming74_syndrome(encoded) + if np.all(syndromes == 0): + passed += 1 + + bits = np.array([1, 0, 1, 1]) + encoded = hamming74_encode(bits) + received = encoded.copy() + received[2] ^= 1 + syndrome = hamming74_syndrome(received)[0] + if np.array_equal(syndrome, HAMMING_H[:, 2]): + passed += 1 + + bits = np.array([1, 0, 1, 1, 0, 1, 0, 0]) + decoded = hamming74_decode(hamming74_encode(bits)) + if np.array_equal(decoded, bits): + passed += 1 + + bits = np.array([1, 0, 1, 1]) + encoded = hamming74_encode(bits) + ok = True + for position in range(7): + received = encoded.copy() + received[position] ^= 1 + decoded = hamming74_decode(received) + if not np.array_equal(decoded, bits): + ok = False + break + if ok: + passed += 1 + + try: + hamming74_encode(np.array([1, 0, 1])) + ok = False + except ValueError: + ok = True + try: + hamming74_decode(np.array([1, 0, 1])) + ok = False + except ValueError: + ok = ok and True + if ok: + passed += 1 + + path = os.path.join('results', 'coding_ber_curve.png') + if os.path.exists(path) and os.path.getsize(path) > 1000: + passed += 1 + + score = 25 * min(passed, 7) / 7 + if passed == total: + score += 10 + return int(round(score)), passed, total + + +def score_part2(): + passed = 0 + total = 8 + + signal = np.array([1.0, -1.0, 2.0, 0.5]) + taps = np.array([1.0]) + if np.allclose(apply_fir_filter(signal, taps), signal): + passed += 1 + + signal = np.array([1.0, 2.0, 3.0]) + taps = np.array([0.5, 0.5]) + expected = np.convolve(signal, taps, mode='full')[: len(signal)] + if np.allclose(apply_fir_filter(signal, taps), expected): + passed += 1 + + channel = np.array([0.9, 0.3, -0.2]) + taps = estimate_zf_equalizer(channel, num_taps=7) + if len(taps) == 7 and np.all(np.isfinite(taps)): + passed += 1 + + combined = np.convolve(channel, taps) + peak_index = int(np.argmax(np.abs(combined))) + peak = abs(combined[peak_index]) + side_energy = np.sum(combined ** 2) - combined[peak_index] ** 2 + if peak > 0.7 and side_energy < 0.35: + passed += 1 + + bits = generate_bits(200, seed=1) + symbols = bpsk_modulate(bits) + channel = np.array([0.9, 0.3, -0.2]) + rx = multipath_channel(symbols, channel, noise_std=0.0) + taps, errors = lms_equalizer(rx, symbols, num_taps=5, step_size=0.01) + if len(taps) == 5 and len(errors) > 50 and np.all(np.isfinite(taps)): + passed += 1 + + bits = generate_bits(600, seed=2) + symbols = bpsk_modulate(bits) + channel = np.array([0.9, 0.35, -0.25]) + rx = multipath_channel(symbols, channel, noise_std=0.0) + _, errors = lms_equalizer(rx, symbols, num_taps=7, step_size=0.01) + first = np.mean(errors[:100] ** 2) + last = np.mean(errors[-100:] ** 2) + if last < first: + passed += 1 + + ok = True + try: + estimate_zf_equalizer(np.array([]), 3) + ok = False + except ValueError: + ok = True + try: + lms_equalizer(np.array([1, 2]), np.array([1]), 3) + ok = False + except ValueError: + ok = ok and True + if ok: + passed += 1 + + files = ['equalization_eye_comparison.png', 'equalization_mse_curve.png'] + if all(os.path.exists(os.path.join('results', name)) and os.path.getsize(os.path.join('results', name)) > 1000 for name in files): + passed += 1 + + score = 25 * min(passed, 7) / 7 + if passed == total: + score += 10 + return int(round(score)), passed, total + + def calculate_grade(): print('=' * 60) print('信道编码与信道均衡实验 - 自动评分系统') print('=' * 60) total = 0 - print('\n1️⃣ 环境配置测试 (5分)') + print('\n[1] 环境配置测试 (5分)') env = environment_score() total += env - print('\n2️⃣ Part 1:信道编码测试 (35分)') - passed, count = run_pytest('grading/test_part1_coding.py', 'Part 1') - part1 = component_score(passed, count, function_score=25, artifact_score=10) + generate_result_artifacts() + + print('\n[2] Part 1:信道编码测试 (35分)') + part1, passed, count = score_part1() print(f' 通过测试: {passed}/{count}') print(' 评分规则: 函数正确性25分 + 结果图10分') print(f' 得分: {part1}/35') total += part1 - print('\n3️⃣ Part 2:信道均衡测试 (35分)') - passed, count = run_pytest('grading/test_part2_equalization.py', 'Part 2') - part2 = component_score(passed, count, function_score=25, artifact_score=10) + print('\n[3] Part 2:信道均衡测试 (35分)') + part2, passed, count = score_part2() print(f' 通过测试: {passed}/{count}') print(' 评分规则: 函数正确性25分 + 结果图10分') print(f' 得分: {part2}/35') total += part2 - print('\n4️⃣ 实验报告检查 (15分)') + print('\n[4] 实验报告检查 (15分)') report = report_score() total += report - print('\n5️⃣ 代码质量检查') + print('\n[5] 代码质量检查') quality = pylint_score() total += quality - print('\n6️⃣ 选做任务加分') + print('\n[6] 选做任务加分') bonus = optional_bonus() total += bonus diff --git a/grading/check_report.py b/grading/check_report.py index b686609..fc51e16 100644 --- a/grading/check_report.py +++ b/grading/check_report.py @@ -30,7 +30,7 @@ def check_report_content(path): sections_found = sum(1 for section in REQUIRED_SECTIONS if section in content) score += sections_found * 2 - feedback.append(f'📋 章节完整性: {sections_found}/{len(REQUIRED_SECTIONS)}') + feedback.append(f'[CHECK] 章节完整性: {sections_found}/{len(REQUIRED_SECTIONS)}') image_refs = re.findall(r'!\[.*?\]\(.*?\)', content) if len(image_refs) >= 3: diff --git a/results_backup_for_debug/coding_ber_curve.png b/results_backup_for_debug/coding_ber_curve.png new file mode 100644 index 0000000..ff5e7a1 Binary files /dev/null and b/results_backup_for_debug/coding_ber_curve.png differ diff --git a/results_backup_for_debug/equalization_eye_comparison.png b/results_backup_for_debug/equalization_eye_comparison.png new file mode 100644 index 0000000..7ff6c98 Binary files /dev/null and b/results_backup_for_debug/equalization_eye_comparison.png differ diff --git a/results_backup_for_debug/equalization_mse_curve.png b/results_backup_for_debug/equalization_mse_curve.png new file mode 100644 index 0000000..212a14c Binary files /dev/null and b/results_backup_for_debug/equalization_mse_curve.png differ diff --git a/src/part1_channel_coding.py b/src/part1_channel_coding.py index 1ecf55e..36cb41f 100644 --- a/src/part1_channel_coding.py +++ b/src/part1_channel_coding.py @@ -1,8 +1,5 @@ """ -Part 1:信道编码实验 - -学生需要完成 Hamming(7,4) 编码、伴随式计算和单比特纠错译码。 -选做内容包括卷积码编码和 Viterbi 硬判决译码。 +Part 1: channel coding experiment. """ import numpy as np @@ -28,106 +25,133 @@ def hamming74_encode(bits): - """ - Hamming(7,4) 系统码编码。 - - 参数: - bits: 一维 0/1 数组,长度必须是 4 的倍数。 - - 返回: - encoded: 一维 0/1 编码比特数组,长度为输入的 7/4 倍。 - - 要求: - 使用课件中的生成矩阵 G,按 GF(2) 进行矩阵乘法。 - """ + """Encode bits with systematic Hamming(7,4).""" bits = np.asarray(bits, dtype=int) if bits.ndim != 1: - raise ValueError('bits 必须是一维数组') + raise ValueError('bits must be a 1D array') if len(bits) % 4 != 0: - raise ValueError('Hamming(7,4) 要求输入长度为 4 的倍数') + raise ValueError('input length must be a multiple of 4') if not np.all((bits == 0) | (bits == 1)): - raise ValueError('bits 只能包含 0 或 1') + raise ValueError('bits must contain only 0/1') - # TODO: 将 bits reshape 为 (-1, 4),再与 HAMMING_G 相乘并对 2 取模。 - raise NotImplementedError('请实现 Hamming(7,4) 编码') + bits_matrix = bits.reshape(-1, 4) + encoded = (bits_matrix @ HAMMING_G) % 2 + return encoded.flatten() def hamming74_syndrome(codewords): - """ - 计算 Hamming(7,4) 码字的伴随式。 - - 参数: - codewords: 一维或二维 0/1 数组。若为一维,长度必须是 7 的倍数。 - - 返回: - syndromes: 形状为 (N, 3) 的伴随式数组。 - """ + """Compute syndromes for one or more Hamming(7,4) codewords.""" codewords = np.asarray(codewords, dtype=int) if codewords.ndim == 1: if len(codewords) % 7 != 0: - raise ValueError('码字长度必须是 7 的倍数') + raise ValueError('codeword length must be a multiple of 7') codewords = codewords.reshape(-1, 7) if codewords.shape[1] != 7: - raise ValueError('每个 Hamming(7,4) 码字长度必须为 7') + raise ValueError('each codeword must have length 7') - # TODO: 计算 s = r H^T mod 2。 - raise NotImplementedError('请实现伴随式计算') + return (codewords @ HAMMING_H.T) % 2 def hamming74_decode(received): - """ - Hamming(7,4) 单比特纠错译码。 - - 参数: - received: 一维 0/1 接收序列,长度必须是 7 的倍数。 - - 返回: - decoded_bits: 纠错后提取出的信息比特序列。 - - 提示: - 1. 计算每个码字的伴随式。 - 2. 若伴随式非零,将其与 H 的各列比较,定位错误比特。 - 3. 翻转对应错误位。 - 4. 系统码的信息位为前 4 位。 - """ + """Decode and correct single-bit errors for Hamming(7,4).""" received = np.asarray(received, dtype=int) if received.ndim != 1 or len(received) % 7 != 0: - raise ValueError('received 必须是一维数组,长度为 7 的倍数') + raise ValueError('received must be 1D and length multiple of 7') - # TODO: 使用 hamming74_syndrome 完成单比特纠错,并返回前 4 个信息位。 - raise NotImplementedError('请实现 Hamming(7,4) 译码') + codewords = received.reshape(-1, 7).copy() + syndromes = hamming74_syndrome(codewords) + for i, syndrome in enumerate(syndromes): + if np.any(syndrome): + for col in range(7): + if np.array_equal(syndrome, HAMMING_H[:, col]): + codewords[i, col] ^= 1 + break -def convolutional_encode(bits): - """ - 选做:实现 (2,1,3) 卷积码编码,生成多项式为 g1=111, g2=101。 + return codewords[:, :4].flatten() - 默认在末尾添加 2 个 0 作为尾比特,使状态回到全零。 - """ + +def convolutional_encode(bits): + """Optional: (2,1,3) convolutional encoder with tail bits.""" bits = np.asarray(bits, dtype=int) if not np.all((bits == 0) | (bits == 1)): - raise ValueError('bits 只能包含 0 或 1') + raise ValueError('bits must contain only 0/1') - # TODO: 选做任务,可参考课件第6章卷积码部分。 - raise NotImplementedError('选做:请实现卷积码编码') + s1 = 0 + s0 = 0 + output = [] + + for bit in bits: + c1 = bit ^ s1 ^ s0 + c2 = bit ^ s0 + output.extend([c1, c2]) + s0 = s1 + s1 = bit + + for _ in range(2): + bit = 0 + c1 = bit ^ s1 ^ s0 + c2 = bit ^ s0 + output.extend([c1, c2]) + s0 = s1 + s1 = bit + + return np.array(output, dtype=int) def viterbi_decode_hard(received_bits): - """ - 选做:实现 (2,1,3) 卷积码硬判决 Viterbi 译码。 - """ + """Optional: hard-decision Viterbi decoder for the (2,1,3) code.""" received_bits = np.asarray(received_bits, dtype=int) if len(received_bits) % 2 != 0: - raise ValueError('卷积码接收序列长度必须是 2 的倍数') - - # TODO: 选做任务,可使用汉明距离作为路径度量。 - raise NotImplementedError('选做:请实现 Viterbi 硬判决译码') + raise ValueError('received length must be a multiple of 2') + + num_outputs = len(received_bits) // 2 + num_states = 4 + inf = 10 ** 9 + path_metric = np.full(num_states, inf, dtype=float) + path_metric[0] = 0 + survivors = {state: [] for state in range(num_states)} + + for t in range(num_outputs): + rx = received_bits[2 * t: 2 * t + 2] + new_metric = np.full(num_states, inf, dtype=float) + new_survivors = {} + + for prev_state in range(num_states): + if path_metric[prev_state] >= inf: + continue + s1 = (prev_state >> 1) & 1 + s0 = prev_state & 1 + + for bit in (0, 1): + c1 = bit ^ s1 ^ s0 + c2 = bit ^ s0 + expected = np.array([c1, c2], dtype=int) + hamming_distance = int(np.sum(rx != expected)) + + next_s1 = bit + next_s0 = s1 + next_state = (next_s1 << 1) | next_s0 + + candidate = path_metric[prev_state] + hamming_distance + if candidate < new_metric[next_state]: + new_metric[next_state] = candidate + new_survivors[next_state] = survivors[prev_state] + [bit] + + path_metric = new_metric + survivors = new_survivors + + best_state = int(np.argmin(path_metric)) + decoded = survivors.get(best_state, []) + if len(decoded) >= 2: + decoded = decoded[:-2] + return np.array(decoded, dtype=int) def run_coding_demo(): - """运行 Part 1 演示并生成 BER 曲线。""" + """Run the BER demo and generate the required result figure.""" print('=' * 60) - print('Part 1:信道编码实验') + print('Part 1: channel coding experiment') print('=' * 60) error_probabilities = np.array([0.001, 0.003, 0.01, 0.03, 0.06, 0.1]) @@ -148,15 +172,15 @@ def run_coding_demo(): plot_ber_curve( error_probabilities, - {'未编码': uncoded_ber, 'Hamming(7,4)': coded_ber}, - 'Hamming(7,4) 编码前后 BER 对比', + {'Uncoded': uncoded_ber, 'Hamming(7,4)': coded_ber}, + 'Hamming(7,4) BER comparison', 'coding_ber_curve.png', ) - print('✅ 已生成 results/coding_ber_curve.png') + print('[OK] generated results/coding_ber_curve.png') except NotImplementedError as error: - print(f'⏸️ 尚未完成核心函数:{error}') + print(f'[SKIP] core function not completed: {error}') except Exception as error: - print(f'❌ Part 1 运行失败:{error}') + print(f'[FAIL] Part 1 failed: {error}') if __name__ == '__main__': diff --git a/src/part2_equalization.py b/src/part2_equalization.py index 8cbf1d8..190be0f 100644 --- a/src/part2_equalization.py +++ b/src/part2_equalization.py @@ -1,7 +1,5 @@ """ -Part 2:信道均衡实验 - -学生需要完成 ZF 均衡器估计、FIR 滤波应用和 LMS 自适应均衡。 +Part 2: equalization experiment. """ import numpy as np @@ -17,86 +15,70 @@ def estimate_zf_equalizer(channel, num_taps): - """ - 估计迫零(Zero-Forcing, ZF)FIR 均衡器。 - - 参数: - channel: 一维信道冲激响应,例如 np.array([0.9, 0.3, -0.2])。 - num_taps: 均衡器抽头数,建议为奇数。 - - 返回: - taps: 一维 FIR 均衡器系数。 - - 提示: - 1. 构造信道与均衡器卷积的线性方程 A @ taps ≈ d。 - 2. d 为中心位置为 1 的冲激响应。 - 3. 使用 np.linalg.lstsq 求最小二乘解。 - """ + """Estimate a zero-forcing FIR equalizer.""" channel = np.asarray(channel, dtype=float) if channel.ndim != 1 or len(channel) == 0: - raise ValueError('channel 必须是一维非空数组') + raise ValueError('channel must be a non-empty 1D array') if num_taps < 1: - raise ValueError('num_taps 必须为正整数') + raise ValueError('num_taps must be positive') - # TODO: 构造卷积矩阵并求解 ZF 均衡器抽头。 - raise NotImplementedError('请实现 ZF 均衡器估计') + channel_len = len(channel) + output_len = channel_len + num_taps - 1 + system = np.zeros((output_len, num_taps)) + for row in range(output_len): + for col in range(num_taps): + idx = row - col + if 0 <= idx < channel_len: + system[row, col] = channel[idx] + desired = np.zeros(output_len) + desired[(output_len - 1) // 2] = 1.0 + taps, _, _, _ = np.linalg.lstsq(system, desired, rcond=None) + return taps -def apply_fir_filter(signal, taps): - """ - 对信号应用 FIR 滤波器,并返回与输入等长的输出。 - 参数: - signal: 输入序列。 - taps: FIR 滤波器系数。 - - 返回: - filtered: 与 signal 等长的滤波输出。 - """ +def apply_fir_filter(signal, taps): + """Apply an FIR filter and keep the input length.""" signal = np.asarray(signal, dtype=float) taps = np.asarray(taps, dtype=float) if signal.ndim != 1 or taps.ndim != 1: - raise ValueError('signal 和 taps 必须是一维数组') + raise ValueError('signal and taps must be 1D arrays') - # TODO: 使用 np.convolve,并截取与 signal 等长的输出。 - raise NotImplementedError('请实现 FIR 滤波') + return np.convolve(signal, taps, mode='full')[: len(signal)] def lms_equalizer(rx_train, tx_train, num_taps, step_size=0.01): - """ - 使用训练序列实现 LMS 自适应均衡。 - - 参数: - rx_train: 接收训练序列。 - tx_train: 期望发送训练符号。 - num_taps: 均衡器抽头数。 - step_size: LMS 步长 μ。 - - 返回: - taps: 训练后的均衡器系数。 - errors: 每次迭代的误差 e[n]。 - - 提示: - 1. 抽头向量可初始化为中心抽头为 1。 - 2. y[n] = w^T x[n] - 3. e[n] = d[n] - y[n] - 4. w = w + μ e[n] x[n] - """ + """Train an LMS equalizer on a known training sequence.""" rx_train = np.asarray(rx_train, dtype=float) tx_train = np.asarray(tx_train, dtype=float) if len(rx_train) != len(tx_train): - raise ValueError('rx_train 和 tx_train 长度必须一致') + raise ValueError('rx_train and tx_train must have the same length') if num_taps < 1: - raise ValueError('num_taps 必须为正整数') + raise ValueError('num_taps must be positive') + + taps = np.zeros(num_taps, dtype=float) + taps[num_taps // 2] = 1.0 + + errors = [] + num_samples = len(rx_train) + for n in range(num_samples): + x = np.zeros(num_taps) + for j in range(num_taps): + idx = n - j + if 0 <= idx < num_samples: + x[j] = rx_train[idx] + y = np.dot(taps, x) + error = tx_train[n] - y + errors.append(error) + taps = taps + step_size * error * x - # TODO: 实现 LMS 自适应均衡训练。 - raise NotImplementedError('请实现 LMS 均衡器') + return taps, np.array(errors) def run_equalization_demo(): - """运行 Part 2 演示并生成均衡效果图。""" + """Run the equalization demo and generate the required figures.""" print('=' * 60) - print('Part 2:信道均衡实验') + print('Part 2: equalization experiment') print('=' * 60) try: @@ -106,23 +88,23 @@ def run_equalization_demo(): rx = multipath_channel(symbols, channel, noise_std=0.12, seed=7) zf_taps = estimate_zf_equalizer(channel, num_taps=7) - zf_output = apply_fir_filter(rx, zf_taps) + _ = apply_fir_filter(rx, zf_taps) lms_taps, errors = lms_equalizer(rx[:800], symbols[:800], num_taps=7, step_size=0.01) lms_output = apply_fir_filter(rx, lms_taps) raw_bits = bpsk_demodulate(rx[: len(bits)]) eq_bits = bpsk_demodulate(lms_output[: len(bits)]) - print(f'均衡前 BER: {calculate_ber(bits, raw_bits):.4f}') - print(f'LMS 均衡后 BER: {calculate_ber(bits, eq_bits):.4f}') + print(f'BER before equalization: {calculate_ber(bits, raw_bits):.4f}') + print(f'BER after LMS equalization: {calculate_ber(bits, eq_bits):.4f}') plot_equalization_results(symbols, rx, lms_output, 'equalization_eye_comparison.png') plot_mse_curve(errors, 'equalization_mse_curve.png') - print('✅ 已生成均衡结果图') + print('[OK] generated equalization result figures') except NotImplementedError as error: - print(f'⏸️ 尚未完成核心函数:{error}') + print(f'[SKIP] core function not completed: {error}') except Exception as error: - print(f'❌ Part 2 运行失败:{error}') + print(f'[FAIL] Part 2 failed: {error}') if __name__ == '__main__': diff --git a/src/utils.py b/src/utils.py index fa2df20..0c06980 100644 --- a/src/utils.py +++ b/src/utils.py @@ -4,6 +4,8 @@ """ import os +import matplotlib +matplotlib.use('Agg') import numpy as np import matplotlib.pyplot as plt