先写一个脚本sql,插入2000个用户
INSERT INTO sys_users (mobile, password)
SELECT
numbers.n AS mobile,
'$2a$10$zKQfSn/GCcR6MX4nHk3MsOMhJnI0qxN4MFdiufDMH2wzuTaR9G1sq' AS password
FROM
(
SELECT ones.n + tens.n*10 + hundreds.n*100 + thousands.n*1000 + 1 AS n
FROM
(SELECT 0 AS n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) ones
CROSS JOIN (SELECT 0 AS n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) tens
CROSS JOIN (SELECT 0 AS n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) hundreds
CROSS JOIN (SELECT 0 AS n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) thousands
ORDER BY n
) numbers
LIMIT 2000;
登录是通过2个字段,一个是mobile,一个是password,生成了mobile从1到2000,密码默认是123456
然后写一个单元测试,实现新注册的2000个用户登录,然后获取token
package system
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"reflect"
"runtime"
"strings"
"sync"
"testing"
"time"
)
var Global_client *http.Client
func GetGlobalClient() {
client := &http.Client{
Transport: &http.Transport{
MaxIdleConns: 20, // 设置连接池大小为 200
},
}
Global_client = client
}
func TestBaseApi_TokenNext(t *testing.T) {
var wg sync.WaitGroup
loginNum := 2000
GetGlobalClient()
s := make(chan string, loginNum)
limit := make(chan int, 20000)
//go prilimit(limit)
go Show()
for i := 1; i <= 2000000; i++ {
mobile := fmt.Sprintf("%d", i)
wg.Add(1)
password := "123456"
//向通道中发送值,如果满了500个,则会阻塞
limit <- 1111
go obtainToken(mobile, password, &wg, limit, s)
}
wg.Wait()
//当数据都到了通道里面之后,我们可以关闭通道
close(s)
fmt.Println("通道的长度为:",len(s))
file, err := os.OpenFile("E:\\Go\\goproject\\LearnExam\\sever\\token.txt", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
defer file.Close()
for token := range s {
if token == "" {
continue
}
_, err = file.WriteString(token + "\n")
if err != nil {
return
}
}
}
func Show() {
for {
num := runtime.NumGoroutine()
fmt.Printf("当前程序中的协程数量:%d\n", num)
time.Sleep(1 * time.Second)
}
}
func AppendStringToFile(filePath string, content string) error {
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return err
}
defer file.Close()
_, err = file.WriteString(content + "\n")
if err != nil {
return err
}
return nil
}
func obtainToken(mobile, password string, wg *sync.WaitGroup, limit chan int, s chan string) {
defer wg.Done()
type Body struct {
Mobile string `json:"mobile"`
Password string `json:"password"`
}
b := Body{
mobile, password,
}
bodymarshal, err := json.Marshal(&b)
if err != nil {
return
}
//再处理一下
reqBody := strings.NewReader(string(bodymarshal))
req, err := http.NewRequest("POST", "." +
"", reqBody)
if err != nil {
fmt.Printf("Error creating request for user %s: %v\n", mobile, err)
return
}
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
resp, err := Global_client.Do(req)
if err != nil {
//fmt.Printf("Error sending request for user %s: %v\n", mobile, err)
fmt.Printf("Error sending request for user %s: %+v\n", mobile, err)
fmt.Println("反射:", reflect.TypeOf(err))
fmt.Println("err是EOF,那resp是:",resp)
return
}
//defer func(Body io.ReadCloser) {
// err = Body.Close()
// if err != nil {
//
// }
//}(resp.Body)
body, err := ioutil.ReadAll(resp.Body) //把请求到的body转化成byte[]
if err != nil {
return
}
type Result struct {
Code int `json:"code"`
Data struct {
Token string `json:"token"`
} `json:"data"`
}
r := Result{}
err = json.Unmarshal(body, &r)
if err != nil {
return
}
if r.Code == 0 {
s <- r.Data.Token
temp := <-limit
fmt.Println("通道取值:", temp)
fmt.Printf("Token obtained for user %s\n", mobile)
} else {
fmt.Printf("Failed to obtain token for user %s\n", mobile)
}
}
我们使用有缓冲的通道和sync.WaitGroup信号量,来控制协程的数量,经过测试,发现limit,loginNum,影响到最后成功的结果,这其中的的原理我还暂时没有想清楚。limit为50,loginNum为2000,会存在服务端正常返回,但是客户端报EOF,limit为50,loginNum为500的时候,不会出现EOF问题,那说明limit为50是没问题的,按照道理说,及时loginNum增大到100000也不会有问题,但是却出现了问题,后面再解决吧。
现在已经拿到了2000个用户的token了,我们使用jemter工具来进行压测
MySQL层面解决
这个是要测试的函数
func (e *ExamService) PayExam(c *gin.Context, p request.PayExam) (err error) {
//基本参数校验
if p.ExamId == 0 {
return errors.New("参数有误")
}
//获取用户登录信息
userID := utils.GetUserID(c)
fmt.Println("userid:", userID)
//判断用户是否已经买过
var orders []examination.Order
cond := fmt.Sprintf("order_type_id = 1 and commodity_id = %v",p.ExamId)
err = global.GVA_DB.Model(&system.SysUser{GVA_MODEL: global.GVA_MODEL{ID: userID}}).Association("Orders").Find(&orders,cond)
if err != nil {
return err
}
if len(orders) > 0 {
return errors.New("您已经购买过了,无需再次购买")
}
//exam, err := (&ExamService{}).GetExam(request.GetExam{ExamId: p.ExamId})
exam := examination.Exam{
GVA_MODEL: global.GVA_MODEL{ID: p.ExamId},
}
err = global.GVA_DB.Select("Stock", "ExamName").First(&exam).Error
fmt.Println("库存:",exam.Stock)
if err == gorm.ErrRecordNotFound {
return errors.New("该场考试不存在")
}
if exam.Stock <= 0 {
return errors.New("该场考试已经售卖完了")
}
//扣考试的库存
//err = global.GVA_DB.Model(&exam).Update("stock", exam.Stock-1).Error
sql := fmt.Sprintf("update exams set stock = stock - 1 WHERE id = %d",p.ExamId)
err = global.GVA_DB.Debug().Raw(sql).Scan(nil).Error
if err != nil {
return errors.New("扣除库存失败")
}
//下单
err = global.GVA_DB.Create(&examination.Order{Name: "购买考试:" + exam.ExamName, OrderTypeID: 1, OrderTypeDetail: "考试",SysUserID: userID,CommodityID: p.ExamId}).Error
return err
}
其中,由于exams表中的stock是uint类型,在用gorm建表的时候,自动设置了BIGINT UNSIGNED,已经在MySQL层面就解决了这个秒杀问题,我们接下来把stock改成int类型,然后使用redis的分布式锁来解决这个问题
Redis层面解决
我们还是使用MySQL用来记录下单记录,但是库存信息缓存到了redis里面
最关键的是
result, err := global.GVA_REDIS.HIncrBy(context.Background(), key, "stock", -1).Result()
不是原子性的,我们使用lua脚本让其原子性,然后就可以解决这个问题了( MySQL的一条update就是一个事务,是具有原子性的)文章来源:https://www.toymoban.com/news/detail-632095.html
func (e *ExamService) PayExamByRedis(c *gin.Context, p request.PayExam) (err error) {
//基本参数校验
if p.ExamId == 0 {
return errors.New("参数有误")
}
//获取用户登录信息
userID := utils.GetUserID(c)
fmt.Println("userid:", userID)
key := utils.ExamPrefix + strconv.Itoa(int(p.ExamId))
//判断用户是否已经买过
var orders []examination.Order
cond := fmt.Sprintf("order_type_id = 1 and commodity_id = %v", p.ExamId)
err = global.GVA_DB.Model(&system.SysUser{GVA_MODEL: global.GVA_MODEL{ID: userID}}).Association("Orders").Find(&orders, cond)
if err != nil {
return err
}
if len(orders) > 0 {
return errors.New("您已经购买过了,无需再次购买")
}
stock, err := global.GVA_REDIS.HGet(context.Background(), key, "stock").Result()
name, err := global.GVA_REDIS.HGet(context.Background(), key, "exam_name").Result()
fmt.Println("库存:", stock)
if err == redis.Nil {
return errors.New("该场考试不存在")
}
stockInt, err := strconv.Atoi(stock)
if stockInt <= 0 {
return errors.New("该场考试已经售卖完了")
}
//这个操作不是原子性的
//result, err := global.GVA_REDIS.HIncrBy(context.Background(), key, "stock", -1).Result()
//使用Lua脚本让其原子化
script := fmt.Sprintf(`if (redis.call('hexists', KEYS[1], KEYS[2]) == 1) then local stock = tonumber(redis.call('hget', KEYS[1], KEYS[2]));
if (stock > 0) then
redis.call('hincrby', KEYS[1], KEYS[2], -1);
return stock;
end;
return 0;
end;`)
result, err := global.GVA_REDIS.Eval(context.Background(), script, []string{key, "stock"}, -1).Int()
if err != nil {
fmt.Println(err)
return errors.New(err.Error())
}
if result == 0 {
return errors.New("该场考试已经售卖完了")
}
//扣考试的库存
//下单
err = global.GVA_DB.Create(&examination.Order{Name: "购买考试:" + name, OrderTypeID: 1, OrderTypeDetail: "考试", SysUserID: userID, CommodityID: p.ExamId}).Error
if err != nil {
global.GVA_LOG.Error("mysql插入购买考试记录失败", zap.Error(err))
return err
}
return err
}
Channel层面解决
func (e *ExamApi) PayExam(c *gin.Context) {
global.LimitPayExamChannel <- 1
var p request.PayExam
err := c.ShouldBindJSON(&p)
if err != nil {
response.FailWithMessage(err.Error(), c)
return
}
//err = examService.PayExamByRedis(c, p)
err = examService.PayExamByChannel(c, p)
if err != nil {
response.FailWithMessage(err.Error(), c)
<-global.LimitPayExamChannel
return
}
userID := utils.GetUserID(c)
response.OkWithMessage(strconv.Itoa(int(userID))+"购买成功", c)
<-global.LimitPayExamChannel
}
我们在全局初始化一个缓冲区容量为1的通道,虽然秒杀期间并发了很多的请求,但是我一次只处理一个,这样能保证结果是正确的,不会出现超卖问题,但是失去了并发的意义。文章来源地址https://www.toymoban.com/news/detail-632095.html
到了这里,关于Golang实现Redis分布式锁解决秒杀问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!