Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions REPORT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# 无线通信技术实验报告:信道编码与信道均衡

## 1. 实验目的

说明本实验希望掌握的信道编码、信道均衡和 GitHub 自动评分技能。

## 2. 实验原理

### 2.1 信道编码

说明 Hamming(7,4)、伴随式、单比特纠错、编码增益等概念。

### 2.2 信道均衡

说明 ISI、多径信道、ZF、LMS 自适应均衡等概念。

## 3. 实验环境

- Python 版本:
- 主要依赖:NumPy、Matplotlib、pytest
- AI 助手使用情况:

## 4. 实验方法与步骤

### 4.1 Part 1:信道编码

描述编码、加噪/翻转、译码、BER 对比流程。

### 4.2 Part 2:信道均衡

描述多径信道、ZF/LMS 均衡器设计、均衡效果评估流程。

## 5. 实验结果

插入结果图:

```markdown
![编码BER曲线](results/coding_ber_curve.png)
![均衡眼图对比](results/equalization_eye_comparison.png)
![LMS误差曲线](results/equalization_mse_curve.png)
```

## 6. 结果分析与讨论

回答:

1. Hamming(7,4) 为什么能纠正单比特错误?
2. 为什么信道编码会引入冗余并降低码率?
3. ZF 均衡为什么可能放大噪声?
4. LMS 的步长过大或过小会出现什么问题?
5. 均衡前后 ISI 有什么变化?

## 7. 实验心得

说明你对信道编码、信道均衡、自动评分和 AI 编程辅助的理解。

## 8. 参考资料

- 课程课件:第6章 信道编码
- 课程课件:第7章 均衡
Binary file added results/coding_ber_curve.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added results/equalization_eye_comparison.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added results/equalization_mse_curve.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
119 changes: 109 additions & 10 deletions src/part1_channel_coding.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
[0, 1, 1, 1, 0, 0, 1],
], dtype=int)

SYNDROME_TO_ERROR = {
tuple(HAMMING_H[:, j]): j
for j in range(HAMMING_H.shape[1])
}


def hamming74_encode(bits):
"""
Expand All @@ -48,8 +53,10 @@ def hamming74_encode(bits):
if not np.all((bits == 0) | (bits == 1)):
raise ValueError('bits 只能包含 0 或 1')

# TODO: 将 bits reshape 为 (-1, 4),再与 HAMMING_G 相乘并对 2 取模。
raise NotImplementedError('请实现 Hamming(7,4) 编码')
# 将 bits reshape 为 (-1, 4),再与 HAMMING_G 相乘并对 2 取模
blocks = bits.reshape(-1, 4)
encoded = (blocks @ HAMMING_G) % 2
return encoded.flatten()


def hamming74_syndrome(codewords):
Expand All @@ -70,8 +77,9 @@ def hamming74_syndrome(codewords):
if codewords.shape[1] != 7:
raise ValueError('每个 Hamming(7,4) 码字长度必须为 7')

# TODO: 计算 s = r H^T mod 2。
raise NotImplementedError('请实现伴随式计算')
# 计算 s = r H^T mod 2
syndromes = (codewords @ HAMMING_H.T) % 2
return syndromes


def hamming74_decode(received):
Expand All @@ -94,8 +102,19 @@ def hamming74_decode(received):
if received.ndim != 1 or len(received) % 7 != 0:
raise ValueError('received 必须是一维数组,长度为 7 的倍数')

# TODO: 使用 hamming74_syndrome 完成单比特纠错,并返回前 4 个信息位。
raise NotImplementedError('请实现 Hamming(7,4) 译码')
# reshape为(-1, 7),避免直接修改输入
codewords = received.reshape(-1, 7).copy()
syndromes = hamming74_syndrome(codewords)

# 对每个非零伴随式进行纠错
for i, syndrome in enumerate(syndromes):
if syndrome.any():
error_position = SYNDROME_TO_ERROR.get(tuple(syndrome))
if error_position is not None:
codewords[i, error_position] ^= 1

# 取前 4 个信息位并 flatten 返回
return codewords[:, :4].ravel()


def convolutional_encode(bits):
Expand All @@ -108,8 +127,26 @@ def convolutional_encode(bits):
if not np.all((bits == 0) | (bits == 1)):
raise ValueError('bits 只能包含 0 或 1')

# TODO: 选做任务,可参考课件第6章卷积码部分。
raise NotImplementedError('选做:请实现卷积码编码')
# 在末尾添加 2 个尾比特使状态回到全零
bits_with_tail = np.concatenate([bits, np.array([0, 0], dtype=int)])

# 初始状态:2个秘密状态位
state = 0 # (s1, s0)
encoded = []

for bit in bits_with_tail:
# 根据 g1=111 (7) 和 g2=101 (5) 计算两个输出比特
# g1=111: 输出1 = input XOR state_bit1 XOR state_bit0
# g2=101: 输出2 = input XOR state_bit0
out1 = bit ^ ((state >> 1) & 1) ^ (state & 1) # g1
out2 = bit ^ (state & 1) # g2
encoded.append(out1)
encoded.append(out2)

# 更新状态: 新状态 = (input, state_bit1)
state = (bit << 1) | ((state >> 1) & 1)

return np.array(encoded, dtype=int)


def viterbi_decode_hard(received_bits):
Expand All @@ -120,8 +157,70 @@ def viterbi_decode_hard(received_bits):
if len(received_bits) % 2 != 0:
raise ValueError('卷积码接收序列长度必须是 2 的倍数')

# TODO: 选做任务,可使用汉明距离作为路径度量。
raise NotImplementedError('选做:请实现 Viterbi 硬判决译码')
# 4个可能的状态:(s1, s0)
num_states = 4
num_steps = len(received_bits) // 2

# 处理接收序列,每次输入一寸(2比特)
received_pairs = received_bits.reshape(-1, 2)

# 动态规划:路径度量和路径跟踪
# path_metric[state] = 到当前状态的最小累计度量
# paths[t, state] = 第 t 时刻到达 state 时的前一状态
path_metric = np.full(num_states, np.inf, dtype=float)
path_metric[0] = 0 # 初始状态为 0
paths = np.zeros((num_steps, num_states), dtype=int)

# Viterbi 轨迹
for t, received_pair in enumerate(received_pairs):
new_metric = np.full(num_states, np.inf, dtype=float)
new_paths = np.zeros(num_states, dtype=int)

# 对每个当前可达状态
for curr_state in range(num_states):
if path_metric[curr_state] == np.inf:
continue

# 对每个可能的输入(0 或 1)
for input_bit in [0, 1]:
# 计算下一个状态和输出
# state = (s1, s0), 新下一个状态 = (input_bit, s1)
next_state = (input_bit << 1) | ((curr_state >> 1) & 1)

# 根据 g1, g2 计算输出
s1 = (curr_state >> 1) & 1
s0 = curr_state & 1
out1 = input_bit ^ s1 ^ s0 # g1 = 111
out2 = input_bit ^ s0 # g2 = 101

# 计算汉明距离
hamming_dist = int((out1 != received_pair[0]) + (out2 != received_pair[1]))
new_metric_value = path_metric[curr_state] + hamming_dist

# 更新最小路径
if new_metric_value < new_metric[next_state]:
new_metric[next_state] = new_metric_value
new_paths[next_state] = curr_state

path_metric = new_metric
paths[t] = new_paths

# 回溯:从最优终状态出发
final_state = np.argmin(path_metric)

# 追踪轨迹
decoded_bits = []
state = final_state
for t in range(num_steps - 1, -1, -1):
prev_state = paths[t, state]
# 从 prev_state 到 state 的输入比特是 input_bit
# state = (input_bit << 1) | ((prev_state >> 1) & 1)
input_bit = (state >> 1) & 1
decoded_bits.insert(0, input_bit)
state = prev_state

# 删除最后 2 个尾比特
return np.array(decoded_bits[:-2], dtype=int)


def run_coding_demo():
Expand Down
52 changes: 46 additions & 6 deletions src/part2_equalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
)


def _build_convolution_matrix(channel, num_taps):
"""构造信道与 FIR 系数卷积的 Toeplitz 矩阵。"""
conv_len = len(channel) + num_taps - 1
A = np.zeros((conv_len, num_taps), dtype=float)
for i in range(num_taps):
A[i:i+len(channel), i] = channel
return A


def estimate_zf_equalizer(channel, num_taps):
"""
估计迫零(Zero-Forcing, ZF)FIR 均衡器。
Expand All @@ -38,8 +47,13 @@ def estimate_zf_equalizer(channel, num_taps):
if num_taps < 1:
raise ValueError('num_taps 必须为正整数')

# TODO: 构造卷积矩阵并求解 ZF 均衡器抽头。
raise NotImplementedError('请实现 ZF 均衡器估计')
A = _build_convolution_matrix(channel, num_taps)
d = np.zeros(A.shape[0], dtype=float)
center = (A.shape[0] - 1) // 2
d[center] = 1.0

taps, _, _, _ = np.linalg.lstsq(A, d, rcond=None)
return taps


def apply_fir_filter(signal, taps):
Expand All @@ -58,8 +72,13 @@ def apply_fir_filter(signal, taps):
if signal.ndim != 1 or taps.ndim != 1:
raise ValueError('signal 和 taps 必须是一维数组')

# TODO: 使用 np.convolve,并截取与 signal 等长的输出。
raise NotImplementedError('请实现 FIR 滤波')
# 使用 np.convolve 的 full 模式
convolved = np.convolve(signal, taps, mode='full')

# 截取前 len(signal) 个样本,使输出与输入等长
filtered = convolved[:len(signal)]

return filtered


def lms_equalizer(rx_train, tx_train, num_taps, step_size=0.01):
Expand Down Expand Up @@ -89,8 +108,29 @@ def lms_equalizer(rx_train, tx_train, num_taps, step_size=0.01):
if num_taps < 1:
raise ValueError('num_taps 必须为正整数')

# TODO: 实现 LMS 自适应均衡训练。
raise NotImplementedError('请实现 LMS 均衡器')
# 初始化抽头,中心位置为 1
taps = np.zeros(num_taps, dtype=float)
center = num_taps // 2
taps[center] = 1.0

errors = []

# 从第 num_taps-1 个样本开始迭代,确保有足够的过去样本
for n in range(num_taps - 1, len(rx_train)):
# 构造当前输入向量 x[n] = [rx_train[n], rx_train[n-1], ..., rx_train[n-num_taps+1]]
x = rx_train[n - num_taps + 1:n + 1][::-1]

# 计算输出 y[n] = w^T x[n]
y = np.dot(taps, x)

# 计算误差 e[n] = d[n] - y[n]
e = tx_train[n] - y
errors.append(e)

# 更新抽头 w = w + μ e[n] x[n]
taps += step_size * e * x

return taps, np.asarray(errors, dtype=float)


def run_equalization_demo():
Expand Down
Loading