浅谈errgroup的使用以及源码分析

这篇具有很好参考价值的文章主要介绍了浅谈errgroup的使用以及源码分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文讲解的是golang.org/x/sync这个包中的errgroup

1、errgroup 的基础介绍

学习过 Go 的朋友都知道 Go 实现并发编程是比较容易的事情,只需要使用go关键字就可以开启一个 goroutine。那对于并发场景中,如何实现goroutine的协调控制呢?常见的一种方式是使用sync.WaitGroup 来进行协调控制。

使用过sync.WaitGroup 的朋友知道,sync.WaitGroup 虽然可以实现协调控制,但是不能传递错误,那该如何解决呢?聪明的你可能马上想到使用 chan 或者是 context来传递错误,确实是可以的。那接下来,我们一起看看官方是怎么实现上面的需求的呢?

1.1 errgroup的安装

安装命令:

go get golang.org/x/sync

//下面的案例是基于v0.1.0 演示的
go get golang.org/x/sync@v0.1.0

1.2 errgroup的基础例子

这里我们需要请求3个url来获取数据,假设请求url2时报错,url3耗时比较久,需要等一秒。

package main

import (
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"strings"
	"time"
)

func main()  {
	queryUrls := map[string]string{
		"url1": "http://localhost/url1",
		"url2": "http://localhost/url2",
		"url3": "http://localhost/url3",
	}

	var eg errgroup.Group
	var results []string

	for _, url := range queryUrls {
		url := url
		eg.Go(func() error {
			result, err := query(url)
			if err != nil {
				return err
			}
			results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))
			return nil
		})
	}
	
  // group 的wait方法,等待上面的 eg.Go 的协程执行完成,并且可以接受错误
	err := eg.Wait()
	if err != nil {
		fmt.Println("eg.Wait error:", err)
		return
	}

	for k, v := range results {
		fmt.Printf("%v ---> %v\n", k, v)
	}
}

func query(url string) (ret string, err error) {
	// 假设这里是发送请求,获取数据
	if strings.Contains(url, "url2") {
		// 假设请求 url2 时出现错误
		fmt.Printf("请求 %s 中....\n", url)
		return "", errors.New("请求超时")
	} else if strings.Contains(url, "url3") {
		// 假设 请求 url3 需要1秒
		time.Sleep(time.Second*1)
	}
	fmt.Printf("请求 %s 中....\n", url)
	return "success", nil
}

执行结果:

请求 http://localhost/url2 中....
请求 http://localhost/url1 中....
请求 http://localhost/url3 中....
eg.Wait error: 请求超时

果然,当其中一个goroutine出现错误时,会把goroutine中的错误传递出来。

我们自己运行一下上面的代码就会发现这样一个问题,请求 url2 出错了,但是依旧在请求 url3 。因为我们需要聚合 url1、url2、url3 的结果,所以当其中一个出现问题时,我们是可以做一个优化的,就是当其中一个出现错误时,取消还在执行的任务,直接返回结果,不用等待任务执行结果。

那应该如何做呢?

这里假设 url1 执行1秒,url2 执行报错,url3执行3秒。所以当url2报错后,就不用等url3执行结束就可以返回了。

package main

import (
	"context"
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"strings"
	"time"
)

func main()  {
	queryUrls := map[string]string{
		"url1": "http://localhost/url1",
		"url2": "http://localhost/url2",
		"url3": "http://localhost/url3",
	}

	var results []string
	ctx, cancel := context.WithCancel(context.Background())
	eg, errCtx := errgroup.WithContext(ctx)

	for _, url := range queryUrls {
		url := url
		eg.Go(func() error {
			result, err := query(errCtx, url)
			if err != nil {
        //其实这里不用手动取消,看完源码就知道为啥了
				cancel()
				return err
			}
			results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))
			return nil
		})
	}

	err := eg.Wait()
	if err != nil {
		fmt.Println("eg.Wait error:", err)
		return
	}

	for k, v := range results {
		fmt.Printf("%v ---> %v\n", k, v)
	}
}


func query(errCtx context.Context, url string) (ret string, err error) {
	fmt.Printf("请求 %s 开始....\n", url)
	// 假设这里是发送请求,获取数据
	if strings.Contains(url, "url2") {
		// 假设请求 url2 时出现错误
		time.Sleep(time.Second*2)
		return "", errors.New("请求出错")


	} else if strings.Contains(url, "url3") {
		// 假设 请求 url3 需要1秒
		select {
		case <- errCtx.Done():
			ret, err = "", errors.New("请求3被取消")
			return
		case <- time.After(time.Second*3):
			fmt.Printf("请求 %s 结束....\n", url)
			return "success3", nil
		}
	} else {
		select {
		case <- errCtx.Done():
			ret, err = "", errors.New("请求1被取消")
			return
		case <- time.After(time.Second):
			fmt.Printf("请求 %s 结束....\n", url)
			return "success1", nil
		}
	}

}

执行结果:

请求 http://localhost/url2 开始....
请求 http://localhost/url3 开始....
请求 http://localhost/url1 开始....
请求 http://localhost/url1 结束....
eg.Wait error: 请求出错

2、errgroup源码分析

看了上面的例子,我们对errgroup有了一定了解,接下来,我们一起看看errgroup做了那些封装。

2.1 errgroup.Group

errgroup.Group源码如下:

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
  // context 的 cancel 方法
	cancel func()

	wg sync.WaitGroup
	
  //传递信号的通道,这里主要是用于控制并发创建 goroutine 的数量
  //通过 SetLimit 设置过后,同时创建的goroutine 最大数量为n
	sem chan token
	
  // 保证只接受一次错误
	errOnce sync.Once
  // 最先返回的错误
	err     error
}

看结构体中的内容,发现比原生的sync.WaitGroup多了下面的内容:

  • cancel func()
  • sem chan token
  • errOnce sync.Once
  • err error

2.2 WithContext 方法

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

方法逻辑还是比较简单的,主要做了两件事:

  • 使用contextWithCancel()方法创建一个可取消的Context
  • context.WithCancel(ctx)创建的 cancel赋值给 Group中的cancel

2.3 Go

1.2 最后一个例子说,不用手动去执行 cancel 的原因就在这里。

g.cancel() //这里就是为啥不用手动执行 cancel的原因

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
    //往 sem 通道中发送空结构体,控制并发创建 goroutine 的数量
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
    // done()函数的逻辑就是当 f 执行完后,从 sem 取一条数据,并且 g.wg.Done()
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() { // 这里就是确保 g.err 只被赋值一次
				g.err = err
				if g.cancel != nil {
					g.cancel() //这里就是为啥不用手动执行 cancel的原因
				}
			})
		}
	}()
}

2.4 TryGo

看注释,知道此函数的逻辑是:当正在执行的goroutine数量小于通过SetLimit()设置的数量时,可以启动成功,返回 true,否则启动失败,返回false。

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}: // 当g.sem的缓冲区满了过后,就会执行default,也代表着未启动成功
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		}
	}
  
  //----主要看上面的逻辑,下面的逻辑和Go中的一样-------

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
	return true
}

2.5 Wait

代码逻辑很简单,这里主要注意这里:

//我看这里的时候,有点疑惑,为啥这里会去调用 cancel()方法呢?
//这里是为了代码的健壮性,用 context.WithCancel() 创建得到的 cancel,在代码执行完毕之前取消是一个好习惯
g.cancel()

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
  g.wg.Wait() //通过 g.wg.Wait() 阻塞等待所有的 goroutine 执行完
	if g.cancel != nil {
    //我看这里的时候,有点疑惑,为啥这里会去调用 cancel()方法呢?
    //这里是为了代码的健壮性,用 context.WithCancel() 创建得到的 cancel,在代码执行完毕之前取消是一个好习惯
 		g.cancel()
	}
	return g.err
}

2.6 SetLimit

看代码的注释,我们知道:SetLimit的逻辑主要是限制同时执行的 goroutines 的数量为n,当n小于0时,没有限制。如果有运行的 goroutine,调用此方法会报错。

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}

3、errgroup 容易忽视的坑

这个坑是看别人的记录看到的,对errgroup不太熟悉时,是不小心确实容易掉进去,所以摘抄了过来,如果侵权,请联系删除,谢谢!

原文链接:并发编程包之 errgroup

需求:

开启多个Goroutine去缓存中设置数据,同时开启一个Goroutine去异步写日志,很快我的代码就写出来了:

package main

import (
	"context"
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"time"
)

func main()  {
	g, ctx := errgroup.WithContext(context.Background())

	// 单独开一个协程去做其他的事情,不参与waitGroup
	go WriteChangeLog(ctx)

	for i:=0 ; i< 3; i++{
		g.Go(func() error {
			return errors.New("访问redis失败\n")
		})
	}
	if err := g.Wait();err != nil{
		fmt.Printf("appear error and err is %s",err.Error())
	}
	time.Sleep(1 * time.Second)
}

func WriteChangeLog(ctx context.Context) error {
	select {
	case <- ctx.Done():
		return nil
	case <- time.After(time.Millisecond * 50):
		fmt.Println("write changelog")
	}
	return nil
}

结果:

appear error and err is 访问redis失败

代码看着没有问题,但是日志一直没有写入。这是为什么呢?

其实原因就是因为这个ctxerrgroup.WithContext方法返回的一个带取消的ctx,我们把这个ctx当作父context传入WriteChangeLog方法中了,如果errGroup取消了,也会导致上下文的context都取消了,所以WriteChangelog方法就一直执行不到。

这个点是我们在日常开发中想不到的,所以需要注意一下~。

解决方法:

解决方法就是在 go WriteChangeLog(context.Background()) 传入新的ctx

参考资料:

八. Go并发编程--errGroup

并发编程包之 errgroup文章来源地址https://www.toymoban.com/news/detail-426370.html

上面这个案例中讲了一个容易忽视的坑,大家可以看看

到了这里,关于浅谈errgroup的使用以及源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 浅谈 iframe的优缺点以及使用场景

    提示:以下是本篇文章正文内容 iframe 是嵌入式框架,是 HTML框架 ,还是一个 内联元素 ,iframe元素会创建包含另一个文档的内联框架(行内框架),说白了就是,iframe用来在页面嵌入其他的页面。 通常我们使用iframe直接在页面嵌套iframe标签指定src就可以了。 iframe能够把嵌入

    2024年02月05日
    浏览(12)
  • 第六章、用户体验五要素之框架层解析(本文作用是通俗讲解,让你更容易理解)

            结构层定义产品运行形式,框架层则用于确定用什么样的功能或者形式来实现。在框架层,功能型和信息型产品都需要信息设计,不同的是功能型还需要界面设计,而信息型产品则是导航设计。         1、界面设计:如果涉及提供给用户做某些事的能力,那就是界

    2024年02月09日
    浏览(12)
  • SolidUI AI生成可视化,0.1.0版本模块划分以及源码讲解

    SolidUI AI生成可视化,0.1.0版本模块划分以及源码讲解

    随着文本生成图像的语言模型兴起,SolidUI想帮人们快速构建可视化工具,可视化内容包括2D,3D,3D场景,从而快速构三维数据演示场景。SolidUI 是一个创新的项目,旨在将自然语言处理(NLP)与计算机图形学相结合,实现文生图功能。通过构建自研的文生图语言模型,SolidUI 利用

    2024年02月16日
    浏览(8)
  • python微博舆情分析系统 可视化 情感分析 爬虫 机器学习(源码+讲解)✅

    python微博舆情分析系统 可视化 情感分析 爬虫 机器学习(源码+讲解)✅

    🍅 大家好,今天给大家分享一个Python项目,感兴趣的可以先收藏起来,点赞、关注不迷路! 🍅 大家在毕设选题,项目以及论文编写等相关问题都可以给我留言咨询,希望帮助同学们顺利毕业 。 设计1000套(建议收藏) 毕业设计:2023-2024年最新最全计算机专业毕业设计选题

    2024年03月25日
    浏览(12)
  • 【Mybatis源码分析】Mybatis中的反射(MetaObject)详细讲解

    【Mybatis源码分析】Mybatis中的反射(MetaObject)详细讲解

    在使用Mybatis,编写DQL语句时,查询结果可能会是多个,多变量指定肯定是不现实的。而Mybatis可以进行映射,将JDBC返回的结果映射到实例类或者Map对象中,方便开发者直接使用返回对象,就可以得到从数据库取出来的结果。 映射原理大伙都知道是利用了反射(因为咱就只是通

    2023年04月08日
    浏览(12)
  • 浅谈 ext2 文件系统的特点、优缺点以及使用场景

    ext2(Extended File System 2)是 Linux 中最早的一种文件系统,它是 Linux 文件系统的基础,也被广泛用于其他类 Unix 系统中。下面是 ext2 文件系统的特点、优缺点以及使用场景: 特点: ext2 文件系统可以支持大容量的存储设备,最大支持 32 TB 的文件系统大小。 ext2 文件系统使用块

    2024年02月03日
    浏览(10)
  • Xshell连接不上排错以及解决方案(本文原因:重启网卡失败)

    Xshell连接不上排错以及解决方案(本文原因:重启网卡失败)

    目录 ​说一下我自己的排错思路: (1)检查自己想要链接的虚拟机有无开启 (2)检查windows服务里面关于虚拟机和xshell的服务是否已经开启,网络是否出错 (3)进入ens33文件查看ip ,dns1等是否出现配置错误 (4)检查防火墙有没有关闭 (5)查看ssh服务是否开启  (6)是否

    2024年02月04日
    浏览(11)
  • 【数据分析与可视化】pyecharts可视化图表讲解及实战(超详细 附源码)

    【数据分析与可视化】pyecharts可视化图表讲解及实战(超详细 附源码)

    需要源码请点赞关注收藏后评论区留言私信~~~ pyecharts是基于Echart图表的一个类库,而Echart是百度开源的一个可视化JavaScript库 pyecharts主要基于web浏览器进行显示,绘制的图形比较多,包括折线图、柱状图、饼图、漏斗图、地图、极坐标图等,代码量很少,而且很灵活,绘制出

    2024年02月01日
    浏览(12)
  • Flutter网络请求框架Dio源码分析以及封装(一)--请求流程分析

    利用flutter开发app也已经有些时间了,这个过程中最多接触到的就是网络请求相关的代码。自己目前项目中使用的是现在市面上最流行的网络请求库-dio,相对于flutter自带的HttpClient来说,dio使用起来更简单,功能更强大,支持全局配置、Restful API、FormData、拦截器、 请求取消、

    2024年02月09日
    浏览(10)
  • 【k8s源码分析-Apiserver-2】kube-apiserver 结构概览以及主体部分源码分析

    【k8s源码分析-Apiserver-2】kube-apiserver 结构概览以及主体部分源码分析

    Kubernetes 源码剖析(书籍) kube-apiserver的设计与实现 - 自记小屋 APIGroupInfo 记录 GVK 与 Storage 的对应关系 将 GVK 转换成,Restful HTTP Path 将 Storage 封装成 HTTP Handler 将上面两个形成映射,实现相关的路由处理 发起请求并处理的流程 发送请求:通过 GVK 对应的 Restful HTTP Path 发送请求

    2024年02月03日
    浏览(26)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包