1. 深度学习
1.1. 数据读取
1.1.1. 标签转换为数据迭代器
from torch.utils import data
# 传入多个数据,类似数据压缩 data_arrays,对数据进行解压缩
dataset = data.TensorDataset(data_arrays)
# 创建迭代器,batch_size
data.DataLoader(dataset, batch_size, shuffle=is_train)
# iter返回一个迭代器对象, next读取迭代器的下一个数据
next(iter(data))
1.1.2. 从torchvision.datasets读取
from torchvision import transforms
from torch.utils import data
import torchvision
def load_data(batch_size, resize = None):
# 定义数据处理方式
trans = [transforms.ToTensors()]
if resize:
trans.insert(0, transforms.Resize(resize))
trans = transforms.Compose(trans)# 使用Compose转换
# 选择数据导出模式
'''
root - 数据保存地址
train - 训练数据
transforms 转换模式
download 是否选择下载
'''
mnist_train = torchvision.datasets.FashionMNIST(
root = "../data", train = True, transforms = trans, download = True)
'''
batch_size: 每个批次包含多少个样本。
shuffle: 是否在每个 epoch 开始时打乱数据(训练集通常为 True,测试集通常为 False)。
num_workers (可选): 用于数据加载的子进程数量,可以加快数据读取速度。
'''
data = data.DataLoader(mnist_train, batch_size, shuffle = True,
num_workers = 3)
1.1.3. 序列数据
对于总长度T, 时间$\tau$ ,将数据分为
$$
y_t = X_t \
x_t = [x_[t-\tau]… x_{t-1}] \
但是x_t 相对Y_t少了\tau个,可以舍弃,也可以填充0
$$
tau = 4
features = torch.zeros((T - tau, tau))
for i in range(tau):
features[:, i] = x[i: T - tau + i]
labels = x[tau:].reshape((-1, 1))
- 读取数据
- 词元化(转换为单词或字符)
- 转换为词元与数字的映射
- 将映射作用在序列数据上,转换为向量表示
- 数据预处理代码
1.1.4. 文本序列数据
- 读取数据到string中
- 使用split(\t) 分割为英法双语,然后使用.split(‘ ‘)将单词分割
- 然后将单词词元,建立词表
- 使用词表,转换为向量,对向量进行阶段或填充,然后在向量末尾增加
,再统计序列中有效的单元数量valid_len - 指定批量大小,转换为小批量迭代器
1.2. 模型
1.2.1. 线性模型linear
第一个指定输入特征形状,即2,第二个指定输出特征形状
from torch import nn
net = nn.Sequential(nn.Linear(2, 1))
loss = nn.MSELoss()
trainer = torch.optim.SGD(net.parameters(), lr=0.03)
1.2.2. 展平层
将(batch_size, channel, height, weight) 转换为(batch_size, channel* height*weight)
nn.Flatten(start_dim = 1, end_dim =-1) # 默认保留第一维batch_size
nn.LogSoftmax(dim=1) # 作用于最后一个维度,进行归一化
1.2.3. MLP
from torch import nn
net = nn.Sequential(nn.Flatten(),
nn.Linear(784, 256),
nn.ReLU(),
nn.Linear(256, 10))
1.2.4. CNN
不变性:无论使用什么方法找到这个物体,都与物体的位置无关
原因:与像素点计算的卷积核都是相同的,不随着位置的改变而改变
'''
param1: 输入通道
param2: 输出通道
parma3: 卷积核大小
param4: 参数
'''
conv2d = nn.Conv2d(1,1, kernel_size=(1, 2), bias=False)
'''
卷积核大小 = input_channels* kernel_size,
每一个卷积核计算的出来都是一个二维图形
卷积核数量 = output_channels
'''
1.2.4.1. 填充padding
卷积核最好选择奇数
填充高度与宽度满足$p_h = k_h -1 , p_w = k_w -1$, 在上下填充的高度与宽度分别为 ${p_h}/{2} , p_w/2$
conv2d = nn.Conv2d(1,1, kernel_size=(3, 3),padding = 1, bias=False)
1.2.4.2. 步幅
输出形状公式为
$$
\lfloor(n_h-k_h-+p_h+s_h)/s_h\rfloor \
p_h = k_h-1 \
所以结果为(n_h+s_h-1)/s_h, \
可以整除情况下,=(n_h+s_h)/s_h
$$
conv2d = nn.Conv2d(1, 1, kernel_size=(3, 5), padding=(1, 2), stride=(3, 4))
- 当以每像素为基础应用时,$1\times 1$卷积层相当于全连接层。
1.2.5. 批量归一化层BatchNorm
$$
\mathrm{BN}(\mathbf{x}) = \boldsymbol{\gamma} \odot \frac{\mathbf{x} - \hat{\boldsymbol{\mu}}_\mathcal{B}}{\hat{\boldsymbol{\sigma}}_\mathcal{B}} + \boldsymbol{\beta}.
$$
对特征维度进行归一化,$\gamma、\beta$ 是拉伸和偏移参数
对特征维度进行归一化
mean = X.mean(dim = 0, keepdim = True)
卷积层
- 对通道维度进行归一化
训练状态下使用小批次的样本均值与方差,测试状态使用的移动平均估算的均值与方差
'''
param: 输入通道数
'''
nn.BatchNorm2d(6)
nn.BatchNorm1d(128)
1.2.6. 残差块
训练是模型训练出F(X) = H(x)- X
import torch
from torch import nn
from torch.nn import functional as F
class Residual(nn.Module):
def __init__(self , input_channels, num_channels, use_1x1conv= False, strides =1):
super().__init__()
self.conv1 = nn.Conv2d(input_channels, num_channels, kernel_size = 3, paddding = 1, stride = strides)
self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size = 3, paddding = 1, stride = strides)
# 修改X的通道数量,与f(x) - x匹配
if use_1x1conv :
self.conv3 = nn.Conv2d(input_channels, num_channels, kernel_size =1 ,padding = 0, stride = strides)
else :
self.conv3 = none
self.bn1 = nn.BatchNorm2d(num_channels)
self.bn2 = nn.BatchNorm2d(num_channels)
def forward(self,X):
Y = F.relu(self.bn1(self.conv1(X)))
Y = self.bn2(self.conv2(Y))
# 修改X输出通道数量
if conv3 :
X = self.conv3(X)
# 实现Y = f(x) +X
Y = Y+ X
Y = F.relu(Y)
return Y
1.2.7. 池化层pooling
汇聚卷积层计算出的信息,降低卷积层对位置的敏感性,同时降低对空间降采样表示的敏感性
from torch import nn
nn.MaxPool2d((3,3), padding =(0,1), stride = (0,1)) # 最大池化层
nn.AvgPool2d((2, 3), stride=(2, 3), padding=(0, 1)) # 平均池化层
'''
param1 : 输出层形状(1,1), 网络自动计算padding, stride ,k ,转化为(1,1)的输出维度
'''
nn.AdaptiveAvgPool2d((1,1))
池化层不需要指出Input_channels, output_channels, input_channels =output_channels
例如,Lenet网络
net = nn.Sequential(
nn.Conv2d(1, 6, kernel_size=5, padding=2), nn.Sigmoid(),
nn.AvgPool2d(kernel_size=2, stride=2),
nn.Conv2d(6, 16, kernel_size=5), nn.Sigmoid(),
nn.AvgPool2d(kernel_size=2, stride=2),
nn.Flatten(),
nn.Linear(16 * 5 * 5, 120), nn.Sigmoid(),
nn.Linear(120, 84), nn.Sigmoid(),
nn.Linear(84, 10))
1.2.8. 循环神经网络RNN
$$
隐藏输出\\mathbf{H}t = \phi(\mathbf{X}t \mathbf{W}{xh} + \mathbf{H}{t-1} \mathbf{W}_{hh} + \mathbf{b}_h).\
输出\
\mathbf{O}_t = \mathbf{H}t \mathbf{W}{hq} + \mathbf{b}_q.
$$
1.2.8.1. 深度循环deep_rnn
RNN相当于Drnn中,$H_{t}^{0} = X_t$
$$
\mathbf{H}t^{(l)} = \phi_l(\mathbf{H}t^{(l-1)} \mathbf{W}{xh}^{(l)} + \mathbf{H}{t-1}^{(l)} \mathbf{W}_{hh}^{(l)} + \mathbf{b}_h^{(l)})\
\mathbf{O}_t = \mathbf{H}t^{(L)} \mathbf{W}{hq} + \mathbf{b}_q
$$
1.2.8.2. 双向循环rnn
多用于对文本的编码,而不是预测文本
1.2.8.3. 代码介绍
输入X = (time_step ,batch_size, feature), output = (time_step ,batch_size, num_hidden*(1/2)), state = (time_step ,batch_size, num_hidden)
from torch import nn
rnn = nn.RNN(num_input, num_hiddens)
gru_layer = nn.GRU(num_inputs, num_hiddens)
lstm_layer=nn.LSTM(num_inputs, num_hiddens)
# drnn
'''
X_{batch* num_input}* W_{num_input * num_hidder}
num_inputs: 输入的特征数量
num_hidden:隐神经元数量 = 隐状态的特征数量
bidirectional: 双向循环网络
'''
lstm_layer = nn.LSTM(num_inputs, num_hidden, num_layers, bidirectional=True)
RNN 利用时间维度的参数共享实现了对*时间位置,每一个时间步使用的一套参数
def rnn(inputs, state, params):
# inputs的形状:(时间步数量,批量大小,词表大小)
W_xh, W_hh, b_h, W_hq, b_q = params
H, = state
outputs = []
# X的形状:(批量大小,词表大小)
for X in inputs:
H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
Y = torch.mm(H, W_hq) + b_q
outputs.append(Y)
return torch.cat(outputs, dim=0), (H,)
1.2.8.4. 梯度衰减
减小梯度,避免梯度爆炸,使得梯度始终保持在$\theta$ 以下
$$
\mathbf{g} \leftarrow \min\left(1, \frac{\theta}{|\mathbf{g}|}\right) \mathbf{g}.
$$
梯度裁剪代码
1.3. 激活函数
将模型从线性变为非线性
1.3.1. relu
$$
\operatorname{ReLU}(x) = \max(x, 0).
$$
torch.relu(X)
1.3.2. sigmod
特征图像,曲线在(0,1)中,关于0.5对称
$$
$$\operatorname{sigmoid}(x) = \frac{1}{1 + \exp(-x)}.$$
$$
y = torch.sigmod(X)
1.3.3. tanh
特征图像,曲线在(-1,1)中,关于0对称
$$
$$\operatorname{tanh}(x) = \frac{1 - \exp(-2x)}{1 + \exp(-2x)}
$$
y = torch.tanh(x)
1.4. 损失函数
1.4.1. 均方误差
loss = nn.MSELoss()
1.4.2. 交叉熵损失
loss = nn.CrossEntropyLoss(reduction='none')
# 返回的是一个张量,反向传播需要计算为标量
# 相当于 NLLLoss(LogSoftmax(logits), target)
1.4.3. NLLLoss
loss = nn.NLLLoss()
1.5. 训练过程
num_epoch = 2
for epoch in range(num_epoch):
for X, y in data_iter :
l = loss(net(X), y)
train.zero_grad()
# 清除训练模型的梯度, 返回的是一个标量
l.backward()
# 反向计算梯度
trainer.step()
l = loss(net(features), labels) # 计算总体梯度
print(f'epoch{epoch}, loss{1:0.2f}')
1.6. 模型问题
1.6.1. 复杂性因素
- 可调整参数的数量。当可调整参数的数量(有时称为自由度)很大时,模型往往更容易过拟合。
- 参数采用的值。当权重的取值范围较大时,模型可能更容易过拟合。
- 训练样本的数量。即使模型很简单,也很容易过拟合只包含一两个样本的数据集。而过拟合一个有数百万个样本的数据集则需要一个极其灵活的模型。
1.6.2. K折交叉验证
一个epoch中,将训练数据分为K份,在k-1份上进行训练,在第K份上进行验证
1.6.3. L2正则化
$$
\begin{aligned}
\mathbf{w} & \leftarrow \left(1- \eta\lambda \right) \mathbf{w} - \frac{\eta}{|\mathcal{B}|} \sum_{i \in \mathcal{B}} \mathbf{x}^{(i)} \left(\mathbf{w}^\top \mathbf{x}^{(i)} + b - y^{(i)}\right).
\end{aligned}
$$
每次都减小一定的权重,岭回归相对于线性回归增加了L2正则化,LASSO回归相当于增加了L1回归,相当于参数选择
- 选择权重参数,然后正则化
# 选择参数组
for name, param in net.named_parameters():
# param.requires_grad 确保只包含需要梯度的参数
if param.requires_grad:
# 根据参数名称判断是否是偏置项
if 'bias' in name: # 简单的判断,更严格的判断可以是 name.endswith('.bias')
params_without_wd.append(param)
else:
params_with_wd.append(param)
# 初始化优化器,使用参数组
trainer = torch.optim.SGD([
{'params': params_with_wd, 'weight_decay': wd},
{'params': params_without_wd, 'weight_decay': 0} # 对偏置项设置 weight_decay 为 0
], lr=lr)
对所有参数进行正则化
trainer = torch.optim.SGD([ net.parameters(), lr = lr, weigth_decay = wd ])
1.6.4. 暂退法(Dropout)
随即丢弃部分神经元
nn.Dropout(ratio)
1.6.5. 随机初始化
暂退法和随机初始化,都可以减小神经元的对称性
from torch.nn.init as init
class SimpleMLP(nn.Module):
def __init__(self):
super(SimpleMLP, self).__init__()
self.fc1 = nn.Linear(784, 128) # 例如处理 28x28 图像展平后的输入
self.relu = nn.ReLU()
self.fc2 = nn.Linear(128, 64)
self.relu2 = nn.ReLU()
self.fc3 = nn.Linear(64, 10) # 例如输出 10 个类别的概率
# 在这里调用自定义初始化函数
self._initialize_weights()
def _initialize_weights(self):
print("正在进行自定义初始化...")
for m in self.modules(): # 遍历模型的所有模块 (包括子模块自身)
# print(f"处理模块: {m}") # 可以打印查看正在处理的模块类型
if isinstance(m, nn.Linear):
# 对线性层的权重使用 He/Kaiming 初始化
init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
# 对线性层的偏置初始化为常数 0
if m.bias is not None: # 检查偏置是否存在
init.constant_(m.bias, 0)
1.7. 附录
squence 序列数据预处理
# 读取序列数据
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
def read_time_machine(): #@save
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
lines = read_time_machine()
# 词元化
def tokenize(lines, token='word'): #@save
"""将文本行拆分为单词或字符词元"""
if token == 'word':
return [line.split() for line in lines]
elif token == 'char':
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
print(type(lines[0]))
tokens = tokenize(lines)
# 词元与数值的映射
class Vocab: #@save
"""文本词表"""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
reverse=True)
# 未知词元的索引为0
self.idx_to_token = ['<unk>'] + reserved_tokens
# 单词到索引梭顺序
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
if freq < min_freq:
break
if token not in self.token_to_idx:
# 顺序到单词
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
return len(self.idx_to_token)
def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]
@property
def unk(self): # 未知词元的索引为0
return 0
@property
def token_freqs(self):
return self._token_freqs
def count_corpus(tokens): #@save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
return collections.Counter(tokens)
def load_corpus_time_machine(max_tokens=-1): #@save
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
# 所以将所有文本行展平到一个列表中
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
梯度衰减
def grad_clipping(net, theta): #@save
"""裁剪梯度"""
if isinstance(net, nn.Module):
params = [p for p in net.parameters() if p.requires_grad]
else:
params = net.params
norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm
1.7.1. 读取文本序列数据
import os
import torch
from d2l import torch as d2l
# 下载并读出序列
d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
'94646ad1522d915e7b0f9296181140edcf86a4f5')
#@save
def read_data_nmt():
"""载入“英语-法语”数据集"""
data_dir = d2l.download_extract('fra-eng')
with open(os.path.join(data_dir, 'fra.txt'), 'r',
encoding='utf-8') as f:
return f.read()
raw_text = read_data_nmt()
print(raw_text[:75])
# 处理序列
def preprocess_nmt(text):
"""预处理“英语-法语”数据集"""
def no_space(char, prev_char):
return char in set(',.!?') and prev_char != ' '
# 使用空格替换不间断空格
# 使用小写字母替换大写字母
text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
# 在单词和标点符号之间插入空格
out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else char
for i, char in enumerate(text)]
return ''.join(out)
# 词元化,并区分出feature与label
def tokenize_nmt(text, num_examples=None):
"""词元化“英语-法语”数据数据集"""
source, target = [], []
for i, line in enumerate(text.split('\n')):
if num_examples and i > num_examples:
break
parts = line.split('\t')
if len(parts) == 2:
# 之前标点符号之间增加了空格,使用空格分割
source.append(parts[0].split(' '))
target.append(parts[1].split(' '))
return source, target
source, target = tokenize_nmt(text)
# 阶段或填充词元
def truncate_pad(line, num_steps, padding_token):
"""截断或填充文本序列"""
if len(line) > num_steps:
return line[:num_steps] # 截断
return line + [padding_token] * (num_steps - len(line)) # 填充
truncate_pad(src_vocab[source[0]], 10, src_vocab['<pad>'])
# 统计序列数据valid_len
def build_array_nmt(lines, vocab, num_steps):
"""将机器翻译的文本序列转换成小批量"""
lines = [vocab[l] for l in lines]
lines = [l + [vocab['<eos>']] for l in lines]
array = torch.tensor([truncate_pad(
l, num_steps, vocab['<pad>']) for l in lines])
valid_len = (array != vocab['<pad>']).type(torch.int32).sum(1)
return array, valid_len
def load_data_nmt(batch_size, num_steps, num_examples=600):
"""返回翻译数据集的迭代器和词表"""
text = preprocess_nmt(read_data_nmt()) # 读取序列
source, target = tokenize_nmt(text, num_examples) # 词元化序列
src_vocab = d2l.Vocab(source, min_freq=2, # 建立词表
reserved_tokens=['<pad>', '<bos>', '<eos>'])
tgt_vocab = d2l.Vocab(target, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps) # 填充或阶段
tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
data_iter = d2l.load_array(data_arrays, batch_size) # 创建小批量迭代器
return data_iter, src_vocab, tgt_vocab