使用多层感知机初步实现MNIST的分类

参考书目:《深度学习入门:基于Python的理论与实现》

神经网络可以视为一个带参函数,函数的输入是数据,也是函数的自变量,输出是我们所要的结果,也是函数的因变量。而根据监督数据,对其中的参数的调整就叫做“训练”,训练的标准用”损失函数“,训练的具体过程就是求这个多元函数的最值。可以使用梯度法。

项目计划

  1. 完成MNIST数据集的导入。

  2. 对参数初始化后运行程序,前向处理神经网络

  3. 根据多批监督数据计算损失

  4. 根据损失计算梯度

  5. 根据梯度更新参数

数据导入

1
2
3
4
5
6
7
8
9
10
11
12
13
import sys
import os
# 导入book_note下的数据
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(current_dir, "..", "book_code"))

import dataset.mnist
# 导入MNIST数据集
# x为图片;t为监督数据,表示该图片所属的分类
(x_train, t_train), (x_test, t_test) = dataset.mnist.load_mnist(flatten=True, normalize=False)
# 测试数据是否导入成功
print(x_train.shape)
print(t_train.shape)

前向处理神经网络程序

forward为前向处理神经网络程序,W和b为权重和偏差(即需要调整的参数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#前向处理函数
def forward(x):
a1 = np.dot(x, W1) + b1
z1 = sigmoid(a1)
a2 = np.dot(z1, W2) + b2
y = softmax(a2)
return y


if __name__ == 'main':
# 初始化参数
W1 = np.random.randn(784, 50)
W2 = np.random.randn(50, 10)
b1 = np.random.randn(50)
b2 = np.random.randn(10)

其中sigmoid函数和softmax的表达式分别为:

sigmoid函数和softmax函数(批处理版本)我定义在了另一个文件common.py中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# common.py:
import numpy as np


# 遇到溢出问题,用deepseek改写
def sigmoid(x):
"""
计算 Sigmoid 函数,支持批处理输入。
参数:
x: 输入值,可以是标量、一维数组或多维数组。
返回:
Sigmoid 函数的结果,形状与输入相同。
"""
# 初始化结果数组
result = np.zeros_like(x, dtype=np.float64)
# 处理 x >= 0 的情况,建立索引掩码
mask = x >= 0
result[mask] = 1 / (1 + np.exp(-x[mask]))
# 处理 x < 0 的情况
result[~mask] = np.exp(x[~mask]) / (1 + np.exp(x[~mask]))
return result


# deepseek教我写的
def softmax(x):
"""
计算批处理 Softmax。
输入形状:(batch_size, num_features)
输出形状:(batch_size, num_features)
"""
# 1. 数值稳定性优化:每行减去最大值
# keepdims 为 广播矩阵时需要
max_vals = np.max(x, axis=1, keepdims=True)
x_shifted = x - max_vals
# 2. 计算指数
exp_x = np.exp(x_shifted)
# 3. 按行求和(分母)
sum_exp = np.sum(exp_x, axis=1, keepdims=True)
# 4. 归一化得到概率
softmax_output = exp_x / sum_exp
return softmax_output


def cross_entropy_error(y, t):
batch_size = y.shape[0]
# return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size
# [np.arange(batch_size), t]是一组数对,放在y[]产生一组值
test = y[np.arange(batch_size), t] + 1e-7
# 计算一批的平均损失值
return -np.sum(np.log(test)) / batch_size

其中还有下面要用到的交叉熵损失函数

根据批数据计算损失函数,计算梯度并更新参数

这几个步骤写在一个循环之中比较好,我就放在一起了

涉及到批处理,python的语法非常宽容,也比较容易混乱,需要细心一点,可以输出shape观察矩阵情况

先展示整体代码,我自顶而下的解释其中的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 多次批处理
# 以下在main中
for i in range(iter_num):
# 运行神经网络,计算损失
los = neuf(para)
print("cost of batch_"+str(i)+" is "+str(los))
# 设置学习率和更新次数
lr = 0.06
step_num = 2
# 计算梯度,更新参数
for i in range(step_num):
for keys, value in para.items():
para[keys] -= lr * numerical_gradient(neuf, para, keys)

los = neuf(para)
print("cost of result is "+str(los))

这里用了一个neuf函数表示神经网络这个函数,其定义如下

1
2
3
4
5
6
def neuf(para):
# 前向处理
y_batch = forward(x_batch, para)
# 计算损失
loss = cross_entropy_error(y_batch, t_batch)
return loss

其中loss的计算就用到了再common中定义的交叉熵函数。

numerical_gradient的计算,我写的有点不太规范,是顺着逻辑写下去的,想的是能跑的通就行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def numerical_gradient(f, par, keys):
h = 1e-4 # 微小改变量
grad_res = np.zeros_like(par[keys]) # 生成和x形状相同的数组
rows = grad_res.shape[0]
cols = grad_res.shape[1]
for i in range(rows):
for j in range(cols):
tmp_val = par[keys][i][j]
par[keys][i][j] = tmp_val + h
fxh1 = f(par)
par[keys][i][j] = tmp_val - h
fxh2 = f(par)
grad_res[i][j] = (fxh1 - fxh2) / (2*h)
par[keys][i][j] = tmp_val # 还原值
return grad_res

这里直接遍历每个元素,使用数值微分计算梯度,没有用到其他加速技巧,所以运行的比较慢。

程序运行

程序运行结果如下,我设置的是学习率为0.6,学习次数为2,仅处理一批100个的数据,在我的电脑上要运行十秒左右,这个实践意义是几乎没有的,所以说反向传播和向量化加速计算是非常有必要的。

1
2
3
4
Neural Start.
loss of batch_0 is 5.189915434074649
loss of result is 0.4092244287792991
cost of time is 9.5535 seconds

源代码

在两个文件中实现,文件组织如下

1
2
3
project/
    ├── main.py
    └── common.py

common.py在上面展示过了,main.py如下(针对只进行一批处理进行了修改):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import sys
import os
import time
from common import *
# 导入book_note下的数据
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(current_dir, "..", "book_code"))
import dataset.mnist
# 导入MNIST数据集
# x为图片;t为监督数据,表示该图片所属的分类
(x_train, t_train), (x_test, t_test) = dataset.mnist.load_mnist(flatten=True, normalize=True)
# 测试数据是否导入成功
# print(x_train.shape)
# print(t_train.shape)
# print(x_test.shape)
# print(t_test.shape)

# 批处理次数与数量
iter_num = 1
batch_size = 1
# 获取一个batch
# 如果要处理多批,这代码应该放在neuf中
batch_mask = np.random.choice(x_train.shape[0], batch_size)
x_batch = x_train[batch_mask]
t_batch = t_train[batch_mask]


# 前向处理函数
def forward(x, f_para):
a1 = np.dot(x, f_para['W1']) + f_para['b1']
z1 = sigmoid(a1)
a2 = np.dot(z1, f_para['W2']) + f_para['b2']
y = softmax(a2)
return y


def numerical_gradient(f, par, keys):
h = 1e-4 # 微小改变量
grad_res = np.zeros_like(par[keys]) # 生成和x形状相同的数组
rows = grad_res.shape[0]
cols = grad_res.shape[1]
for i in range(rows):
for j in range(cols):
tmp_val = par[keys][i][j]
par[keys][i][j] = tmp_val + h
fxh1 = f(par)
par[keys][i][j] = tmp_val - h
fxh2 = f(par)
grad_res[i][j] = (fxh1 - fxh2) / (2*h)
par[keys][i][j] = tmp_val # 还原值
return grad_res


def neuf(para):
# 前向处理
y_batch = forward(x_batch, para)
# 计算损失
loss = cross_entropy_error(y_batch, t_batch)
return loss


if __name__ == '__main__':
# 初始化参数
# W1是一个 784(28*28 flatten之后的)*50 的矩阵
para = {}
para['W1'] = np.random.randn(784, 50)
para['W2'] = np.random.randn(50, 10)
para['b1'] = np.random.randn(1, 50)
para['b2'] = np.random.randn(1, 10)

# 测时程序
start_time = time.time()
print("Neural Start.")

# 多次批处理
for i in range(iter_num):
# 运行神经网络,计算损失
los = neuf(para)
print("loss of batch_"+str(i)+" is "+str(los))
# 设置学习率和更新次数
lr = 0.06
step_num = 2
# 计算梯度,更新参数
for i in range(step_num):
for keys, value in para.items():
para[keys] -= lr * numerical_gradient(neuf, para, keys)

los = neuf(para)
print("loss of result is "+str(los))

# 测时程序
end_time = time.time()
elapsed_time = end_time - start_time
print(f"cost of time is {elapsed_time:.4f} seconds")