Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added results/coding_ber_curve.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added results/coding_comparison_ber_curve.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added results/equalization_eye_comparison.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added results/equalization_mse_curve.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
175 changes: 147 additions & 28 deletions src/part1_channel_coding.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import numpy as np
from typing import Optional, Tuple
from utils import (
binary_symmetric_channel,
calculate_ber,
Expand All @@ -26,8 +27,38 @@
[0, 1, 1, 1, 0, 0, 1],
], dtype=int)

CONVOLUTIONAL_GENERATORS = (
(1, 1, 1),
(1, 0, 1),
)


def _validate_binary_array(bits: np.ndarray, name: str) -> np.ndarray:
bits = np.asarray(bits, dtype=int)
if bits.ndim != 1:
raise ValueError(f'{name} 必须是一维数组')
if not np.all((bits == 0) | (bits == 1)):
raise ValueError(f'{name} 只能包含 0 或 1')
return bits


def _syndrome_to_error_position(syndrome: np.ndarray) -> Optional[int]:
for bit_pos in range(7):
if np.array_equal(syndrome, HAMMING_H[:, bit_pos]):
return bit_pos
return None


def hamming74_encode(bits):
def _convolutional_output_bits(input_bit: int, state: int) -> Tuple[int, int, int]:
register = (input_bit, (state >> 1) & 1, state & 1)
g1, g2 = CONVOLUTIONAL_GENERATORS
out1 = (register[0] * g1[0] + register[1] * g1[1] + register[2] * g1[2]) % 2
out2 = (register[0] * g2[0] + register[1] * g2[1] + register[2] * g2[2]) % 2
next_state = ((state << 1) | input_bit) & 0x3
return out1, out2, next_state


def hamming74_encode(bits: np.ndarray) -> np.ndarray:
"""
Hamming(7,4) 系统码编码。

Expand All @@ -40,19 +71,23 @@ def hamming74_encode(bits):
要求:
使用课件中的生成矩阵 G,按 GF(2) 进行矩阵乘法。
"""
bits = np.asarray(bits, dtype=int)
if bits.ndim != 1:
raise ValueError('bits 必须是一维数组')
bits = _validate_binary_array(bits, 'bits')
if len(bits) % 4 != 0:
raise ValueError('Hamming(7,4) 要求输入长度为 4 的倍数')
if not np.all((bits == 0) | (bits == 1)):
raise ValueError('bits 只能包含 0 或 1')

# TODO: 将 bits reshape 为 (-1, 4),再与 HAMMING_G 相乘并对 2 取模。
raise NotImplementedError('请实现 Hamming(7,4) 编码')
# 将 bits reshape 为 (-1, 4)
blocks = bits.reshape(-1, 4)

# 与 HAMMING_G 相乘并对 2 取模
encoded_blocks = (blocks @ HAMMING_G) % 2

# flatten 成一维数组返回
encoded = encoded_blocks.flatten()

return encoded


def hamming74_syndrome(codewords):
def hamming74_syndrome(codewords: np.ndarray) -> np.ndarray:
"""
计算 Hamming(7,4) 码字的伴随式。

Expand All @@ -70,11 +105,13 @@ def hamming74_syndrome(codewords):
if codewords.shape[1] != 7:
raise ValueError('每个 Hamming(7,4) 码字长度必须为 7')

# TODO: 计算 s = r H^T mod 2。
raise NotImplementedError('请实现伴随式计算')
# 计算 s = codewords @ H^T mod 2
syndromes = (codewords @ HAMMING_H.T) % 2

return syndromes


def hamming74_decode(received):
def hamming74_decode(received: np.ndarray) -> np.ndarray:
"""
Hamming(7,4) 单比特纠错译码。

Expand All @@ -94,37 +131,96 @@ def hamming74_decode(received):
if received.ndim != 1 or len(received) % 7 != 0:
raise ValueError('received 必须是一维数组,长度为 7 的倍数')

# TODO: 使用 hamming74_syndrome 完成单比特纠错,并返回前 4 个信息位。
raise NotImplementedError('请实现 Hamming(7,4) 译码')


def convolutional_encode(bits):
# 将 received reshape 为 (-1, 7),复制一份避免直接修改输入
codewords = received.reshape(-1, 7).copy()

# 计算伴随式
syndromes = hamming74_syndrome(codewords)

# 对每个码字进行纠错
for i, syndrome in enumerate(syndromes):
if np.any(syndrome):
bit_pos = _syndrome_to_error_position(syndrome)
if bit_pos is not None:
codewords[i, bit_pos] = 1 - codewords[i, bit_pos]

# 取每个码字前 4 位作为信息比特,flatten 返回
decoded_bits = codewords[:, :4].flatten()

return decoded_bits


def convolutional_encode(bits: np.ndarray) -> np.ndarray:
"""
选做:实现 (2,1,3) 卷积码编码,生成多项式为 g1=111, g2=101。

默认在末尾添加 2 个 0 作为尾比特,使状态回到全零。
"""
bits = np.asarray(bits, dtype=int)
if not np.all((bits == 0) | (bits == 1)):
raise ValueError('bits 只能包含 0 或 1')
bits = _validate_binary_array(bits, 'bits')

# TODO: 选做任务,可参考课件第6章卷积码部分。
raise NotImplementedError('选做:请实现卷积码编码')
bits_with_tail = np.concatenate([bits, np.zeros(2, dtype=int)])
encoded = []
state = 0 # 初始状态为 0

for bit in bits_with_tail:
out1, out2, state = _convolutional_output_bits(int(bit), state)
encoded.append(out1)
encoded.append(out2)

def viterbi_decode_hard(received_bits):
return np.array(encoded, dtype=int)


def viterbi_decode_hard(received_bits: np.ndarray) -> np.ndarray:
"""
选做:实现 (2,1,3) 卷积码硬判决 Viterbi 译码。
"""
received_bits = np.asarray(received_bits, dtype=int)
if len(received_bits) % 2 != 0:
raise ValueError('卷积码接收序列长度必须是 2 的倍数')

# TODO: 选做任务,可使用汉明距离作为路径度量。
raise NotImplementedError('选做:请实现 Viterbi 硬判决译码')
num_states = 4 # 2^2 个状态
num_symbols = len(received_bits) // 2

path_metrics = np.full(num_states, np.inf)
path_metrics[0] = 0.0

predecessors = np.zeros((num_symbols, num_states), dtype=int)
input_bits_seq = np.zeros((num_symbols, num_states), dtype=int)

for symbol_idx in range(num_symbols):
received = received_bits[2 * symbol_idx:2 * symbol_idx + 2]
new_path_metrics = np.full(num_states, np.inf)

for curr_state in range(num_states):
if path_metrics[curr_state] == np.inf:
continue

for input_bit in [0, 1]:
out1, out2, next_state = _convolutional_output_bits(input_bit, curr_state)
hamming_dist = abs(out1 - received[0]) + abs(out2 - received[1])
new_metric = path_metrics[curr_state] + hamming_dist

if new_metric < new_path_metrics[next_state]:
new_path_metrics[next_state] = new_metric
predecessors[symbol_idx, next_state] = curr_state
input_bits_seq[symbol_idx, next_state] = input_bit

path_metrics = new_path_metrics

final_state = np.argmin(path_metrics)
decoded = []
current_state = final_state

def run_coding_demo():
for symbol_idx in range(num_symbols - 1, -1, -1):
input_bit = input_bits_seq[symbol_idx, current_state]
decoded.append(input_bit)
current_state = predecessors[symbol_idx, current_state]

decoded.reverse()
return np.array(decoded[:-2], dtype=int)


def run_coding_demo() -> None:
"""运行 Part 1 演示并生成 BER 曲线。"""
print('=' * 60)
print('Part 1:信道编码实验')
Expand All @@ -149,15 +245,38 @@ def run_coding_demo():
plot_ber_curve(
error_probabilities,
{'未编码': uncoded_ber, 'Hamming(7,4)': coded_ber},
'Hamming(7,4) 编码前后 BER 对比',
'Hamming(7,4) 编码前后BER对比',
'coding_ber_curve.png',
)
print('✅ 已生成 results/coding_ber_curve.png')

# 卷积码演示
print('\n' + '=' * 60)
print('卷积码 (2,1,3) 与 Viterbi 硬判决译码演示')
print('=' * 60)

conv_ber = []
bits_conv = generate_bits(4000, seed=2027)
conv_encoded = convolutional_encode(bits_conv)

for index, probability in enumerate(error_probabilities):
conv_rx = binary_symmetric_channel(conv_encoded, probability, seed=300 + index)
conv_decoded = viterbi_decode_hard(conv_rx)

# 截断到原始长度进行 BER 计算
conv_ber.append(calculate_ber(bits_conv[:len(conv_decoded)], conv_decoded))

plot_ber_curve(
error_probabilities,
{'未编码': uncoded_ber, 'Hamming(7,4)': coded_ber, '卷积码(2,1,3)': conv_ber},
'信道编码方案BER对比',
'coding_comparison_ber_curve.png',
)
print('✅ 已生成 results/coding_comparison_ber_curve.png')
except NotImplementedError as error:
print(f'⏸️ 尚未完成核心函数:{error}')
except Exception as error:
print(f'❌ Part 1 运行失败:{error}')


if __name__ == '__main__':
run_coding_demo()
99 changes: 76 additions & 23 deletions src/part2_equalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import numpy as np
from typing import Tuple
from utils import (
bpsk_demodulate,
bpsk_modulate,
Expand All @@ -16,7 +17,32 @@
)


def estimate_zf_equalizer(channel, num_taps):
def _validate_1d_array(array: np.ndarray, name: str) -> np.ndarray:
array = np.asarray(array, dtype=float)
if array.ndim != 1:
raise ValueError(f'{name} 必须是一维数组')
return array


def _validate_positive_int(value: int, name: str) -> int:
if not isinstance(value, int) or value < 1:
raise ValueError(f'{name} 必须为正整数')
return value


def _build_toeplitz_matrix(channel: np.ndarray, num_taps: int) -> np.ndarray:
"""构造 FIR 卷积矩阵,使 A @ taps 表示 channel 与 taps 的线性卷积。"""
conv_len = len(channel) + num_taps - 1
A = np.zeros((conv_len, num_taps), dtype=float)
for row in range(conv_len):
for col in range(num_taps):
idx = row - col
if 0 <= idx < len(channel):
A[row, col] = channel[idx]
return A


def estimate_zf_equalizer(channel: np.ndarray, num_taps: int) -> np.ndarray:
"""
估计迫零(Zero-Forcing, ZF)FIR 均衡器。

Expand All @@ -32,17 +58,24 @@ def estimate_zf_equalizer(channel, num_taps):
2. d 为中心位置为 1 的冲激响应。
3. 使用 np.linalg.lstsq 求最小二乘解。
"""
channel = np.asarray(channel, dtype=float)
if channel.ndim != 1 or len(channel) == 0:
raise ValueError('channel 必须是一维非空数组')
if num_taps < 1:
raise ValueError('num_taps 必须为正整数')
channel = _validate_1d_array(channel, 'channel')
if len(channel) == 0:
raise ValueError('channel 必须为非空数组')
_validate_positive_int(num_taps, 'num_taps')

# TODO: 构造卷积矩阵并求解 ZF 均衡器抽头。
raise NotImplementedError('请实现 ZF 均衡器估计')
# 构造卷积矩阵 A 并生成目标冲激响应 d
A = _build_toeplitz_matrix(channel, num_taps)
conv_len = A.shape[0]
d = np.zeros(conv_len, dtype=float)
center_pos = (num_taps - 1) // 2
d[center_pos] = 1.0

# 使用最小二乘法求解
taps, _, _, _ = np.linalg.lstsq(A, d, rcond=None)
return taps

def apply_fir_filter(signal, taps):

def apply_fir_filter(signal: np.ndarray, taps: np.ndarray) -> np.ndarray:
"""
对信号应用 FIR 滤波器,并返回与输入等长的输出。

Expand All @@ -53,16 +86,19 @@ def apply_fir_filter(signal, taps):
返回:
filtered: 与 signal 等长的滤波输出。
"""
signal = np.asarray(signal, dtype=float)
taps = np.asarray(taps, dtype=float)
if signal.ndim != 1 or taps.ndim != 1:
raise ValueError('signal 和 taps 必须是一维数组')
signal = _validate_1d_array(signal, 'signal')
taps = _validate_1d_array(taps, 'taps')

# 使用 np.convolve 进行卷积并截取与输入等长的输出
filtered = np.convolve(signal, taps, mode='full')[: len(signal)]
return filtered


# TODO: 使用 np.convolve,并截取与 signal 等长的输出。
raise NotImplementedError('请实现 FIR 滤波')
def _build_lms_input_vector(rx_train: np.ndarray, n: int, num_taps: int) -> np.ndarray:
return rx_train[n - num_taps + 1:n + 1][::-1]


def lms_equalizer(rx_train, tx_train, num_taps, step_size=0.01):
def lms_equalizer(rx_train: np.ndarray, tx_train: np.ndarray, num_taps: int, step_size: float = 0.01) -> Tuple[np.ndarray, np.ndarray]:
"""
使用训练序列实现 LMS 自适应均衡。

Expand All @@ -82,18 +118,35 @@ def lms_equalizer(rx_train, tx_train, num_taps, step_size=0.01):
3. e[n] = d[n] - y[n]
4. w = w + μ e[n] x[n]
"""
rx_train = np.asarray(rx_train, dtype=float)
tx_train = np.asarray(tx_train, dtype=float)
rx_train = _validate_1d_array(rx_train, 'rx_train')
tx_train = _validate_1d_array(tx_train, 'tx_train')
if len(rx_train) != len(tx_train):
raise ValueError('rx_train 和 tx_train 长度必须一致')
if num_taps < 1:
raise ValueError('num_taps 必须为正整数')
_validate_positive_int(num_taps, 'num_taps')
if len(rx_train) < num_taps:
raise ValueError('训练序列长度必须大于等于 num_taps')
if step_size <= 0:
raise ValueError('step_size 必须为正数')

# 初始化 taps,中心抽头为 1,其余为 0
taps = np.zeros(num_taps, dtype=float)
center_pos = (num_taps - 1) // 2
taps[center_pos] = 1.0

errors = []

# 从第 num_taps - 1 个样本开始迭代
for n in range(num_taps - 1, len(rx_train)):
x = _build_lms_input_vector(rx_train, n, num_taps)
y = taps @ x
e = tx_train[n] - y
taps = taps + step_size * e * x
errors.append(e)

# TODO: 实现 LMS 自适应均衡训练。
raise NotImplementedError('请实现 LMS 均衡器')
return taps, np.asarray(errors, dtype=float)


def run_equalization_demo():
def run_equalization_demo() -> None:
"""运行 Part 2 演示并生成均衡效果图。"""
print('=' * 60)
print('Part 2:信道均衡实验')
Expand Down
Loading