微服务系列文章之 Redisson实现分布式锁(2)

这篇具有很好参考价值的文章主要介绍了微服务系列文章之 Redisson实现分布式锁(2)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、RLock接口

1、概念
public interface RLock extends Lock, RExpirable, RLockAsync

很明显RLock是继承Lock锁,所以他有Lock锁的所有特性,比如lock、unlock、trylock等特性,同时它还有很多新特性:强制锁释放,带有效期的锁,。

2、RLock锁API

这里针对上面做个整理,这里列举几个常用的接口说明

public interface RLock {
    //----------------------Lock接口方法-----------------------

    /**
     * 加锁 锁的有效期默认30秒
     */
    void lock();
    /**
     * tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .
     */
    boolean tryLock();
    /**
     * tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,
     * 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
     *
     * @param time 等待时间
     * @param unit 时间单位 小时、分、秒、毫秒等
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    /**
     * 解锁
     */
    void unlock();
    /**
     * 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过
     * Thread.currentThread().interrupt(); 方法真正中断该线程
     */
    void lockInterruptibly();

    //----------------------RLock接口方法-----------------------
    /**
     * 加锁 上面是默认30秒这里可以手动设置锁的有效时间
     *
     * @param leaseTime 锁有效时间
     * @param unit      时间单位 小时、分、秒、毫秒等
     */
    void lock(long leaseTime, TimeUnit unit);
    /**
     * 这里比上面多一个参数,多添加一个锁的有效时间
     *
     * @param waitTime  等待时间
     * @param leaseTime 锁有效时间
     * @param unit      时间单位 小时、分、秒、毫秒等
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    /**
     * 检验该锁是否被线程使用,如果被使用返回True
     */
    boolean isLocked();
    /**
     * 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)
     * 这个比上面那个实用
     */
    boolean isHeldByCurrentThread();
    /**
     * 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间
     * @param leaseTime  锁有效时间
     * @param unit       时间单位 小时、分、秒、毫秒等
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit);  
}

RLock相关接口,主要是新添加了 leaseTime 属性字段,主要是用来设置锁的过期时间,避免死锁。

二、RedissonLock实现类

public class RedissonLock extends RedissonExpirable implements RLock

RedissonLock实现了RLock接口,所以实现了接口的具体方法。这里我列举几个方法说明下

1、void lock()方法
 @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

发现lock锁里面进去其实用的是lockInterruptibly(中断锁,表示可以被中断),而且捕获异常后用 Thread.currentThread().interrupt()来真正中断当前线程,其实它们是搭配一起使用的。

具体有关lockInterruptibly()方法讲解推荐一个博客。博客:Lock的lockInterruptibly()

接下来执行流程,这里理下关键几步

/**
     * 1、带上默认值调另一个中断锁方法
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }
    /**
     * 2、另一个中断锁的方法
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException 
    /**
     * 3、这里已经设置了锁的有效时间默认为30秒  (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()=30)
     */
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    /**
     * 4、最后通过lua脚本访问Redis,保证操作的原子性
     */
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

那么void lock(long leaseTime, TimeUnit unit)方法其实和上面很相似了,就是从上面第二步开始的。

2、tryLock(long waitTime, long leaseTime, TimeUnit unit)

接口的参数和含义上面已经说过了,现在我们开看下源码,这里只显示一些重要逻辑。

 @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        //1、 获取锁同时获取成功的情况下,和lock(...)方法是一样的 直接返回True,获取锁False再往下走
        if (ttl == null) {
            return true;
        }
        //2、如果超过了尝试获取锁的等待时间,当然返回false 了。
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        // 3、订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //  阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
        //  只有await返回true,才进入循环尝试获取锁
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

       //4、如果没有超过尝试获取锁的等待时间,那么通过While一直获取锁。最终只会有两种结果
        //1)、在等待时间内获取锁成功 返回true。2)等待时间结束了还没有获取到锁那么返回false。
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 获取锁成功
            if (ttl == null) {
                return true;
            }
           //   获取锁失败
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    }

重点 tryLock一般用于特定满足需求的场合,但不建议作为一般需求的分布式锁,一般分布式锁建议用void lock(long leaseTime, TimeUnit unit)。因为从性能上考虑,在高并发情况下后者效率是前者的好几倍

3、unlock()

解锁的逻辑很简单。

@Override
    public void unlock() {
        // 1.通过 Lua 脚本执行 Redis 命令释放锁
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
                RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                        "end; " +
                        "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()),
                LockPubSub.unlockMessage, internalLockLeaseTime,
                getLockName(Thread.currentThread().getId()));
        // 2.非锁的持有者释放锁时抛出异常
        if (opStatus == null) {
            throw new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + Thread.currentThread().getId());
        }
        // 3.释放锁后取消刷新锁失效时间的调度任务
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }

使用 EVAL 命令执行 Lua 脚本来释放锁:

  1. key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1
  2. key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil
  3. 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一
  4. 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1

注意这里有个实际开发过程中,容易出现很容易出现上面第二步异常,非锁的持有者释放锁时抛出异常。比如下面这种情况文章来源地址https://www.toymoban.com/news/detail-560066.html

   //设置锁1秒过去
        redissonLock.lock("redisson", 1);
        /**
         * 业务逻辑需要咨询2秒
         */
        redissonLock.release("redisson");
      /**
       * 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,
       * 那么这个时候 线程2 进来了。在线程1去解锁就会抛上面这个异常(因为解锁和当前锁已经不是同一线程了)
       */

到了这里,关于微服务系列文章之 Redisson实现分布式锁(2)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Redisson实现分布式锁示例

    Redisson实现分布式锁示例

    可以下载redis desktop manager软件来查看redis里面存放的东西 红色框内的TTL值就是过期时间,默认-1,表示永不过期,指定过期时间后就变成你指定的值了。 上面的方法,我们让线程睡眠60S,代表我们的业务执行时间,在调用这个方法时,我们可以在 redis desktop manager软件上实时查

    2024年02月12日
    浏览(11)
  • redisson+aop实现分布式锁

    基于注解实现,一个注解搞定缓存 Aop:面向切面编程,在不改变核心代码的基础上实现扩展,有以下应用场景 ①事务 ②日志 ③controlleradvice+expetcationhandle实现全局异常 ④redissson+aop实现分布式锁 ⑤认证授权 Aop的实现存在与bean的后置处理器beanpostprocessAfterinitlazing 注解的定义仿照

    2024年01月19日
    浏览(20)
  • SpringBoot结合Redisson实现分布式锁

    SpringBoot结合Redisson实现分布式锁

    🧑‍💻作者名称:DaenCode 🎤作者简介:啥技术都喜欢捣鼓捣鼓,喜欢分享技术、经验、生活。 😎人生感悟:尝尽人生百味,方知世间冷暖。 📖所属专栏:SpringBoot实战 以下是专栏部分内容,更多内容请前往专栏查看! 标题 一文带你学会使用SpringBoot+Avue实现短信通知功能

    2024年02月08日
    浏览(13)
  • 图解Redisson如何实现分布式锁、锁续约?

    图解Redisson如何实现分布式锁、锁续约?

    使用当前(2022年12月初)最新的版本:3.18.1; 案例 案例采用redis-cluster集群的方式; redission支持4种连接redis方式,分别为单机、主从、Sentinel、Cluster 集群;在分布式锁的实现上区别在于hash槽的获取方式。 具体配置方式见Redisson的GitHub(https://github.com/redisson/redisson/wiki/2.-%E9

    2023年04月16日
    浏览(18)
  • 源码篇--Redisson 分布式锁lock的实现

    我们知道Redis 缓存可以使用setNx来作为分布式锁,但是我们直接使用setNx 需要考虑锁过期的问题;此时我们可以使用Redisson 的lock 来实现分布式锁,那么lock 内部帮我们做了哪些工作呢。 RedisConfig.java lock.lock() 阻塞获取 redis 锁,获取到锁之后继续向下执行业务逻辑; lockInterr

    2024年01月25日
    浏览(23)
  • Redis分布式锁及Redisson的实现原理

    Redis分布式锁及Redisson的实现原理

    Redis分布式锁 在讨论分布式锁之前我们回顾一下一些单机锁,比如synchronized、Lock 等 锁的基本特性: 1.互斥性:同一时刻只能有一个节点访问共享资源,比如一个代码块,或者同一个订单同一时刻只能有一个线程去支付等。 2.可重入性: 允许一个已经获得锁的线程,在没有释

    2024年02月06日
    浏览(6)
  • Spring Boot 集成 Redisson 实现分布式锁

    Spring Boot 集成 Redisson 实现分布式锁

            Redisson 是一种基于 Redis 的 Java 驻留集群的分布式对象和服务库,可以为我们提供丰富的分布式锁和线程安全集合的实现。在 Spring Boot 应用程序中使用 Redisson 可以方便地实现分布式应用程序的某些方面,例如分布式锁、分布式集合、分布式事件发布和订阅等。本篇

    2024年02月08日
    浏览(16)
  • 在Java项目中使用redisson实现分布式锁

    在Java项目中使用Redission自定义注解实现分布式锁: 添加Redission依赖项:在项目的pom.xml中添加Redission依赖项: 创建自定义注解:创建一个自定义注解来标记需要使用分布式锁的方法。例如,创建一个名为 @DistributedLock 的注解: 创建注解切面:创建一个切面类,通过AOP将注解

    2024年02月16日
    浏览(12)
  • spring boot 实现Redisson分布式锁及其读写锁

    分布式锁,就是控制分布式系统中不同进程共同访问同一共享资源的一种锁的实现。 1、引入依赖 2、配置文件 3、配置类 4、测试代码 5、理解 一、时间设置 默认 lock() 小结 lock.lock (); (1)默认指定锁时间为30s(看门狗时间) (2)锁的自动续期:若是业务超长,运行期间自

    2024年02月12日
    浏览(13)
  • redis实战-redis实现分布式锁&redisson快速入门

    redis实战-redis实现分布式锁&redisson快速入门

    前言 集群环境下的并发问题  分布式锁 定义 需要满足的条件 常见的分布式锁 redis实现分布式锁 核心思路 代码实现 误删情况 逻辑说明 解决方案 代码实现 更为极端的误删情况 Lua脚本解决原子性问题 分布式锁-redission redisson的概念 快速入门 总结 在前面我们已经实现了单机

    2024年02月09日
    浏览(18)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包