序言
- 承接上文 VAE变分自编码器学习笔记 一文介绍了VAE算法的设计思路、原理、公式推导以及作者本人对VAE模型的理解
- 这里继续就VAE算法的实现进行学习,为模型能够应用到工作中做准备
1. 环境配置
-
(1)我们使用pytorch来训练模型,首先导入必要的库(模块module、包package)/相关类
import torch from torch import optim import torch.nn.functional as F from torch.utils.data import DataLoader from torchvision import transforms, datasets from torchvision.utils import save_image from vae import VAE import matplotlib.pyplot as plt import argparse import os import shutil import numpy as np
-
(2)设置模型运行的设备
cuda = torch.cuda.is_available() device = torch.device("cuda" if cuda else "cpu")
-
(3)设置默认参数,包括一些超参和其他配置参数,这里用到argparser这个库
parser = argparse.ArgumentParser(description="Variational Auto-Encoder MNIST Example") parser.add_argument('--result_dir', type=str, default='./VAEResult', metavar='DIR', help='output directory') parser.add_argument('--save_dir', type=str, default='./checkPoint', metavar='N', help='model saving directory') parser.add_argument('--batch_size', type=int, default=128, metavar='N', help='batch size for training(default: 128)') parser.add_argument('--epochs', type=int, default=200, metavar='N', help='number of epochs to train(default: 200)') parser.add_argument('--seed', type=int, default=1, metavar='S', help='random seed(default: 1)') parser.add_argument('--resume', type=str, default='', metavar='PATH', help='path to latest checkpoint(default: None)') parser.add_argument('--test_every', type=int, default=10, metavar='N', help='test after every epochs') parser.add_argument('--num_worker', type=int, default=1, metavar='N', help='the number of workers') parser.add_argument('--lr', type=float, default=1e-3, help='learning rate(default: 0.001)') parser.add_argument('--z_dim', type=int, default=20, metavar='N', help='the dim of latent variable z(default: 20)') parser.add_argument('--input_dim', type=int, default=28 * 28, metavar='N', help='input dim(default: 28*28 for MNIST)') parser.add_argument('--input_channel', type=int, default=1, metavar='N', help='input channel(default: 1 for MNIST)') args = parser.parse_args() # num_workers设置载入输入所用的子进程的个数 kwargs = {'num_workers': 2, 'pin_memory': True} if cuda else {}
2. 数据集准备
-
加载MNIST手写数据集,包括训练数据集和测试数据集
def dataloader(batch_size=128, num_workers=2): transform = transforms.Compose([ transforms.ToTensor(), ]) # 下载mnist数据集 mnist_train = datasets.MNIST('mnist', train=True, transform=transform, download=True) mnist_test = datasets.MNIST('mnist', train=False, transform=transform, download=True) # 载入mnist数据集 # 该函数会先打乱数据再按batch_size取数据 mnist_train = DataLoader(mnist_train, batch_size=batch_size, shuffle=True) mnist_test = DataLoader(mnist_test, batch_size=batch_size, shuffle=True) classes = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9') return mnist_test, mnist_train, classes
3. 构建网络
-
(1)构造函数
class VAE(nn.Module): def __init__(self, input_dim=784, h_dim=400, z_dim=20): # 调用父类方法初始化模块的state super(VAE, self).__init__() self.input_dim = input_dim self.h_dim = h_dim self.z_dim = z_dim # 编码器 : [b, input_dim] => [b, z_dim] self.fc1 = nn.Linear(input_dim, h_dim) # 第一个全连接层 self.fc2 = nn.Linear(h_dim, z_dim) # mu self.fc3 = nn.Linear(h_dim, z_dim) # log_var # 解码器 : [b, z_dim] => [b, input_dim] self.fc4 = nn.Linear(z_dim, h_dim) self.fc5 = nn.Linear(h_dim, input_dim)
输入层维度:input_dim = 784
隐藏层维度:h_dim = 400
隐变量维度:z_dim = 20 -
(2)forward函数:前向传播部分,在模型调用时自动调用该函数
def forward(self, x): """ 向前传播部分, 在model_name(inputs)时自动调用 :param x: the input of our training model [b, batch_size, 1, 28, 28] :return: the result of our training model """ batch_size = x.shape[0] # flatten [b, batch_size, 1, 28, 28] => [b, batch_size, 784] # tensor.view()方法可以调整tensor的形状,但必须保证调整前后元素总数一致 # view不会修改自身的数据,返回的新tensor与原tensor共享内存,即更改一个,另一个也随之改变。 x = x.view(batch_size, self.input_dim) # 一行代表一个样本 # encoder mu, log_var = self.encode(x) # reparameterization trick sampled_z = self.reparameterization(mu, log_var) # decoder x_hat = self.decode(sampled_z) # reshape x_hat = x_hat.view(batch_size, 1, 28, 28) return x_hat, mu, log_var
-
(3)编解码函数
- encoder和decoder可以直接使用Sequential的形式写;
- encoder结尾不能继续再接一个ReLU,因为在优化的过程中,隐变量是要逐渐趋于N(0, I)的,如果非要加个ReLU的话,本身假设的隐变量维度就很小,小于0的隐变量直接就没了,decoder在解码时直接会因为信息不足而崩掉;
def encode(self, x): """ encoding part :param x: input image :return: mu and log_var """ h = F.relu(self.fc1(x)) mu = self.fc2(h) log_var = self.fc3(h) return mu, log_var
def decode(self, z): """ Given a sampled z, decode it back to image """ h = F.relu(self.fc4(z)) x_hat = torch.sigmoid(self.fc5(h)) # 图片数值取值为[0,1],不宜用ReLU return x_hat
-
(4)重参数化函数
- 重参数化通过标准化步骤得到采样结果,使得模型可训练
def reparameterization(self, mu, log_var): """ Given a standard gaussian distribution epsilon ~ N(0,1), we can sample the random variable z as per z = mu + sigma * epsilon """ sigma = torch.exp(log_var * 0.5) eps = torch.randn_like(sigma) return mu + sigma * eps # *是点乘的意思
4. 定义损失函数
-
VAE的损失由重构损失和KL散度构成
- KL散度: KLD描述两个分布的相似性
K L ( N ( μ , σ 2 ) N ( 0 , 1 ) ) = 1 2 ( − l o g σ 2 + μ 2 + σ 2 − 1 ) KL(N(\mu , \sigma ^{2} ) N(0,1))=\frac{1}{2} (-log\sigma ^{2}+\mu ^{2}+ \sigma ^{2} -1) KL(N(μ,σ2)N(0,1))=21(−logσ2+μ2+σ2−1)
-
重构损失
因为手写数据集MNIST是黑白二值图像,我们把MNIST看作是二分类问题,重构损失直接用BCE二分类交叉熵;如果是三通道图像如RGB图像或者是单通道图像如灰度图像,还是需要使用MSE做重构损失
def loss_function(x_hat, x, mu, log_var): """ Calculate the loss. Note that the loss includes two parts. :return: total loss, BCE and KLD of our model """ # 1. the reconstruction loss. # We regard the MNIST as binary classification BCE = F.binary_cross_entropy(x_hat, x, reduction='sum') # 2. KL-divergence KLD = 0.5 * torch.sum(torch.exp(log_var) + torch.pow(mu, 2) - 1. - log_var) # 3. total loss loss = BCE + KLD return loss, BCE, KLD
5. 模型训练/测试
5.1 从头开始训练
-
参数中已定义好默认的训练epoch和batch_size,然后优化器我们选择Adam优化器;
-
开始训练迭代,把每个epoch的平均损失保存起来:
loss_epoch = [] for epoch in range(start_epoch, args.epochs): loss_batch = [] for batch_index, (x, _) in enumerate(mnist_train): # x : [b, 1, 28, 28] x = x.to(device) # 前向传播 x_hat, mu, log_var = model(x) # 模型调用会自动调用model中的forward函数 loss, BCE, KLD = loss_function(x_hat, x, mu, log_var) # 计算损失值 loss_batch.append(loss.item()) # loss是Tensor类型 # 后向传播 optimizer.zero_grad() # 梯度清零,否则上一步的梯度仍会存在 loss.backward() # 后向传播计算梯度,这些梯度会保存在model.parameters里面 optimizer.step() # 更新梯度,这一步与上一步主要是根据model.parameters联系起来 # print statistics every 100 batch if (batch_index + 1) % 100 == 0: print('Epoch [{}/{}], Batch [{}/{}] : Total-loss = {:.4f}, BCE-Loss = {:.4f}, KLD-loss = {:.4f}' .format(epoch + 1, args.epochs, batch_index + 1, len(mnist_train.dataset) // args.batch_size, loss.item() / args.batch_size, BCE.item() / args.batch_size, KLD.item() / args.batch_size)) if batch_index == 0: x_concat = torch.cat([x.view(-1, 1, 28, 28), x_hat.view(-1, 1, 28, 28)], dim=3) save_image(x_concat, './%s/reconstructed-%d.png' % (args.result_dir, epoch + 1)) # 把这一个epoch的样本的平均损失存起来 loss_epoch.append(np.sum(loss_batch) / len(mnist_train.dataset)) # len(mnist_train.dataset)为样本个数 # 测试模型 if (epoch + 1) % args.test_every == 0: best_test_loss = test(model, optimizer, mnist_test, epoch, best_test_loss) return loss_epoch
-
测试模型:对本次迭代的模型进行测试,记录最优loss并保存对应的check point
def test(model, optimizer, mnist_test, epoch, best_test_loss): test_avg_loss = 0.0 with torch.no_grad(): # 这一部分不计算梯度,也就是不放入计算图中去 '''测试测试集中的数据''' # 计算所有batch的损失函数的和 for test_batch_index, (test_x, _) in enumerate(mnist_test): test_x = test_x.to(device) # 前向传播 test_x_hat, test_mu, test_log_var = model(test_x) # 损害函数值 test_loss, test_BCE, test_KLD = loss_function(test_x_hat, test_x, test_mu, test_log_var) test_avg_loss += test_loss # 对和求平均,得到每一张图片的平均损失 test_avg_loss /= len(mnist_test.dataset) '''测试随机生成的隐变量''' # 随机从隐变量的分布中取隐变量 z = torch.randn(args.batch_size, args.z_dim).to(device) # 每一行是一个隐变量,总共有batch_size行 # 对隐变量重构 random_res = model.decode(z).view(-1, 1, 28, 28) # 保存重构结果 save_image(random_res, './%s/random_sampled-%d.png' % (args.result_dir, epoch + 1)) '''保存目前训练好的模型''' # 保存模型 is_best = test_avg_loss < best_test_loss best_test_loss = min(test_avg_loss, best_test_loss) save_checkpoint({ 'epoch': epoch, # 迭代次数 'best_test_loss': best_test_loss, # 目前最佳的损失函数值 'state_dict': model.state_dict(), # 当前训练过的模型的参数 'optimizer': optimizer.state_dict(), }, is_best, args.save_dir) return best_test_loss
5.2 导入ckpt继续训练
-
通常在训练模型的过程中,可能遭遇断电、断网的风险,为防止先前训练的模型中断无法恢复,每隔一段时间就将训练信息保存一次很有必要,以便后面用于推理或恢复训练;
-
checkpoint不仅可以保存模型参数、优化器参数、还有loss、epoch、额外的torch.nn.Embedding层等;
-
要保存多个组件,则将它们放到一个字典中,再使用toch.save()序列化这个字典,一般使用.tar文件格式来保存这些checkpoint;
-
要加载各个组件,首先初始化模型和优化器,然后使用torch.load()加载保存的字典,然后可以直接查询字典中的值来获取保存的组件;
def save_checkpoint(state, is_best, outdir): """ 每训练一定的epochs后, 判断损失函数是否是目前最优的,并保存模型的参数 """ if not os.path.exists(outdir): os.makedirs(outdir) checkpoint_file = os.path.join(outdir, 'checkpoint.pth') best_file = os.path.join(outdir, 'model_best.pth') # 把state保存在checkpoint_file文件夹中 torch.save(state, checkpoint_file) if is_best: shutil.copyfile(checkpoint_file, best_file)
-
承接上一节训练过程中保存checkpoint,save_checkpoint函数如上;当想恢复训练时,
model = VAE(z_dim=args.z_dim).to(device) # 初始化VAE模型,并转移到GPU上去 optimizer = optim.Adam(model.parameters(), lr=args.lr) # 初始化优化器,需要优化的是model的参数,learning rate设置为0.001 # 从checkpoint恢复训练 start_epoch = 0 best_test_loss = np.finfo('f').max # 初始化为最大浮点数 if args.resume: if os.path.isfile(args.resume): # 载入已经训练过的模型参数与结果 checkpoint = torch.load(args.resume) start_epoch = checkpoint['epoch'] + 1 best_test_loss = checkpoint['best_test_loss'] model.load_state_dict(checkpoint['state_dict']) optimizer.load_state_dict(checkpoint['optimizer']) else: print('=> no checkpoint found at %s' % args.resume)
5.3 模型测试
-
如5.1节中所示,我们可以在每次迭代结束后对模型进行测试,保留最优的checkpoint,也可以在模型训练完成后再对模型进行测试
-
单独进行模型测试的代码结构如下
for epoch in range(1, args.epochs + 1): train(epoch) test(epoch) def train(epoch): model.train() train_loss = 0 for batch_idx, (data, _) in enumerate(train_loader): ... def test(epoch): model.eval() test_loss = 0 for data, _ in test_loader: ...
6. 可视化显示
-
主要是绘制loss随epoch迭代的变化关系图
if __name__ == '__main__': loss_epoch = main() plt.plot(loss_epoch) plt.xlabel('epoch') plt.ylabel('loss') plt.show()
7. 模型保存与加载
-
模型训练好后,可以将模型保存下来
- 保存模型参数
# 保存 torch.save(model.state_dict(), 'your_path/vae_mnist.pth') # 加载 loaded_model = ModelClass(*args, **kwargs) # 初始化模型实例 loaded_model.load_state_dict(torch.load(your_path/vae_mnist.pth)) # 更新模型参数
- 保存整个模型
# 保存 torch.save(your_model, your_path/your_model_name) # 加载 the_model = torch.load(your_path/your_model_name)
8. 完整模型代码
-
vae模型代码
from torch import nn import torch import torch.nn.functional as F class VAE(nn.Module): def __init__(self, input_dim=784, h_dim=400, z_dim=20): # 调用父类方法初始化模块的state super(VAE, self).__init__() self.input_dim = input_dim self.h_dim = h_dim self.z_dim = z_dim # 编码器 : [b, input_dim] => [b, z_dim] self.fc1 = nn.Linear(input_dim, h_dim) # 第一个全连接层 self.fc2 = nn.Linear(h_dim, z_dim) # mu self.fc3 = nn.Linear(h_dim, z_dim) # log_var # 解码器 : [b, z_dim] => [b, input_dim] self.fc4 = nn.Linear(z_dim, h_dim) self.fc5 = nn.Linear(h_dim, input_dim) def forward(self, x): """ 向前传播部分, 在model_name(inputs)时自动调用 :param x: the input of our training model [b, batch_size, 1, 28, 28] :return: the result of our training model """ batch_size = x.shape[0] # flatten [b, batch_size, 1, 28, 28] => [b, batch_size, 784] # tensor.view()方法可以调整tensor的形状,但必须保证调整前后元素总数一致 # view不会修改自身的数据,返回的新tensor与原tensor共享内存,即更改一个,另一个也随之改变。 x = x.view(batch_size, self.input_dim) # 一行代表一个样本 # encoder mu, log_var = self.encode(x) # reparameterization trick sampled_z = self.reparameterization(mu, log_var) # decoder x_hat = self.decode(sampled_z) # reshape x_hat = x_hat.view(batch_size, 1, 28, 28) return x_hat, mu, log_var def encode(self, x): """ encoding part :param x: input image :return: mu and log_var """ h = F.relu(self.fc1(x)) mu = self.fc2(h) log_var = self.fc3(h) return mu, log_var def decode(self, z): """ Given a sampled z, decode it back to image """ h = F.relu(self.fc4(z)) x_hat = torch.sigmoid(self.fc5(h)) # 图片数值取值为[0,1],不宜用ReLU return x_hat def reparameterization(self, mu, log_var): """ Given a standard gaussian distribution epsilon ~ N(0,1), we can sample the random variable z as per z = mu + sigma * epsilon """ sigma = torch.exp(log_var * 0.5) eps = torch.randn_like(sigma) return mu + sigma * eps # *是点乘的意思
-
main主函数代码
# 我们使用pytorch来训练模型,首先导入必要的库(模块module、包package)/相关类 import torch from torch import optim import torch.nn.functional as F from torch.utils.data import DataLoader from torchvision import transforms, datasets from torchvision.utils import save_image from vae import VAE import matplotlib.pyplot as plt import argparse import os import shutil import numpy as np # 设置模型运行的设备 cuda = torch.cuda.is_available() device = torch.device("cuda" if cuda else "cpu") # 设置默认参数,包括一些超参和其他配置参数,用到argparser这个库 parser = argparse.ArgumentParser(description="Variational Auto-Encoder MNIST Example") parser.add_argument('--result_dir', type=str, default='./VAEResult', metavar='DIR', help='output directory') parser.add_argument('--save_dir', type=str, default='./checkPoint', metavar='N', help='model saving directory') parser.add_argument('--batch_size', type=int, default=128, metavar='N', help='batch size for training(default: 128)') parser.add_argument('--epochs', type=int, default=200, metavar='N', help='number of epochs to train(default: 200)') parser.add_argument('--seed', type=int, default=1, metavar='S', help='random seed(default: 1)') parser.add_argument('--resume', type=str, default='', metavar='PATH', help='path to latest checkpoint(default: None)') parser.add_argument('--test_every', type=int, default=10, metavar='N', help='test after every epochs') parser.add_argument('--num_worker', type=int, default=1, metavar='N', help='the number of workers') parser.add_argument('--lr', type=float, default=1e-3, help='learning rate(default: 0.001)') parser.add_argument('--z_dim', type=int, default=20, metavar='N', help='the dim of latent variable z(default: 20)') parser.add_argument('--input_dim', type=int, default=28 * 28, metavar='N', help='input dim(default: 28*28 for MNIST)') parser.add_argument('--input_channel', type=int, default=1, metavar='N', help='input channel(default: 1 for MNIST)') args = parser.parse_args() # num_workers设置载入输入所用的子进程的个数 kwargs = {'num_workers': 2, 'pin_memory': True} if cuda else {} # 加载MNIST手写数据集,包括训练数据集和测试数据集 def dataloader(batch_size=128, num_workers=2): transform = transforms.Compose([ transforms.ToTensor(), ]) # 下载mnist数据集 mnist_train = datasets.MNIST('mnist', train=True, transform=transform, download=True) mnist_test = datasets.MNIST('mnist', train=False, transform=transform, download=True) # 载入mnist数据集 # 该函数会先打乱数据再按batch_size取数据 mnist_train = DataLoader(mnist_train, batch_size=batch_size, shuffle=True) mnist_test = DataLoader(mnist_test, batch_size=batch_size, shuffle=True) classes = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9') return mnist_test, mnist_train, classes # VAE的损失由重构损失和KL散度构成 def loss_function(x_hat, x, mu, log_var): """ Calculate the loss. Note that the loss includes two parts. :return: total loss, BCE and KLD of our model """ # 1. the reconstruction loss. # We regard the MNIST as binary classification BCE = F.binary_cross_entropy(x_hat, x, reduction='sum') # 2. KL-divergence KLD = 0.5 * torch.sum(torch.exp(log_var) + torch.pow(mu, 2) - 1. - log_var) # 3. total loss loss = BCE + KLD return loss, BCE, KLD # 测试模型:对本次迭代的模型进行测试,记录最优loss并保存对应的check point def test(model, optimizer, mnist_test, epoch, best_test_loss): test_avg_loss = 0.0 with torch.no_grad(): # 这一部分不计算梯度,也就是不放入计算图中去 '''测试测试集中的数据''' # 计算所有batch的损失函数的和 for test_batch_index, (test_x, _) in enumerate(mnist_test): test_x = test_x.to(device) # 前向传播 test_x_hat, test_mu, test_log_var = model(test_x) # 损害函数值 test_loss, test_BCE, test_KLD = loss_function(test_x_hat, test_x, test_mu, test_log_var) test_avg_loss += test_loss # 对和求平均,得到每一张图片的平均损失 test_avg_loss /= len(mnist_test.dataset) '''测试随机生成的隐变量''' # 随机从隐变量的分布中取隐变量 z = torch.randn(args.batch_size, args.z_dim).to(device) # 每一行是一个隐变量,总共有batch_size行 # 对隐变量重构 random_res = model.decode(z).view(-1, 1, 28, 28) # 保存重构结果 save_image(random_res, './%s/random_sampled-%d.png' % (args.result_dir, epoch + 1)) '''保存目前训练好的模型''' # 保存模型 is_best = test_avg_loss < best_test_loss best_test_loss = min(test_avg_loss, best_test_loss) save_checkpoint({ 'epoch': epoch, # 迭代次数 'best_test_loss': best_test_loss, # 目前最佳的损失函数值 'state_dict': model.state_dict(), # 当前训练过的模型的参数 'optimizer': optimizer.state_dict(), }, is_best, args.save_dir) return best_test_loss # checkpoint不仅可以保存模型参数、优化器参数、还有loss、epoch、额外的torch.nn.Embedding层等 def save_checkpoint(state, is_best, outdir): """ 每训练一定的epochs后, 判断损失函数是否是目前最优的,并保存模型的参数 """ if not os.path.exists(outdir): os.makedirs(outdir) checkpoint_file = os.path.join(outdir, 'checkpoint.pth') best_file = os.path.join(outdir, 'model_best.pth') # 把state保存在checkpoint_file文件夹中 torch.save(state, checkpoint_file) if is_best: shutil.copyfile(checkpoint_file, best_file) # 模型训练和测试主函数 def main(): # Step 1: 载入数据 mnist_test, mnist_train, classes = dataloader(args.batch_size, args.num_worker) # 查看每一个batch图片的规模 x, label = iter(mnist_train).__next__() # 取出第一批(batch)训练所用的数据集 print(' img : ', x.shape) # img : torch.Size([batch_size, 1, 28, 28]), 每次迭代获取batch_size张图片,每张图大小为(1,28,28) # Step 2: 准备工作 : 搭建计算流程 model = VAE(z_dim=args.z_dim).to(device) # 创建VAE模型实例,并转移到GPU上去 optimizer = optim.Adam(model.parameters(), lr=args.lr) # 初始化优化器,需要优化的是model的参数,learning rate设置为0.001 # Step 3: 从checkpoint恢复训练 start_epoch = 0 best_test_loss = np.finfo('f').max # 初始化为最大浮点数 if args.resume: if os.path.isfile(args.resume): # 载入已经训练过的模型参数与结果 checkpoint = torch.load(args.resume) start_epoch = checkpoint['epoch'] + 1 best_test_loss = checkpoint['best_test_loss'] model.load_state_dict(checkpoint['state_dict']) optimizer.load_state_dict(checkpoint['optimizer']) if not os.path.exists(args.result_dir): os.makedirs(args.result_dir) # Step 4: 开始迭代 loss_epoch = [] for epoch in range(start_epoch, args.epochs): # 训练模型 # 每次迭代都要遍历所有的批次 loss_batch = [] for batch_index, (x, _) in enumerate(mnist_train): # x : [b, 1, 28, 28], remember to deploy the input on GPU x = x.to(device) # 前向传播 x_hat, mu, log_var = model(x) # 模型调用会自动调用model中的forward函数 loss, BCE, KLD = loss_function(x_hat, x, mu, log_var) # 计算损失值,即目标函数 loss_batch.append(loss.item()) # loss是Tensor类型 # 后向传播 optimizer.zero_grad() # 梯度清零,否则上一步的梯度仍会存在 loss.backward() # 后向传播计算梯度,这些梯度会保存在model.parameters里面 optimizer.step() # 更新梯度,这一步与上一步主要是根据model.parameters联系起来了 # print statistics every 100 batch if (batch_index + 1) % 100 == 0: print('Epoch [{}/{}], Batch [{}/{}] : Total-loss = {:.4f}, BCE-Loss = {:.4f}, KLD-loss = {:.4f}' .format(epoch + 1, args.epochs, batch_index + 1, len(mnist_train.dataset) // args.batch_size, loss.item() / args.batch_size, BCE.item() / args.batch_size, KLD.item() / args.batch_size)) if batch_index == 0: # visualize reconstructed result at the beginning of each epoch x_concat = torch.cat([x.view(-1, 1, 28, 28), x_hat.view(-1, 1, 28, 28)], dim=3) save_image(x_concat, './%s/reconstructed-%d.png' % (args.result_dir, epoch + 1)) # 把这一个epoch的每一个样本的平均损失存起来 loss_epoch.append(np.sum(loss_batch) / len(mnist_train.dataset)) # len(mnist_train.dataset)为样本个数 # 测试模型 if (epoch + 1) % args.test_every == 0: best_test_loss = test(model, optimizer, mnist_test, epoch, best_test_loss) return loss_epoch if __name__ == '__main__': loss_epoch = main() plt.plot(loss_epoch) plt.xlabel('epoch') plt.ylabel('loss') plt.show()
9. 用到的函数补充介绍
9.1 transforms.Compose()
- 将通过Compose函数将各种转换操作组合起来,然后直接通过组合后的变量.xx来调用
9.2 nums_worker
-
torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)的nums_worker参数
-
nums_worker用来加载batch数据的worker进程数
- nums_worker = 0表示只由主进程加载,可能比较慢
- nums_worker = 1表示只有一个worker进程加载,主进程不参与数据加载,也可能比较慢
- nums_worker > 0表示只有指定数量的进程去加载数据,主进程不参与
-
nums_worker越多资源消耗也越大,所以要根据batch_size和机器性能来决定;
一般将nums_worker设置为CPU支持线程数
9.3 model.to(device) 或 var.to(device)
-
model.to(device):表示将模型加载到指定设备上,
- device=torch.device(“cpu”)表示使用CPU
- device=torch.device(“cuda”)表示使用GPU
-
var.to(device):表示将所有最开始读取数据时的tensor变量copy一份到device所指定的GPU上去,之后的运算都在GPU上进行
-
.cuda()和.to(device)的区别:两个方法都可以达到同样的效果,在pytorch中,即使是有GPU的机器,它也不会自动使用GPU,而是需要在程序中显示指定。调用model.cuda(),可以将模型加载到GPU上去,但这种方法不被提倡,而建议使用model.to(device)的方式,这样可以显示指定需要使用的计算资源,特别是有多个GPU的情况下
9.4 model.train() 和 model.eval()
如果模型中有BN层(Batch Normalization)和Dropout,
-
model.train()的作用是 启用 Batch Normalization 和 Dropout
- 保证BN层能够用到每一批数据的均值mean和方差var;
- 对于Dropout,model.train()是随机取一部分网络连接来训练更新参数
-
model.eval()的作用是 不启用 Batch Normalization 和 Dropout
- 保证BN层能够用全部训练数据的均值mean和方差var;即测试过程中要保证BN层的均值和方差不变
- 对于Dropout,model.eval()是利用到了所有网络连接,即不进行随机舍弃神经元
-
总结来讲:
-
在train模式下,dropout网络层会按照设定的参数设置保留激活单元的概率(保留概率=p); BN层会继续计算数据的mean和var等参数并更新
-
在eval模式下,dropout层会让所有的激活单元都通过,而BN层会停止计算和更新mean和var,直接使用在训练阶段已经学出的mean和var值
-
如果含有BN层和Dropout,如果不加model.eval(),有输入数据,即使不训练,也会改变权值
-
9.5 with torch.no_grad()
-
with torch.no_grad()主要是用于停止autograd模块的工作,以起到加速和节省显存的作用,因为不用计算和存储梯度
-
它的作用是将该with语句包裹起来的部分停止梯度的更新,从而节省了GPU算力和显存,但是并不会影响dropout和BN层的行为
9.6 为什么使用方差对数而不是方差
- encode后返回均值和方差,但是方差要求非负,而神经网络返回是不受控制的,故这里拟合的是对数方差log_var而不是方差var
- 对数方差和标准差关系
s t d 2 = e l o g _ v a r std^{2} = e^{log\_var} std2=elog_var
s t d = e l o g _ v a r 2 std = e^{\frac{log\_var}{2} } std=e2log_var
9.7 encode末尾不能再接ReLU
- 激活层是为了添加矩阵运算的非线性
- 在优化的过程中,隐变量z是要逐渐趋于N(0, I)的,如果加ReLU的话,本身假设的隐变量维度就很小,小于0的隐变量直接没了,decode时会因为信息不足而崩掉
9.8 为什么重构损失用BCE而不是MSE
- 因为MNIST是黑白二值图像,重构损失直接就用BCE = 二分类交叉熵损失函数
用MSE也可以,如果是三通道图像如RGB图像,则必须使用MSE而不是BCE
10. 模型部署
见文章: To Do
创作不易,如有帮助,请点赞收藏支持
【参考文章】
pytorch中DataLoader的num_workers参数详解
pytorch模型保存与加载
checkpoint保存和加载1
checkpoint保存和加载2
pytorch中to(device)的用法
with torch.no_grad()
GAN和VAE的本质区别
使用VAE生成神奇宝贝
VAE模型详解1
VAE模型的理解与实现
VAE模型的pytorch实现1
VAE模型的pytorch实现2
VAE模型的pytorch实现3
VAE模型的pytorch实现4(推荐)
MNIST手写数字识别-pytorch实现(推荐)文章来源:https://www.toymoban.com/news/detail-636458.html
created by shuaixio, 2023.08.06文章来源地址https://www.toymoban.com/news/detail-636458.html
到了这里,关于【机器学习】VAE算法的pytorch实现-MNIST手写数据识别的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!