diff --git a/results/coding_ber_curve.png b/results/coding_ber_curve.png new file mode 100644 index 0000000..516ca04 Binary files /dev/null and b/results/coding_ber_curve.png differ diff --git a/results/coding_comparison_ber_curve.png b/results/coding_comparison_ber_curve.png new file mode 100644 index 0000000..862391b Binary files /dev/null and b/results/coding_comparison_ber_curve.png differ diff --git a/results/equalization_eye_comparison.png b/results/equalization_eye_comparison.png new file mode 100644 index 0000000..c13dcf0 Binary files /dev/null and b/results/equalization_eye_comparison.png differ diff --git a/results/equalization_mse_curve.png b/results/equalization_mse_curve.png new file mode 100644 index 0000000..127a7ab Binary files /dev/null and b/results/equalization_mse_curve.png differ diff --git a/src/part1_channel_coding.py b/src/part1_channel_coding.py index 1ecf55e..404989e 100644 --- a/src/part1_channel_coding.py +++ b/src/part1_channel_coding.py @@ -6,6 +6,7 @@ """ import numpy as np +from typing import Optional, Tuple from utils import ( binary_symmetric_channel, calculate_ber, @@ -26,8 +27,38 @@ [0, 1, 1, 1, 0, 0, 1], ], dtype=int) +CONVOLUTIONAL_GENERATORS = ( + (1, 1, 1), + (1, 0, 1), +) + + +def _validate_binary_array(bits: np.ndarray, name: str) -> np.ndarray: + bits = np.asarray(bits, dtype=int) + if bits.ndim != 1: + raise ValueError(f'{name} 必须是一维数组') + if not np.all((bits == 0) | (bits == 1)): + raise ValueError(f'{name} 只能包含 0 或 1') + return bits + + +def _syndrome_to_error_position(syndrome: np.ndarray) -> Optional[int]: + for bit_pos in range(7): + if np.array_equal(syndrome, HAMMING_H[:, bit_pos]): + return bit_pos + return None + -def hamming74_encode(bits): +def _convolutional_output_bits(input_bit: int, state: int) -> Tuple[int, int, int]: + register = (input_bit, (state >> 1) & 1, state & 1) + g1, g2 = CONVOLUTIONAL_GENERATORS + out1 = (register[0] * g1[0] + register[1] * g1[1] + register[2] * g1[2]) % 2 + out2 = (register[0] * g2[0] + register[1] * g2[1] + register[2] * g2[2]) % 2 + next_state = ((state << 1) | input_bit) & 0x3 + return out1, out2, next_state + + +def hamming74_encode(bits: np.ndarray) -> np.ndarray: """ Hamming(7,4) 系统码编码。 @@ -40,19 +71,23 @@ def hamming74_encode(bits): 要求: 使用课件中的生成矩阵 G,按 GF(2) 进行矩阵乘法。 """ - bits = np.asarray(bits, dtype=int) - if bits.ndim != 1: - raise ValueError('bits 必须是一维数组') + bits = _validate_binary_array(bits, 'bits') if len(bits) % 4 != 0: raise ValueError('Hamming(7,4) 要求输入长度为 4 的倍数') - if not np.all((bits == 0) | (bits == 1)): - raise ValueError('bits 只能包含 0 或 1') - # TODO: 将 bits reshape 为 (-1, 4),再与 HAMMING_G 相乘并对 2 取模。 - raise NotImplementedError('请实现 Hamming(7,4) 编码') + # 将 bits reshape 为 (-1, 4) + blocks = bits.reshape(-1, 4) + + # 与 HAMMING_G 相乘并对 2 取模 + encoded_blocks = (blocks @ HAMMING_G) % 2 + + # flatten 成一维数组返回 + encoded = encoded_blocks.flatten() + + return encoded -def hamming74_syndrome(codewords): +def hamming74_syndrome(codewords: np.ndarray) -> np.ndarray: """ 计算 Hamming(7,4) 码字的伴随式。 @@ -70,11 +105,13 @@ def hamming74_syndrome(codewords): if codewords.shape[1] != 7: raise ValueError('每个 Hamming(7,4) 码字长度必须为 7') - # TODO: 计算 s = r H^T mod 2。 - raise NotImplementedError('请实现伴随式计算') + # 计算 s = codewords @ H^T mod 2 + syndromes = (codewords @ HAMMING_H.T) % 2 + + return syndromes -def hamming74_decode(received): +def hamming74_decode(received: np.ndarray) -> np.ndarray: """ Hamming(7,4) 单比特纠错译码。 @@ -94,25 +131,46 @@ def hamming74_decode(received): if received.ndim != 1 or len(received) % 7 != 0: raise ValueError('received 必须是一维数组,长度为 7 的倍数') - # TODO: 使用 hamming74_syndrome 完成单比特纠错,并返回前 4 个信息位。 - raise NotImplementedError('请实现 Hamming(7,4) 译码') - - -def convolutional_encode(bits): + # 将 received reshape 为 (-1, 7),复制一份避免直接修改输入 + codewords = received.reshape(-1, 7).copy() + + # 计算伴随式 + syndromes = hamming74_syndrome(codewords) + + # 对每个码字进行纠错 + for i, syndrome in enumerate(syndromes): + if np.any(syndrome): + bit_pos = _syndrome_to_error_position(syndrome) + if bit_pos is not None: + codewords[i, bit_pos] = 1 - codewords[i, bit_pos] + + # 取每个码字前 4 位作为信息比特,flatten 返回 + decoded_bits = codewords[:, :4].flatten() + + return decoded_bits + + +def convolutional_encode(bits: np.ndarray) -> np.ndarray: """ 选做:实现 (2,1,3) 卷积码编码,生成多项式为 g1=111, g2=101。 默认在末尾添加 2 个 0 作为尾比特,使状态回到全零。 """ - bits = np.asarray(bits, dtype=int) - if not np.all((bits == 0) | (bits == 1)): - raise ValueError('bits 只能包含 0 或 1') + bits = _validate_binary_array(bits, 'bits') - # TODO: 选做任务,可参考课件第6章卷积码部分。 - raise NotImplementedError('选做:请实现卷积码编码') + bits_with_tail = np.concatenate([bits, np.zeros(2, dtype=int)]) + encoded = [] + state = 0 # 初始状态为 0 + for bit in bits_with_tail: + out1, out2, state = _convolutional_output_bits(int(bit), state) + encoded.append(out1) + encoded.append(out2) -def viterbi_decode_hard(received_bits): + return np.array(encoded, dtype=int) + + +def viterbi_decode_hard(received_bits: np.ndarray) -> np.ndarray: """ 选做:实现 (2,1,3) 卷积码硬判决 Viterbi 译码。 """ @@ -120,11 +178,49 @@ def viterbi_decode_hard(received_bits): if len(received_bits) % 2 != 0: raise ValueError('卷积码接收序列长度必须是 2 的倍数') - # TODO: 选做任务,可使用汉明距离作为路径度量。 - raise NotImplementedError('选做:请实现 Viterbi 硬判决译码') + num_states = 4 # 2^2 个状态 + num_symbols = len(received_bits) // 2 + + path_metrics = np.full(num_states, np.inf) + path_metrics[0] = 0.0 + + predecessors = np.zeros((num_symbols, num_states), dtype=int) + input_bits_seq = np.zeros((num_symbols, num_states), dtype=int) + + for symbol_idx in range(num_symbols): + received = received_bits[2 * symbol_idx:2 * symbol_idx + 2] + new_path_metrics = np.full(num_states, np.inf) + + for curr_state in range(num_states): + if path_metrics[curr_state] == np.inf: + continue + + for input_bit in [0, 1]: + out1, out2, next_state = _convolutional_output_bits(input_bit, curr_state) + hamming_dist = abs(out1 - received[0]) + abs(out2 - received[1]) + new_metric = path_metrics[curr_state] + hamming_dist + + if new_metric < new_path_metrics[next_state]: + new_path_metrics[next_state] = new_metric + predecessors[symbol_idx, next_state] = curr_state + input_bits_seq[symbol_idx, next_state] = input_bit + + path_metrics = new_path_metrics + final_state = np.argmin(path_metrics) + decoded = [] + current_state = final_state -def run_coding_demo(): + for symbol_idx in range(num_symbols - 1, -1, -1): + input_bit = input_bits_seq[symbol_idx, current_state] + decoded.append(input_bit) + current_state = predecessors[symbol_idx, current_state] + + decoded.reverse() + return np.array(decoded[:-2], dtype=int) + + +def run_coding_demo() -> None: """运行 Part 1 演示并生成 BER 曲线。""" print('=' * 60) print('Part 1:信道编码实验') @@ -149,15 +245,38 @@ def run_coding_demo(): plot_ber_curve( error_probabilities, {'未编码': uncoded_ber, 'Hamming(7,4)': coded_ber}, - 'Hamming(7,4) 编码前后 BER 对比', + 'Hamming(7,4) 编码前后BER对比', 'coding_ber_curve.png', ) print('✅ 已生成 results/coding_ber_curve.png') + + # 卷积码演示 + print('\n' + '=' * 60) + print('卷积码 (2,1,3) 与 Viterbi 硬判决译码演示') + print('=' * 60) + + conv_ber = [] + bits_conv = generate_bits(4000, seed=2027) + conv_encoded = convolutional_encode(bits_conv) + + for index, probability in enumerate(error_probabilities): + conv_rx = binary_symmetric_channel(conv_encoded, probability, seed=300 + index) + conv_decoded = viterbi_decode_hard(conv_rx) + + # 截断到原始长度进行 BER 计算 + conv_ber.append(calculate_ber(bits_conv[:len(conv_decoded)], conv_decoded)) + + plot_ber_curve( + error_probabilities, + {'未编码': uncoded_ber, 'Hamming(7,4)': coded_ber, '卷积码(2,1,3)': conv_ber}, + '信道编码方案BER对比', + 'coding_comparison_ber_curve.png', + ) + print('✅ 已生成 results/coding_comparison_ber_curve.png') except NotImplementedError as error: print(f'⏸️ 尚未完成核心函数:{error}') except Exception as error: print(f'❌ Part 1 运行失败:{error}') - if __name__ == '__main__': run_coding_demo() diff --git a/src/part2_equalization.py b/src/part2_equalization.py index 8cbf1d8..9c92e66 100644 --- a/src/part2_equalization.py +++ b/src/part2_equalization.py @@ -5,6 +5,7 @@ """ import numpy as np +from typing import Tuple from utils import ( bpsk_demodulate, bpsk_modulate, @@ -16,7 +17,32 @@ ) -def estimate_zf_equalizer(channel, num_taps): +def _validate_1d_array(array: np.ndarray, name: str) -> np.ndarray: + array = np.asarray(array, dtype=float) + if array.ndim != 1: + raise ValueError(f'{name} 必须是一维数组') + return array + + +def _validate_positive_int(value: int, name: str) -> int: + if not isinstance(value, int) or value < 1: + raise ValueError(f'{name} 必须为正整数') + return value + + +def _build_toeplitz_matrix(channel: np.ndarray, num_taps: int) -> np.ndarray: + """构造 FIR 卷积矩阵,使 A @ taps 表示 channel 与 taps 的线性卷积。""" + conv_len = len(channel) + num_taps - 1 + A = np.zeros((conv_len, num_taps), dtype=float) + for row in range(conv_len): + for col in range(num_taps): + idx = row - col + if 0 <= idx < len(channel): + A[row, col] = channel[idx] + return A + + +def estimate_zf_equalizer(channel: np.ndarray, num_taps: int) -> np.ndarray: """ 估计迫零(Zero-Forcing, ZF)FIR 均衡器。 @@ -32,17 +58,24 @@ def estimate_zf_equalizer(channel, num_taps): 2. d 为中心位置为 1 的冲激响应。 3. 使用 np.linalg.lstsq 求最小二乘解。 """ - channel = np.asarray(channel, dtype=float) - if channel.ndim != 1 or len(channel) == 0: - raise ValueError('channel 必须是一维非空数组') - if num_taps < 1: - raise ValueError('num_taps 必须为正整数') + channel = _validate_1d_array(channel, 'channel') + if len(channel) == 0: + raise ValueError('channel 必须为非空数组') + _validate_positive_int(num_taps, 'num_taps') - # TODO: 构造卷积矩阵并求解 ZF 均衡器抽头。 - raise NotImplementedError('请实现 ZF 均衡器估计') + # 构造卷积矩阵 A 并生成目标冲激响应 d + A = _build_toeplitz_matrix(channel, num_taps) + conv_len = A.shape[0] + d = np.zeros(conv_len, dtype=float) + center_pos = (num_taps - 1) // 2 + d[center_pos] = 1.0 + # 使用最小二乘法求解 + taps, _, _, _ = np.linalg.lstsq(A, d, rcond=None) + return taps -def apply_fir_filter(signal, taps): + +def apply_fir_filter(signal: np.ndarray, taps: np.ndarray) -> np.ndarray: """ 对信号应用 FIR 滤波器,并返回与输入等长的输出。 @@ -53,16 +86,19 @@ def apply_fir_filter(signal, taps): 返回: filtered: 与 signal 等长的滤波输出。 """ - signal = np.asarray(signal, dtype=float) - taps = np.asarray(taps, dtype=float) - if signal.ndim != 1 or taps.ndim != 1: - raise ValueError('signal 和 taps 必须是一维数组') + signal = _validate_1d_array(signal, 'signal') + taps = _validate_1d_array(taps, 'taps') + + # 使用 np.convolve 进行卷积并截取与输入等长的输出 + filtered = np.convolve(signal, taps, mode='full')[: len(signal)] + return filtered + - # TODO: 使用 np.convolve,并截取与 signal 等长的输出。 - raise NotImplementedError('请实现 FIR 滤波') +def _build_lms_input_vector(rx_train: np.ndarray, n: int, num_taps: int) -> np.ndarray: + return rx_train[n - num_taps + 1:n + 1][::-1] -def lms_equalizer(rx_train, tx_train, num_taps, step_size=0.01): +def lms_equalizer(rx_train: np.ndarray, tx_train: np.ndarray, num_taps: int, step_size: float = 0.01) -> Tuple[np.ndarray, np.ndarray]: """ 使用训练序列实现 LMS 自适应均衡。 @@ -82,18 +118,35 @@ def lms_equalizer(rx_train, tx_train, num_taps, step_size=0.01): 3. e[n] = d[n] - y[n] 4. w = w + μ e[n] x[n] """ - rx_train = np.asarray(rx_train, dtype=float) - tx_train = np.asarray(tx_train, dtype=float) + rx_train = _validate_1d_array(rx_train, 'rx_train') + tx_train = _validate_1d_array(tx_train, 'tx_train') if len(rx_train) != len(tx_train): raise ValueError('rx_train 和 tx_train 长度必须一致') - if num_taps < 1: - raise ValueError('num_taps 必须为正整数') + _validate_positive_int(num_taps, 'num_taps') + if len(rx_train) < num_taps: + raise ValueError('训练序列长度必须大于等于 num_taps') + if step_size <= 0: + raise ValueError('step_size 必须为正数') + + # 初始化 taps,中心抽头为 1,其余为 0 + taps = np.zeros(num_taps, dtype=float) + center_pos = (num_taps - 1) // 2 + taps[center_pos] = 1.0 + + errors = [] + + # 从第 num_taps - 1 个样本开始迭代 + for n in range(num_taps - 1, len(rx_train)): + x = _build_lms_input_vector(rx_train, n, num_taps) + y = taps @ x + e = tx_train[n] - y + taps = taps + step_size * e * x + errors.append(e) - # TODO: 实现 LMS 自适应均衡训练。 - raise NotImplementedError('请实现 LMS 均衡器') + return taps, np.asarray(errors, dtype=float) -def run_equalization_demo(): +def run_equalization_demo() -> None: """运行 Part 2 演示并生成均衡效果图。""" print('=' * 60) print('Part 2:信道均衡实验')