Golang实现Redis分布式锁解决秒杀问题

这篇具有很好参考价值的文章主要介绍了Golang实现Redis分布式锁解决秒杀问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

先写一个脚本sql,插入2000个用户

INSERT INTO sys_users (mobile, password)
SELECT 
    numbers.n AS mobile,
    '$2a$10$zKQfSn/GCcR6MX4nHk3MsOMhJnI0qxN4MFdiufDMH2wzuTaR9G1sq' AS password
FROM 
    (
        SELECT ones.n + tens.n*10 + hundreds.n*100 + thousands.n*1000 + 1 AS n
        FROM 
            (SELECT 0 AS n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) ones
            CROSS JOIN (SELECT 0 AS n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) tens
            CROSS JOIN (SELECT 0 AS n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) hundreds
            CROSS JOIN (SELECT 0 AS n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) thousands
        ORDER BY n
    ) numbers
LIMIT 2000;

登录是通过2个字段,一个是mobile,一个是password,生成了mobile从1到2000,密码默认是123456

然后写一个单元测试,实现新注册的2000个用户登录,然后获取token

package system

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"testing"
	"time"
)

var Global_client *http.Client

func GetGlobalClient() {
	client := &http.Client{
		Transport: &http.Transport{
			MaxIdleConns: 20, // 设置连接池大小为 200
		},
	}
	Global_client = client
}
func TestBaseApi_TokenNext(t *testing.T) {
	var wg sync.WaitGroup
	loginNum := 2000
	GetGlobalClient()
	s := make(chan string, loginNum)
	limit := make(chan int, 20000)
	//go prilimit(limit)
	go Show()
	for i := 1; i <= 2000000; i++ {
		mobile := fmt.Sprintf("%d", i)
		wg.Add(1)
		password := "123456"
		//向通道中发送值,如果满了500个,则会阻塞
		limit <- 1111
		go obtainToken(mobile, password, &wg, limit, s)
	}
	wg.Wait()

	//当数据都到了通道里面之后,我们可以关闭通道
	close(s)
	fmt.Println("通道的长度为:",len(s))
	file, err := os.OpenFile("E:\\Go\\goproject\\LearnExam\\sever\\token.txt", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
	defer file.Close()
	for token := range s {
		if token == "" {
			continue
		}
		_, err = file.WriteString(token + "\n")
		if err != nil {
			return
		}
	}

}

func Show() {
	for {
		num := runtime.NumGoroutine()
		fmt.Printf("当前程序中的协程数量:%d\n", num)
		time.Sleep(1 * time.Second)
	}
}

func AppendStringToFile(filePath string, content string) error {
	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	defer file.Close()
	_, err = file.WriteString(content + "\n")
	if err != nil {
		return err
	}

	return nil
}
func obtainToken(mobile, password string, wg *sync.WaitGroup, limit chan int, s chan string) {
	defer wg.Done()

	type Body struct {
		Mobile   string `json:"mobile"`
		Password string `json:"password"`
	}
	b := Body{
		mobile, password,
	}

	bodymarshal, err := json.Marshal(&b)
	if err != nil {
		return
	}
	//再处理一下
	reqBody := strings.NewReader(string(bodymarshal))
	req, err := http.NewRequest("POST", "." +
		"", reqBody)
	if err != nil {
		fmt.Printf("Error creating request for user %s: %v\n", mobile, err)
		return
	}
	req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

	resp, err := Global_client.Do(req)
	if err != nil {
		//fmt.Printf("Error sending request for user %s: %v\n", mobile, err)
		fmt.Printf("Error sending request for user %s: %+v\n", mobile, err)
		fmt.Println("反射:", reflect.TypeOf(err))
		fmt.Println("err是EOF,那resp是:",resp)
		return
	}
	//defer func(Body io.ReadCloser) {
	//	err = Body.Close()
	//	if err != nil {
	//
	//	}
	//}(resp.Body)
	body, err := ioutil.ReadAll(resp.Body) //把请求到的body转化成byte[]
	if err != nil {
		return
	}
	type Result struct {
		Code int `json:"code"`
		Data struct {
			Token string `json:"token"`
		} `json:"data"`
	}
	r := Result{}
	err = json.Unmarshal(body, &r)
	if err != nil {
		return
	}
	if r.Code == 0 {
		s <- r.Data.Token
		temp := <-limit
		fmt.Println("通道取值:", temp)
		fmt.Printf("Token obtained for user %s\n", mobile)
	} else {
		fmt.Printf("Failed to obtain token for user %s\n", mobile)
	}

}

我们使用有缓冲的通道和sync.WaitGroup信号量,来控制协程的数量,经过测试,发现limit,loginNum,影响到最后成功的结果,这其中的的原理我还暂时没有想清楚。limit为50,loginNum为2000,会存在服务端正常返回,但是客户端报EOF,limit为50,loginNum为500的时候,不会出现EOF问题,那说明limit为50是没问题的,按照道理说,及时loginNum增大到100000也不会有问题,但是却出现了问题,后面再解决吧。

现在已经拿到了2000个用户的token了,我们使用jemter工具来进行压测

MySQL层面解决

这个是要测试的函数

func (e *ExamService) PayExam(c *gin.Context, p request.PayExam) (err error) {
	//基本参数校验
	if p.ExamId == 0 {
		return errors.New("参数有误")
	}
	//获取用户登录信息
	userID := utils.GetUserID(c)
	fmt.Println("userid:", userID)
	//判断用户是否已经买过
	var orders []examination.Order
	cond := fmt.Sprintf("order_type_id = 1 and commodity_id = %v",p.ExamId)
	err = global.GVA_DB.Model(&system.SysUser{GVA_MODEL: global.GVA_MODEL{ID: userID}}).Association("Orders").Find(&orders,cond)
	if err != nil {
		return err
	}
	if len(orders) > 0 {
		return errors.New("您已经购买过了,无需再次购买")
	}
	//exam, err := (&ExamService{}).GetExam(request.GetExam{ExamId: p.ExamId})
	exam := examination.Exam{
		GVA_MODEL: global.GVA_MODEL{ID: p.ExamId},
	}
	err = global.GVA_DB.Select("Stock", "ExamName").First(&exam).Error
	fmt.Println("库存:",exam.Stock)
	if err == gorm.ErrRecordNotFound {
		return errors.New("该场考试不存在")
	}

	if exam.Stock <= 0 {
		return errors.New("该场考试已经售卖完了")
	}
	//扣考试的库存
	//err = global.GVA_DB.Model(&exam).Update("stock", exam.Stock-1).Error
	sql := fmt.Sprintf("update exams set stock = stock - 1 WHERE id = %d",p.ExamId)
	err = global.GVA_DB.Debug().Raw(sql).Scan(nil).Error

	if err != nil {
		return errors.New("扣除库存失败")
	}
	//下单
	err = global.GVA_DB.Create(&examination.Order{Name: "购买考试:" + exam.ExamName, OrderTypeID: 1, OrderTypeDetail: "考试",SysUserID: userID,CommodityID: p.ExamId}).Error
	return err
}

其中,由于exams表中的stock是uint类型,在用gorm建表的时候,自动设置了BIGINT UNSIGNED,已经在MySQL层面就解决了这个秒杀问题,我们接下来把stock改成int类型,然后使用redis的分布式锁来解决这个问题

Redis层面解决

我们还是使用MySQL用来记录下单记录,但是库存信息缓存到了redis里面

最关键的是

result, err := global.GVA_REDIS.HIncrBy(context.Background(), key, "stock", -1).Result()

 不是原子性的,我们使用lua脚本让其原子性,然后就可以解决这个问题了( MySQL的一条update就是一个事务,是具有原子性的)

func (e *ExamService) PayExamByRedis(c *gin.Context, p request.PayExam) (err error) {
	//基本参数校验
	if p.ExamId == 0 {
		return errors.New("参数有误")
	}
	//获取用户登录信息
	userID := utils.GetUserID(c)
	fmt.Println("userid:", userID)
	key := utils.ExamPrefix + strconv.Itoa(int(p.ExamId))
	//判断用户是否已经买过
	var orders []examination.Order
	cond := fmt.Sprintf("order_type_id = 1 and commodity_id = %v", p.ExamId)
	err = global.GVA_DB.Model(&system.SysUser{GVA_MODEL: global.GVA_MODEL{ID: userID}}).Association("Orders").Find(&orders, cond)
	if err != nil {
		return err
	}
	if len(orders) > 0 {
		return errors.New("您已经购买过了,无需再次购买")
	}
	stock, err := global.GVA_REDIS.HGet(context.Background(), key, "stock").Result()
	name, err := global.GVA_REDIS.HGet(context.Background(), key, "exam_name").Result()
	fmt.Println("库存:", stock)
	if err == redis.Nil {
		return errors.New("该场考试不存在")
	}
	stockInt, err := strconv.Atoi(stock)
	if stockInt <= 0 {
		return errors.New("该场考试已经售卖完了")
	}
	//这个操作不是原子性的
	//result, err := global.GVA_REDIS.HIncrBy(context.Background(), key, "stock", -1).Result()
	//使用Lua脚本让其原子化
	script := fmt.Sprintf(`if (redis.call('hexists', KEYS[1], KEYS[2]) == 1) then	local stock = tonumber(redis.call('hget', KEYS[1], KEYS[2]));
	if (stock > 0) then
	   redis.call('hincrby', KEYS[1], KEYS[2], -1);
	   return stock;
	end;
    return 0;
end;`)
	result, err := global.GVA_REDIS.Eval(context.Background(), script, []string{key, "stock"}, -1).Int()
	if err != nil {
		fmt.Println(err)
		return errors.New(err.Error())
	}
	if result == 0 {
		return errors.New("该场考试已经售卖完了")
	}
	//扣考试的库存
	//下单
	err = global.GVA_DB.Create(&examination.Order{Name: "购买考试:" + name, OrderTypeID: 1, OrderTypeDetail: "考试", SysUserID: userID, CommodityID: p.ExamId}).Error
	if err != nil {
		global.GVA_LOG.Error("mysql插入购买考试记录失败", zap.Error(err))
		return err
	}
	return err
}

Channel层面解决

func (e *ExamApi) PayExam(c *gin.Context) {
	global.LimitPayExamChannel <- 1
	var p request.PayExam
	err := c.ShouldBindJSON(&p)
	if err != nil {
		response.FailWithMessage(err.Error(), c)
		return
	}
	//err = examService.PayExamByRedis(c, p)
	err = examService.PayExamByChannel(c, p)
	if err != nil {
		response.FailWithMessage(err.Error(), c)
		<-global.LimitPayExamChannel
		return
	}
	userID := utils.GetUserID(c)
	response.OkWithMessage(strconv.Itoa(int(userID))+"购买成功", c)
	<-global.LimitPayExamChannel
}

我们在全局初始化一个缓冲区容量为1的通道,虽然秒杀期间并发了很多的请求,但是我一次只处理一个,这样能保证结果是正确的,不会出现超卖问题,但是失去了并发的意义。文章来源地址https://www.toymoban.com/news/detail-632095.html

到了这里,关于Golang实现Redis分布式锁解决秒杀问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Redis实现方式开启新篇章,解决分布式环境下的资源竞争问题,提升系统稳定性

    Redis实现方式开启新篇章,解决分布式环境下的资源竞争问题,提升系统稳定性

    分布式锁一般有三种实现方式: 数据库乐观锁; 基于Redis的分布式锁; 基于ZooKeeper的分布式锁 本篇博客将介绍第二种方式,基于Redis实现分布式锁。 虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将

    2024年02月07日
    浏览(16)
  • Redis 分布式锁存在什么问题 ?如何解决 ?

    Redis 分布式锁存在什么问题 ?如何解决 ?

    目录 1. 如何实现分布式锁 2. Redis 分布式锁存在什么问题 2.1 解决死锁问题 2.2 解决锁误删问题 Redis 天生就可以作为一个分布式系统来使用,所以它实现的锁都是分布式锁。 Redis 可以通过 setnx(set if not exists)命令实现分布式锁~ setnx mylock true  -  加锁 del mylock  -  释放锁 通过

    2024年02月11日
    浏览(9)
  • 【Redis】4、全局唯一 ID生成、单机(非分布式)情况下的秒杀和一人一单

    【Redis】4、全局唯一 ID生成、单机(非分布式)情况下的秒杀和一人一单

    🍀 id 字段不是 自增 AUTO_INCREMENT 的 每个店铺都可以发布优惠券: 用户抢购的时候会生成订单并保存到 tb_voucher_order 这张表中 如订单 id 使用数据库自增 ID 会出现以下问题: 🍀 id 规律性太明显(可能会被用户猜测到优惠券的 id) 🍀 受单表数据量的限制(优惠券订单可能很多

    2024年02月16日
    浏览(15)
  • 深入理解PHP+Redis实现分布式锁的相关问题

    PHP使用分布式锁,受语言本身的限制,有一些局限性。 通俗理解单机锁问题:自家的锁锁自家的门,只能保证自家的事,管不了别人家不锁门引发的问题,于是有了分布式锁。 分布式锁概念:是针对多个节点的锁。避免出现数据不一致或者并发冲突的问题,让每个节点确保

    2024年03月23日
    浏览(17)
  • 我是如何用 redis 分布式锁来解决线上历史业务问题的

    我是如何用 redis 分布式锁来解决线上历史业务问题的

    近期发现,开发功能的时候发现了一个 mq 消费顺序错乱(历史遗留问题),导致业务异常的问题,看看我是如何解决的 首先,简单介绍一下情况: 线上 k8s 有多个 pod 会去消费 mq 中的消息,可是生产者发送的消息是期望一定要有序去消费,此时要表达的是,例如 生产者如果

    2024年02月09日
    浏览(12)
  • Redis分布式问题

    Redis分布式问题

      Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系Redis中可以使用SETNX命令实现分布式锁。当且仅当 key 不存在,将 key 的值设为 value。 若给定的 key 已经存在,则SETNX 不做任何动作SETNX 是『SET if Not eXists』(如果不

    2024年02月09日
    浏览(12)
  • Redis分布式锁问题

    1、业务单机情况下         问题:并发没有加锁导致线程安全问题。         解决方法:加锁处理,如lock、 synchronized         仍有问题:业务分布式情况下,代码级别加锁已经无效。需要借助第三方组件,如redis、zookeeper。 2、业务分布式情况下,使用redis的setNX,实现加

    2024年02月13日
    浏览(14)
  • Redis进阶:分布式锁问题

    Redis进阶:分布式锁问题

    单机单体中的锁机制在分布式集群系统中失效; 单纯的Java API并不能提供分布式锁的能力,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问; 1.2.1 分布式锁主流的实现方案 基于数据库实现分布式锁; 基于 缓存(Redis等),性能最高 ; 基于Zookeeper,可靠

    2024年02月08日
    浏览(10)
  • 基于Mongodb分布式锁简单实现,解决定时任务并发执行问题

    我们日常开发过程,会有一些定时任务的代码来统计一些系统运行数据,但是我们应用有需要部署多个实例,传统的通过配置文件来控制定时任务是否启动又太过繁琐,而且还经常出错,导致一些异常数据的产生 网上有很多分布式锁的实现方案,基于redis、zk、等有很多,但

    2023年04月18日
    浏览(12)
  • 分布式秒杀方案--java

    分布式秒杀方案--java

    前提:先把商品详情和秒杀商品缓存redis中,减少对数据库的访问(可使用定时任务) 秒杀商品无非就是那几步(前面还可能会有一些判断,如用户是否登录,一人一单,秒杀时间验证等) 1一人一单 2.判断库存 3.减库存 4.创建订单 1.1这样秒杀肯定会出现超卖的情况,所以必

    2024年02月09日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包