PyTorch多进程模型推理

这篇具有很好参考价值的文章主要介绍了PyTorch多进程模型推理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

进程和线程

进程:一个在内存中运行的应用程序,每个进程有自己独立的一块内存空间。资源分配的最小单位

线程:进程中的一个执行单元,程序执行的最小单位。一个进程可以有多个线程。

Python的多线程特点:在Python中,由于GIL的存在,在多线程的时候,同一时间只能有一个线程在CPU上运行,而且是单个CPU,不管CPU核数为多少。所以,Python不能利用多线程发挥多核的优势,但是,可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

什么时候使用多线程/多进程:在python中,如果一个进程包含多个线程,做CPU密集型任务时,多线程并不能有多少效率提升,相反可能还会因为线程的频繁切换导致效率下降,此时推荐使用多进程;如果做IO密集型任务,多线程的进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。

Python中单线程、多线程和多进程的效率对比实验 | 菜鸟教程 (runoob.com)

Python多进程实现方法

多进程的实现

Python的多进程是通过multiprocessing模块实现,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多,也有start(), run(), join()等方法

from multiprocessing import  Process

def fun1(name):
    print('测试%s多进程' %name)

if __name__ == '__main__':
    process_list = []
    for i in range(5):  # 开启 5 个子进程执行fun1函数
        p = Process(target=fun1,args=('Python',))  # 实例化进程对象
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    print('结束测试')

多进程之间的通信

由于每个进程有自己独立的一块内存空间,系统独立分配资源(CPU、内存),因此进程之间是独立的。每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程使用时与多线程的区别。所以,不同的多进程之间需要通信

常用的多进程之间的通信方式有:队列Queue、管道Pipe、Managers。

Queue和Pipe实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据,需要用到Managers来共享内存。

以Queue为例,Python中多进程的通信如下:

def func1(i):
    time.sleep(1)
    print(f'args {i}')

def run__queue():
    from multiprocessing import Process, Queue

    queue = Queue(maxsize=4)  # the following attribute can call in anywhere
    queue.put(True)
    queue.put([0, None, object])  # you can put deepcopy thing
    queue.qsize()  # the length of queue
    print(queue.get())  # First In First Out
    print(queue.get())  # First In First Out
    queue.qsize()  # the length of queue

    process = [Process(target=func1, args=(queue,)),
               Process(target=func1, args=(queue,)), ]
    [p.start() for p in process]
    [p.join() for p in process]

if __name__ =='__main__':
    run__queue()

进程池

进程池维护一个进程序列,使用时去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:apply:同步,一般不使用;apply_async:异步,常用。

但是pool.apply_async不能和pytorch推理一起用,受到spwan的影响。因为GPU模型Pytorch规定多进程的启动方法必须是“spawn”,使用map_async或者apply_async这类方法都不行。

Pytorch 多进程在单卡上测试_咆哮的阿杰的博客-CSDN博客_单卡多进程计算

并且,进程池pool的两个父子进程之间通信不能用Queue,需要Manager。

  • Pool和Process的区别

  • Process需要自己管理进程,起一个Process就是起一个新进程;

  • Pool是进程池,它可以开启固定数量的进程,然后将任务放到一个池子里,系统来调度多进程执行池子里的任务;

参考

  • Python如何使用多进程Process、Pool、Queue、Manager等

一篇文章搞定Python多进程(全) - 知乎 (zhihu.com)

在Python中优雅地用多进程 - 知乎 (zhihu.com)

  • Python使用 Pool.apply_async 和 Manager.Queue实现进程池通信

Python高级——消息队列(Queue)与进程池(Pool)_HMMHMH的博客-CSDN博客

Python中的Queue与多进程(multiprocessing)_SQZHAO的博客-CSDN博客_python queue 多进程

Pytorch结合多进程的使用场景

场景1:读取图片数据,判断是否损坏

只需要给函数open_image使用多进程,不需要考虑进程通信,采用进程池Pool的imap方法

def open_image(img_name):
    try:
        Image.open(os.path.join(image_path, img_name))
    except:
        return img_name

def detect_broken_delete(img_path, delete=False):
    listiter = os.listdir(img_path)
    process_num = max(cpu_count() - 2, 1)
    with Pool(process_num) as pool:
        output = set(tqdm(pool.imap(open_image, listiter), total=len(listiter)))
    print("broken images:", output)
    if delete:
        # delete broken images
        for img in output:
            if img:
                os.remove(os.path.join(img_path, img))
        print("broken images have been deleted.")

场景2:Pytorch减少模型推理时间

在有些时候,pytorch模型的性能瓶颈可能不在模型推断,而是在图像的预处理和后处理。这时候将torch模型转为onnx或者tensorRT收益不大,但可以使用多进程缩短前后处理时间和推理时间。

下面的例子为人脸解析face parsing模型推理案例:推理1w张图片大概从20min缩短为10min。

简单描述:

  1. 首先定义三个函数,分别为预处理preprocess_img,模型推断inference,后处理afterprocess_img(将人脸解析的分割mask作用到原图上,提取想要的人脸部分)

  1. 然后需要定义进程函数,用于读取Queue的数据并用前三个函数处理,实现进程之间的通信。进程函数的输入为queue相关的或者全局变量,然后通过queue的get和put方法实现数据传递。

  1. 总的流程为,第一个进程函数getimgpath_process获取图像,存到第一个队列img_path_queue;第二个进程函数preprocessimg_process,读取第一个队列,并预处理,结果存到第二个队列img_queue;第三个进程函数inference_process读取第二个队列,做出推断,结果存在第三个队列result_queue;第四个进程函数afterprocessimg_process读取第三个队列,做后处理并保存。

  1. 一共有三个Queue存数据、四个Process处理数据。在evaluate_multiprocess中,第一个函数读取全部图像路径,只采用单个进程;后面三个进程函数都采用了8个进程数量。最后关闭进程。

  1. 注意,需要通过 torch.multiprocessing.set_start_method("spawn")来设置pytorch多进程和cuda的使用。这样可以在单卡上同时处理多个图片,8个进程就可以同时单卡处理8张图片;当然,需要注意显存的使用,可以和onnx等结合加速推理和减少显存占用。

from model import BiSeNet
import torch
import os
import os.path as osp
import numpy as np
from PIL import Image, ImageFile
from tqdm import tqdm
import time
import torchvision.transforms as transforms
from multiprocessing import Pool, cpu_count, Process, Queue
import torch.multiprocessing as mp
ImageFile.LOAD_TRUNCATED_IMAGES = True
Image.MAX_IMAGE_PIXELS = None


to_tensor = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

def load_net(model_path="res/cp/79999_iter.pth"):
    n_classes = 19
    net = BiSeNet(n_classes=n_classes)
    net.cuda()
    net.load_state_dict(torch.load(model_path))
    net.eval()
    return net

def preprocess_img(img_path):
    img = Image.open(img_path)
    image = img.resize((512, 512), Image.BILINEAR).convert('RGB')
    img_arr = np.array(image)
    img = to_tensor(image)
    img = torch.unsqueeze(img, 0)
    return img, img_arr

def inference(net, img):
    with torch.no_grad():
        img = img.cuda()
        out = net(img)[0]
        parsing = out.squeeze(0).cpu().numpy().argmax(0)
        return parsing

def afterprocess_img(parsing, img_arr, img_path, res_path):
    mask = np.zeros_like(img_arr)
    indices = np.isin(parsing, [1, 2, 3, 10, 12, 13])  # face_dataset config: 1:skin, 2:l_brow, 3:r_brow, 4:nose, 12:u_lip, 13:l_lip
    mask[indices] = img_arr[indices]
    img_mask = Image.fromarray(mask)
    img_mask.save(osp.join(res_path, osp.basename(img_path)))
    print(f"save {osp.basename(img_path)}")


# accelerate infer by multiprocess, useing Queue in communication between processes (4 Processes & 3 Queues)
# ==================================================================================
# P(getimgpath)-----P(preprocess)-----P(inference)-----P(afterprocess)-->save img
#                |                 |                |
#        Queue(img_path)        Queue(img)     Queue(result)
# ==================================================================================

# get img_path process
def getimgpath_process(root_path, img_path_queue):
    for img_name in os.listdir(root_path):
        img_path = osp.join(root_path, img_name)
        img_path_queue.put(img_path)

# preprocess img process
def preprocessimg_process(img_path_queue, img_queue):
    while True:
        img_path = img_path_queue.get()
        img, img_arr = preprocess_img(img_path)
        img_queue.put((img, img_arr, img_path))

# inference process
def inference_process(net, img_queue, result_queue):
    while True:
        img, img_arr, img_path = img_queue.get()
        parsing = inference(net, img)
        result_queue.put((parsing, img_arr, img_path))

# afterprocess img process
def afterprocessimg_process(result_queue, res_path):
    while True:
        parsing, img_arr, img_path = result_queue.get()
        afterprocess_img(parsing, img_arr, img_path, res_path)


def evaluate_multiprocess(net, root_path, res_path):
    if not os.path.exists(res_path):
        os.makedirs(res_path)
    mp.set_start_method("spawn")
    img_path_queue, img_queue, result_queue = Queue(), Queue(), Queue()
    # pool.apply_async can not use in spawn.
    Imagepath_Process = Process(target=getimgpath_process, args=(root_path, img_path_queue))
    Imagepath_Process.start()
    for i in range(8):
        Preprocess_Process = Process(target=preprocessimg_process, args=(img_path_queue, img_queue))
        Preprocess_Process.start()
    for i in range(8):
        Inference_Process = Process(target=inference_process, args=(net, img_queue, result_queue))
        Inference_Process.start()
    for i in range(8):
        Afterprocess_Process = Process(target=afterprocessimg_process, args=(result_queue, res_path))
        Afterprocess_Process.start()
    # Imagepath_Process.start()
    # Preprocess_Process.start()
    # Inference_Process.start()
    # Afterprocess_Process.start()
    time.sleep(1)  # wait process starting, replace of `join`
    stime = time.time()
    while True:
        if(result_queue.empty() and (img_queue.empty()) and (img_path_queue.empty())):
            time.sleep(1) # wait final process ending
            Imagepath_Process.terminate()
            Preprocess_Process.terminate()
            Inference_Process.terminate()
            Afterprocess_Process.terminate()
            img_path_queue.close()
            img_queue.close()
            result_queue.close()
            break
        else:
            pass
    etime = time.time()
    print(f"all images cost {etime - stime} seconds")


if __name__ == "__main__":
    net = load_net()
    root_path="test"
    res_path='testtest'
    evaluate_multiprocess(net, root_path, res_path)

参考

多进程缩短推理时间 - 知乎 (zhihu.com)

pytorch多进程最佳实践_小篆的博客-CSDN博客_pytorch 多进程文章来源地址https://www.toymoban.com/news/detail-790462.html

到了这里,关于PyTorch多进程模型推理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • deepspeed训练报错torch.distributed.elastic.multiprocessing.errors.ChildFailedError

    deepspeed训练报错torch.distributed.elastic.multiprocessing.errors.ChildFailedError

    测试场景:使用deepspeed框架训练gpt模型 问题: 报错torch.distributed.elastic.multiprocessing.errors.ChildFailedError 具体见截图: 解决办法: 含义:表明在运行 train.py 脚本时,传递了一个未被识别的参数 --local-rank=1 这里我在train.py脚本文件中果然没有找到–local-rank参数,在很多的parser

    2024年02月08日
    浏览(9)
  • 使用 multiprocessing 多进程处理批量数据

    使用 multiprocessing 多进程处理批量数据

    示例代码 multiprocessing.Pool 创建进程池, 传入的参数是要要使用的 CPU 内核数量, 直接用 cpu_count() 可以拿到当前硬件配置所有的 CPU 内核数. pool.map 可以直接将处理后的结果拼接成一个 list 对象 应用在实际数据处理代码的效果对比: 普通处理方式, 用时 221 秒 多进程处理方式, 用时

    2024年02月09日
    浏览(15)
  • Python进程池multiprocessing.Pool

    Python进程池multiprocessing.Pool

    鲲鹏920:192核心 内存:756G python:3.9 在做单纯的cpu计算的场景,使用单进程核多进程的耗时做如下测试: 单进程情况下cpu的占用了如下,占用一半的核心数: 每一步和总耗时如下: cpu占用如下,每个进程基本占用48个左右核心数; 多进程的耗时如下: 每一个进程的耗时为

    2024年01月17日
    浏览(14)
  • PyTorch多GPU训练模型——使用单GPU或CPU进行推理的方法

    PyTorch多GPU训练模型——使用单GPU或CPU进行推理的方法

    PyTorch提供了非常便捷的多GPU网络训练方法: DataParallel 和 DistributedDataParallel 。在涉及到一些复杂模型时,基本都是采用多个GPU并行训练并保存模型。但在推理阶段往往只采用单个GPU或者CPU运行。这时怎么将多GPU环境下保存的模型权重加载到单GPU/CPU运行环境下的模型上成了一

    2024年02月09日
    浏览(12)
  • Python中使用multiprocessing模块创建进程

    Python中使用multiprocessing模块创建进程

            在计算机编程领域,多进程编程是一种常见的并发编程技术,特别适用于利用多核处理器来提高程序性能和并行处理任务。Python作为一种功能强大的编程语言,提供了多种方法来实现多进程编程。其中,multiprocessing模块为我们提供了一种简单而强大的方式来创建和

    2024年02月22日
    浏览(12)
  • Python自学:使用多进程处理 multiprocessing

    以下代码没有使用多进程。 输出为: Sleeping 1 second… Done Sleep… Sleeping 1 second… Done Sleep… Finished in 2.03 second(s) 以下代码使用了多进程。 输出为: Sleeping 1 second… Sleeping 1 second… Done Sleep… Done Sleep… Finished in 1.07 second(s) 输出为: Sleeping 1.5 second(s)… Sleeping 1.5 second(s)… Slee

    2024年02月09日
    浏览(16)
  • Python分享之多进程探索 (multiprocessing包)

    在初步了解Python多进程之后,我们可以继续探索multiprocessing包中更加高级的工具。这些工具可以让我们更加便利地实现多进程。   进程池 进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的士兵。

    2024年02月08日
    浏览(12)
  • Queue的多线程爬虫和multiprocessing多进程

    Queue的模块里面提供了同步的、线程安全的队列类,包括FIFO(先入后出)队列Queue、FIFO(后入先出)LifoQueue和优先队列PriorityQueue。 (在上个文件创建了爬取文件) 我们使用这个方法来获取,代码如下: 对象传入myThread中; thread = myThread(tName,workQueue)  使用一个for循环来

    2024年04月14日
    浏览(12)
  • 【python】tkinter使用多进程打包成exe后multiprocessing无法关闭对应进程

    这是由于multiprocessing模块在Windows操作系统下使用fork方法创建子进程时会导致打包成exe后无法正常运行的问题。 可以尝试使用freeze_support函数来解决这个问题。freeze_support函数是在Windows操作系统下用于支持multiprocessing模块的函数。 下面是一个示例代码: 在代码的开头,我们导

    2024年02月11日
    浏览(15)
  • Python进程池multiprocessing.Pool八个函数对比

    Python的multiprocessing.Pool类提供了多种方法来分发任务给进程池中的工作进程。这些方法在功能和用途上有所不同,适用于不同的场景。以下是multiprocessing.Pool中八个主要函数的对比 apply() 功能:阻塞地执行一个函数,直到这个函数的执行完成。 用法:apply(func, args=(), kwds={}) 特

    2024年02月04日
    浏览(14)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包