深度学习


1. 深度学习

1.1. 数据读取

1.1.1. 标签转换为数据迭代器

from torch.utils import data
# 传入多个数据,类似数据压缩 data_arrays,对数据进行解压缩
dataset = data.TensorDataset(data_arrays)
# 创建迭代器,batch_size
data.DataLoader(dataset, batch_size, shuffle=is_train)
# iter返回一个迭代器对象, next读取迭代器的下一个数据
next(iter(data))

1.1.2. 从torchvision.datasets读取

from torchvision import transforms
from torch.utils import data
import torchvision
def load_data(batch_size, resize = None):
 # 定义数据处理方式   
    trans = [transforms.ToTensors()]
    if resize:
        trans.insert(0, transforms.Resize(resize))
    trans = transforms.Compose(trans)# 使用Compose转换
    
    # 选择数据导出模式
    '''
    root - 数据保存地址
    train - 训练数据
    transforms 转换模式
    download 是否选择下载
    '''
    mnist_train = torchvision.datasets.FashionMNIST(
        root = "../data", train = True, transforms = trans, download = True)
    
    '''
    batch_size: 每个批次包含多少个样本。
    shuffle: 是否在每个 epoch 开始时打乱数据(训练集通常为 True,测试集通常为 False)。
    num_workers (可选): 用于数据加载的子进程数量,可以加快数据读取速度。
    '''
    data = data.DataLoader(mnist_train, batch_size, shuffle = True,
                          num_workers = 3)
       

1.1.3. 序列数据

对于总长度T, 时间$\tau$ ,将数据分为
$$
y_t = X_t \
x_t = [x_[t-\tau]… x_{t-1}] \
但是x_t 相对Y_t少了\tau个,可以舍弃,也可以填充0
$$

tau = 4
features = torch.zeros((T - tau, tau))
for i in range(tau):
    features[:, i] = x[i: T - tau + i]
labels = x[tau:].reshape((-1, 1))
  1. 读取数据
  2. 词元化(转换为单词或字符)
  3. 转换为词元与数字的映射
  4. 将映射作用在序列数据上,转换为向量表示
  5. 数据预处理代码

1.1.4. 文本序列数据

  1. 读取数据到string中
  2. 使用split(\t) 分割为英法双语,然后使用.split(‘ ‘)将单词分割
  3. 然后将单词词元,建立词表
  4. 使用词表,转换为向量,对向量进行阶段或填充,然后在向量末尾增加,再统计序列中有效的单元数量valid_len
  5. 指定批量大小,转换为小批量迭代器

读取文本序列代码

1.2. 模型

1.2.1. 线性模型linear

第一个指定输入特征形状,即2,第二个指定输出特征形状

from torch import nn
net = nn.Sequential(nn.Linear(2, 1))
loss = nn.MSELoss()
trainer = torch.optim.SGD(net.parameters(), lr=0.03)

1.2.2. 展平层

将(batch_size, channel, height, weight) 转换为(batch_size, channel* height*weight)

nn.Flatten(start_dim = 1, end_dim =-1) # 默认保留第一维batch_size
nn.LogSoftmax(dim=1) # 作用于最后一个维度,进行归一化

1.2.3. MLP

from torch import nn
net = nn.Sequential(nn.Flatten(),
                    nn.Linear(784, 256),
                    nn.ReLU(),
                    nn.Linear(256, 10))

1.2.4. CNN

不变性:无论使用什么方法找到这个物体,都与物体的位置无关

原因:与像素点计算的卷积核都是相同的,不随着位置的改变而改变

'''
param1: 输入通道
param2: 输出通道
parma3: 卷积核大小
param4: 参数
'''
conv2d = nn.Conv2d(1,1, kernel_size=(1, 2), bias=False)
'''
卷积核大小 = input_channels* kernel_size, 
每一个卷积核计算的出来都是一个二维图形
卷积核数量 = output_channels
'''

1.2.4.1. 填充padding

卷积核最好选择奇数

填充高度与宽度满足$p_h = k_h -1 , p_w = k_w -1$, 在上下填充的高度与宽度分别为 ${p_h}/{2} , p_w/2$

conv2d = nn.Conv2d(1,1, kernel_size=(3, 3),padding = 1, bias=False)

1.2.4.2. 步幅

输出形状公式为
$$
\lfloor(n_h-k_h-+p_h+s_h)/s_h\rfloor \
p_h = k_h-1 \
所以结果为(n_h+s_h-1)/s_h, \
可以整除情况下,=(n_h+s_h)/s_h
$$

conv2d = nn.Conv2d(1, 1, kernel_size=(3, 5), padding=(1, 2), stride=(3, 4))
  • 当以每像素为基础应用时,$1\times 1$卷积层相当于全连接层。

1.2.5. 批量归一化层BatchNorm

$$
\mathrm{BN}(\mathbf{x}) = \boldsymbol{\gamma} \odot \frac{\mathbf{x} - \hat{\boldsymbol{\mu}}_\mathcal{B}}{\hat{\boldsymbol{\sigma}}_\mathcal{B}} + \boldsymbol{\beta}.
$$

对特征维度进行归一化,$\gamma、\beta$ 是拉伸和偏移参数

  1. 对特征维度进行归一化

    mean = X.mean(dim = 0, keepdim = True)
    
  2. 卷积层

    1. 对通道维度进行归一化
  3. 训练状态下使用小批次的样本均值与方差,测试状态使用的移动平均估算的均值与方差

'''
param: 输入通道数
'''
nn.BatchNorm2d(6)  
nn.BatchNorm1d(128)

1.2.6. 残差块

训练是模型训练出F(X) = H(x)- X

image-20250423225436209

import torch 
from torch import nn
from torch.nn import functional as F
class Residual(nn.Module):
    def __init__(self , input_channels, num_channels, use_1x1conv= False, strides =1):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, num_channels, kernel_size = 3, paddding = 1, stride = strides)
        self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size = 3, paddding = 1, stride = strides)
        # 修改X的通道数量,与f(x) - x匹配
        if use_1x1conv :
            self.conv3 = nn.Conv2d(input_channels, num_channels, kernel_size =1 ,padding = 0, stride = strides)
        else :
            self.conv3 = none
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)
    def forward(self,X):
        Y = F.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        # 修改X输出通道数量
        if conv3 :
            X = self.conv3(X)
        # 实现Y = f(x) +X
        Y = Y+ X
        Y = F.relu(Y)
 
return Y
        

1.2.7. 池化层pooling

汇聚卷积层计算出的信息,降低卷积层对位置的敏感性,同时降低对空间降采样表示的敏感性

from torch import nn
nn.MaxPool2d((3,3), padding =(0,1), stride = (0,1))  # 最大池化层
nn.AvgPool2d((2, 3), stride=(2, 3), padding=(0, 1))  # 平均池化层

'''
param1 : 输出层形状(1,1), 网络自动计算padding, stride ,k ,转化为(1,1)的输出维度
'''
nn.AdaptiveAvgPool2d((1,1))

池化层不需要指出Input_channels, output_channels, input_channels =output_channels

例如,Lenet网络

image-20250423211333286

net = nn.Sequential(
    nn.Conv2d(1, 6, kernel_size=5, padding=2), nn.Sigmoid(),
    nn.AvgPool2d(kernel_size=2, stride=2),
    nn.Conv2d(6, 16, kernel_size=5), nn.Sigmoid(),
    nn.AvgPool2d(kernel_size=2, stride=2),
    nn.Flatten(),
    nn.Linear(16 * 5 * 5, 120), nn.Sigmoid(),
    nn.Linear(120, 84), nn.Sigmoid(),
    nn.Linear(84, 10))

1.2.8. 循环神经网络RNN

$$
隐藏输出\\mathbf{H}t = \phi(\mathbf{X}t \mathbf{W}{xh} + \mathbf{H}{t-1} \mathbf{W}_{hh} + \mathbf{b}_h).\
输出\
\mathbf{O}_t = \mathbf{H}t \mathbf{W}{hq} + \mathbf{b}_q.
$$

1.2.8.1. 深度循环deep_rnn

RNN相当于Drnn中,$H_{t}^{0} = X_t$
$$
\mathbf{H}t^{(l)} = \phi_l(\mathbf{H}t^{(l-1)} \mathbf{W}{xh}^{(l)} + \mathbf{H}{t-1}^{(l)} \mathbf{W}_{hh}^{(l)} + \mathbf{b}_h^{(l)})\
\mathbf{O}_t = \mathbf{H}t^{(L)} \mathbf{W}{hq} + \mathbf{b}_q
$$

1.2.8.2. 双向循环rnn

image-20250424175214314

多用于对文本的编码,而不是预测文本

1.2.8.3. 代码介绍

输入X = (time_step ,batch_size, feature), output = (time_step ,batch_size, num_hidden*(1/2)), state = (time_step ,batch_size, num_hidden)

from torch import nn
rnn = nn.RNN(num_input, num_hiddens)
gru_layer = nn.GRU(num_inputs, num_hiddens)
lstm_layer=nn.LSTM(num_inputs, num_hiddens)

# drnn
'''
X_{batch* num_input}* W_{num_input * num_hidder} 
num_inputs: 输入的特征数量
num_hidden:隐神经元数量 = 隐状态的特征数量
bidirectional: 双向循环网络
'''
lstm_layer = nn.LSTM(num_inputs, num_hidden, num_layers, bidirectional=True)

RNN 利用时间维度的参数共享实现了对*时间位置,每一个时间步使用的一套参数

def rnn(inputs, state, params):
    # inputs的形状:(时间步数量,批量大小,词表大小)
    W_xh, W_hh, b_h, W_hq, b_q = params
    
    H, = state
    outputs = []
    # X的形状:(批量大小,词表大小)
    for X in inputs:
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        Y = torch.mm(H, W_hq) + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H,)

image-20250424173608710

1.2.8.4. 梯度衰减

减小梯度,避免梯度爆炸,使得梯度始终保持在$\theta$ 以下
$$
\mathbf{g} \leftarrow \min\left(1, \frac{\theta}{|\mathbf{g}|}\right) \mathbf{g}.
$$
梯度裁剪代码

1.3. 激活函数

将模型从线性变为非线性

1.3.1. relu

$$
\operatorname{ReLU}(x) = \max(x, 0).
$$

torch.relu(X)

1.3.2. sigmod

特征图像,曲线在(0,1)中,关于0.5对称
$$
$$\operatorname{sigmoid}(x) = \frac{1}{1 + \exp(-x)}.$$
$$

y = torch.sigmod(X)

1.3.3. tanh

特征图像,曲线在(-1,1)中,关于0对称
$$
$$\operatorname{tanh}(x) = \frac{1 - \exp(-2x)}{1 + \exp(-2x)}
$$

y = torch.tanh(x)

1.4. 损失函数

1.4.1. 均方误差

loss = nn.MSELoss()

1.4.2. 交叉熵损失

loss = nn.CrossEntropyLoss(reduction='none') 
# 返回的是一个张量,反向传播需要计算为标量
# 相当于 NLLLoss(LogSoftmax(logits), target)

1.4.3. NLLLoss

loss = nn.NLLLoss()  

1.5. 训练过程

num_epoch = 2
for epoch in range(num_epoch):
    for X, y in data_iter :
        l = loss(net(X), y)
        train.zero_grad()
        # 清除训练模型的梯度, 返回的是一个标量
        l.backward()  
        # 反向计算梯度
        trainer.step()
        
    l = loss(net(features), labels)  # 计算总体梯度
    print(f'epoch{epoch}, loss{1:0.2f}')
    
        
        

1.6. 模型问题

1.6.1. 复杂性因素

  1. 可调整参数的数量。当可调整参数的数量(有时称为自由度)很大时,模型往往更容易过拟合。
  2. 参数采用的值。当权重的取值范围较大时,模型可能更容易过拟合。
  3. 训练样本的数量。即使模型很简单,也很容易过拟合只包含一两个样本的数据集。而过拟合一个有数百万个样本的数据集则需要一个极其灵活的模型。

1.6.2. K折交叉验证

一个epoch中,将训练数据分为K份,在k-1份上进行训练,在第K份上进行验证

1.6.3. L2正则化

$$
\begin{aligned}

\mathbf{w} & \leftarrow \left(1- \eta\lambda \right) \mathbf{w} - \frac{\eta}{|\mathcal{B}|} \sum_{i \in \mathcal{B}} \mathbf{x}^{(i)} \left(\mathbf{w}^\top \mathbf{x}^{(i)} + b - y^{(i)}\right).

\end{aligned}
$$

每次都减小一定的权重,岭回归相对于线性回归增加了L2正则化,LASSO回归相当于增加了L1回归,相当于参数选择

  1. 选择权重参数,然后正则化
# 选择参数组
for name, param in net.named_parameters():
    # param.requires_grad 确保只包含需要梯度的参数
    if param.requires_grad:
        # 根据参数名称判断是否是偏置项
        if 'bias' in name: # 简单的判断,更严格的判断可以是 name.endswith('.bias')
            params_without_wd.append(param)
        else:
            params_with_wd.append(param)

# 初始化优化器,使用参数组
trainer = torch.optim.SGD([
    {'params': params_with_wd, 'weight_decay': wd},
    {'params': params_without_wd, 'weight_decay': 0} # 对偏置项设置 weight_decay 为 0
], lr=lr)
  1. 对所有参数进行正则化

    trainer = torch.optim.SGD([
        net.parameters(), lr = lr, weigth_decay = wd
    ])
    

1.6.4. 暂退法(Dropout)

随即丢弃部分神经元

nn.Dropout(ratio)

1.6.5. 随机初始化

暂退法和随机初始化,都可以减小神经元的对称性

from torch.nn.init as init
class SimpleMLP(nn.Module):
    def __init__(self):
        super(SimpleMLP, self).__init__()
        self.fc1 = nn.Linear(784, 128) # 例如处理 28x28 图像展平后的输入
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 64)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(64, 10) # 例如输出 10 个类别的概率

        # 在这里调用自定义初始化函数
        self._initialize_weights()

    def _initialize_weights(self):
        print("正在进行自定义初始化...")
        for m in self.modules(): # 遍历模型的所有模块 (包括子模块自身)
            # print(f"处理模块: {m}") # 可以打印查看正在处理的模块类型
            if isinstance(m, nn.Linear):
                # 对线性层的权重使用 He/Kaiming 初始化
                init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
                # 对线性层的偏置初始化为常数 0
                if m.bias is not None: # 检查偏置是否存在
                    init.constant_(m.bias, 0)

1.7. 附录

squence 序列数据预处理

# 读取序列数据
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
                                '090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine():  #@save
    """将时间机器数据集加载到文本行的列表中"""
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()

# 词元化
def tokenize(lines, token='word'):  #@save
    """将文本行拆分为单词或字符词元"""
    if token == 'word':
        return [line.split() for line in lines]
    elif token == 'char':
        return [list(line) for line in lines]
    else:
        print('错误:未知词元类型:' + token)

print(type(lines[0]))
tokens = tokenize(lines)


# 词元与数值的映射
class Vocab:  #@save
    """文本词表"""
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        # 按出现频率排序
        counter = count_corpus(tokens)
        self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                   reverse=True)
        # 未知词元的索引为0
        self.idx_to_token = ['<unk>'] + reserved_tokens

        # 单词到索引梭顺序
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}
        
        for token, freq in self._token_freqs:
            if freq < min_freq:
                break
            if token not in self.token_to_idx:
                # 顺序到单词
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]

    @property
    def unk(self):  # 未知词元的索引为0
        return 0

    @property
    def token_freqs(self):
        return self._token_freqs

def count_corpus(tokens):  #@save
    """统计词元的频率"""
    # 这里的tokens是1D列表或2D列表
    if len(tokens) == 0 or isinstance(tokens[0], list):
        # 将词元列表展平成一个列表
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)

def load_corpus_time_machine(max_tokens=-1):  #@save
    """返回时光机器数据集的词元索引列表和词表"""
    lines = read_time_machine()
    tokens = tokenize(lines, 'char')
    vocab = Vocab(tokens)
    # 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
    # 所以将所有文本行展平到一个列表中
    corpus = [vocab[token] for line in tokens for token in line]
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
    return corpus, vocab

梯度衰减

def grad_clipping(net, theta):  #@save
    """裁剪梯度"""
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm

1.7.1. 读取文本序列数据

import os
import torch
from d2l import torch as d2l

# 下载并读出序列
d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
                           '94646ad1522d915e7b0f9296181140edcf86a4f5')

#@save
def read_data_nmt():
    """载入“英语-法语”数据集"""
    data_dir = d2l.download_extract('fra-eng')
    with open(os.path.join(data_dir, 'fra.txt'), 'r',
             encoding='utf-8') as f:
        return f.read()

raw_text = read_data_nmt()
print(raw_text[:75])

# 处理序列
def preprocess_nmt(text):
    """预处理“英语-法语”数据集"""
    def no_space(char, prev_char):
        return char in set(',.!?') and prev_char != ' '

    # 使用空格替换不间断空格
    # 使用小写字母替换大写字母
    text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
    # 在单词和标点符号之间插入空格
    out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else char
           for i, char in enumerate(text)]
    return ''.join(out)

# 词元化,并区分出feature与label
def tokenize_nmt(text, num_examples=None):
    """词元化“英语-法语”数据数据集"""
    source, target = [], []
    for i, line in enumerate(text.split('\n')):
        if num_examples and i > num_examples:
            break
        parts = line.split('\t')
        if len(parts) == 2:
            # 之前标点符号之间增加了空格,使用空格分割
            source.append(parts[0].split(' '))
            target.append(parts[1].split(' '))
    return source, target

source, target = tokenize_nmt(text)

# 阶段或填充词元
def truncate_pad(line, num_steps, padding_token):
    """截断或填充文本序列"""
    if len(line) > num_steps:
        return line[:num_steps]  # 截断
    return line + [padding_token] * (num_steps - len(line))  # 填充

truncate_pad(src_vocab[source[0]], 10, src_vocab['<pad>'])

# 统计序列数据valid_len
def build_array_nmt(lines, vocab, num_steps):
    """将机器翻译的文本序列转换成小批量"""
    lines = [vocab[l] for l in lines]
    lines = [l + [vocab['<eos>']] for l in lines]
    array = torch.tensor([truncate_pad(
        l, num_steps, vocab['<pad>']) for l in lines])
    valid_len = (array != vocab['<pad>']).type(torch.int32).sum(1)
    return array, valid_len


def load_data_nmt(batch_size, num_steps, num_examples=600):
    """返回翻译数据集的迭代器和词表"""
    text = preprocess_nmt(read_data_nmt()) # 读取序列
    source, target = tokenize_nmt(text, num_examples)  # 词元化序列
    src_vocab = d2l.Vocab(source, min_freq=2, # 建立词表
                          reserved_tokens=['<pad>', '<bos>', '<eos>'])
    tgt_vocab = d2l.Vocab(target, min_freq=2,
                          reserved_tokens=['<pad>', '<bos>', '<eos>'])
    src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)  # 填充或阶段
    tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
    data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
    data_iter = d2l.load_array(data_arrays, batch_size)   # 创建小批量迭代器
    return data_iter, src_vocab, tgt_vocab

文章作者: 小白菜
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 小白菜 !
评论
  目录