3、基于Zookeeper实现分布式锁

这篇具有很好参考价值的文章主要介绍了3、基于Zookeeper实现分布式锁。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

3.1、Zookeeper安装和相关概念

3.1.1 安装启动

# 解压到/mysoft文件夹下
tar -zxvf zookeeper-3.7.0-bin.tar.gz
# 重命名
mv apache-zookeeper-3.7.0-bin/ zookeeper
# 打开zookeeper根目录
cd /mysoft/zookeeper
# 创建一个数据目录,备用
mkdir data
# 打开zk的配置目录
cd /mysoft/zookeeper/conf
# copy配置文件,zk启动时会加载zoo.cfg文件
cp zoo_sample.cfg zoo.cfg
# 编辑配置文件
vim zoo.cfg
# 修改dataDir参数为之前创建的数据目录:/mysoft/zookeeper/data
# 切换到bin目录
cd /mysoft/zookeeper/bin
# 启动 
./zkServer.sh start
./zkServer.sh status # 查看启动状态
./zkServer.sh stop # 停止
./zkServer.sh restart # 重启
./zkCli.sh # 查看zk客户端

如下,说明启动成功:
3、基于Zookeeper实现分布式锁,分布式锁,zookeeper,分布式锁

3.1.2 相关概念

Zookeeper提供一个多层级的节点命名空间(节点称为znode),每个节点都用一个以斜杠(/)分隔的路径表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。并且每个节点都是唯一的。

1、znode节点有四种类型:

  • PERSISTENT:永久节点。客户端与zookeeper断开连接后,该节点依旧存在
  • EPHEMERAL:临时节点。客户端与zookeeper断开连接后,该节点被删除
  • PERSISTENT_SEQUENTIAL:永久节点、序列化。客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
  • EPHEMERAL_SEQUENTIAL:临时节点、序列化。客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号
create /aa test  # 创建持久化节点
create -e /cc test  # 创建临时节点
create -s /bb test  # 创建持久序列化节点
create -e -s /dd test  # 创建临时序列化节点

3、基于Zookeeper实现分布式锁,分布式锁,zookeeper,分布式锁

2、事件监听
在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端
当前zookeeper针对节点的监听有如下四种事件:

  • 当某个节点创建或者删除的时候:stat -w /xx

    当/xx节点创建时:NodeCreated ,当/xx节点删除时:NodeDeleted

  • 节点的值变化监听:get -w /xx

    当/xx节点数据发生变化时:NodeDataChanged

  • 节点的子节点变化监听:ls -w /xx

    当/xx节点的子节点创建或者删除时:NodeChildChanged

【注意】zookeeper监听都是一次性的


3.1.3 Java客户端

ZooKeeper的java客户端有:

  • 原生客户端
  • ZkClient
  • Curator框架(类似于redisson,有很多功能性封装)

Java命令演示:

public class ZkTest {
    public static void main(String[] args) throws InterruptedException {
        ZooKeeper zooKeeper = null;
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            zooKeeper = new ZooKeeper("192.168.239.11:2181", 30000, new Watcher() {

                @Override
                public void process(WatchedEvent event) {
                    Event.KeeperState state = event.getState();
                    if (Event.KeeperState.SyncConnected.equals(state)) {
                        System.out.println("获取zookeeper连接了.....");
                        countDownLatch.countDown();
                    } else if (Event.KeeperState.Closed.equals(state)) {
                        System.out.println("关闭连接......");
                    }
                }
            });
            countDownLatch.await();
            System.out.println("zookeeper一顿操作...");
            // 一、创建一个节点
            // 创建一个节点,1-节点路径 2-节点内容 3-节点的访问权限 4-节点类型
            // OPEN_ACL_UNSAFE:任何人可以操作该节点
            // CREATOR_ALL_ACL:创建者拥有所有访问权限
            // READ_ACL_UNSAFE: 任何人都可以读取该节点
            // 创建持久化节点
            // zooKeeper.create("/atguigu", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            // 创建临时节点
            // zooKeeper.create("/test", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            // 创建永久序列化
            // zooKeeper.create("/atguigu/cc", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
            // 创建临时序列化节点
            // zooKeeper.create("/atguigu/dd", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // zooKeeper.create("/atguigu/dd", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // zooKeeper.create("/atguigu/dd", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);


            // 二、判断节点是否存在
            Stat stat = zooKeeper.exists("/test", true);
            if (stat != null){
                System.out.println("当前节点存在!" + stat.getVersion());
            } else {
                System.out.println("当前节点不存在!");
            }

            // 三、获取一个节点的数据
            byte[] data = zooKeeper.getData("/aa/bb0000000000", false, null);
            System.out.println("节点/aa/bb0000000000下的数据为:" + new String(data));

            // 查询一个节点的所有子节点
            List<String> children = zooKeeper.getChildren("/aa", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("的子节点发生变化");
                }
            });
            System.out.println("/aa节点下的子节点:" + children);

            // 更新
            // zooKeeper.setData("/atguigu", "wawa...".getBytes(), stat.getVersion());

            // 删除一个节点
            //zooKeeper.delete("/atguigu", -1);

            System.in.read();

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } finally {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    }
}

3.2 Zookeeper实现分布式锁的思路分析

获取锁:create一个节点(因为一个节点只能创建一次,当一个线程创建完一个节点后,其他线程都不能创建该节点了)
删除锁:delete一个节点

参照redis分布式锁的特点:

  1. 互斥 排他
  2. 防死锁:
    1. 可自动释放锁(临时节点) :获得锁之后客户端所在机器宕机了,客户端没有主动删除子节点;如果创建的是永久的节点,那么这个锁永远不会释放,导致死锁;由于创建的是临时节点,客户端宕机后,过了一定时间zookeeper没有收到客户端的心跳包判断会话失效,将临时节点删除从而释放锁。
    2. 可重入锁:借助于ThreadLocal
  3. 防误删:宕机自动释放临时节点,不需要设置过期时间,也就不存在误删问题。
  4. 加锁/解锁要具备原子性
  5. 单点问题:使用Zookeeper可以有效的解决单点问题,ZK一般是集群部署的。
  6. 集群问题:zookeeper集群是强一致性的,只要集群中有半数以上的机器存活,就可以对外提供服务。


3.3 ZooKeeper分布式锁的基本实现

实现思路:

  1. 多个请求同时添加一个相同的临时节点,只有一个可以添加成功。添加成功的获取到锁
  2. 执行业务逻辑
  3. 完成业务流程后,删除节点释放锁。

基本实现

由于zookeeper获取链接是一个耗时过程,这里可以在项目启动时,初始化链接,并且只初始化一次。借助于spring特性,代码实现如下:

public class ZkClient {

    private ZooKeeper zooKeeper;

    @PostConstruct
    public void init() {
        // 项目启动时,创建zk连接
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            zooKeeper = new ZooKeeper("192.168.239.11:2181", 30000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    Event.KeeperState state = event.getState();
                    if (Event.KeeperState.SyncConnected.equals(state)) {
                        System.out.println("获取zookeeper连接了.....");
                        countDownLatch.countDown();
                    } else if (Event.KeeperState.Closed.equals(state)) {
                        System.out.println("关闭连接......");
                    }
                }
            });
        } catch (IOException e) {
           e.printStackTrace();
        }
    }

    @PreDestroy
    public void destory() {
        // 项目销毁前,释放zk连接
        if(zooKeeper!=null) {
            try {
                zooKeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    // 获取分布式锁
    public ZkDistributedLock getLock(String name) {
        return new ZkDistributedLock(name,zooKeeper);
    }
}

zk分布式锁具体实现:

public class ZkDistributedLock implements Lock {

    private String lockName;

    private ZooKeeper zooKeeper;

    private final String ROOT_PATH = "/locks";

    public ZkDistributedLock(String lockName, ZooKeeper zooKeeper) {
        this.lockName = lockName;
        this.zooKeeper = zooKeeper;
        // 判断锁的根节点是否存在,如果不存在,则创建根节点
        try {
            if (zooKeeper.exists(ROOT_PATH, false) == null) {
                zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void lock() {
        // 创建znode节点
        this.tryLock();

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        // 【注意】创建zookeeper创建临时节点 !! ===》 为什么 ? 防止死锁
        try {
            this.zooKeeper.create(ROOT_PATH + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            // 获取锁失败了,则睡眠一段时间后,进行重试
            try {
                Thread.sleep(80);
                this.tryLock();
            } catch (InterruptedException ex) {
                e.printStackTrace();
            }
        }
        return false;
    }

    @Override
    public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        try {
            this.zooKeeper.delete(ROOT_PATH + "/" + lockName, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

改造StockService的checkAndLock方法:

@Autowired
private ZkClient client;

  public void deduct() {
       ZkDistributedLock lock = zkClient.getLock("lock");
       lock.lock();
       try {
           String stockStr = redisTemplate.opsForValue().get("stock:" + "1001");
           // 2. 判断库存是否充足
           if (stockStr != null && stockStr.length() != 0) {
               Long stock = Long.parseLong(stockStr);
               if (stock > 0) {
                   redisTemplate.opsForValue().set("stock:" + "1001", String.valueOf(stock - 1));
               }
           }
       } finally {
           lock.unlock();
       }
   }

测试
jmeter 循环并发发送5000请求,查看库存

存在的问题

  • 性能一般(比mysql分布式锁略好)
  • 不可重入


3.4. 优化:性能优化

基本实现中由于无限自旋影响性能:
3、基于Zookeeper实现分布式锁,分布式锁,zookeeper,分布式锁
试想:每个请求要想正常的执行完成,最终都是要创建节点,如果能够避免争抢必然可以提高性能。


3.4.1 优化1 临时序列化节点

这里借助于zk的临时序列化节点,实现分布式锁:每个创建最小节点的线程获取到分布式锁
3、基于Zookeeper实现分布式锁,分布式锁,zookeeper,分布式锁
虽然不用反复争抢创建节点了,但是会自旋判断自己是最小的节点,这个判断逻辑反而更复杂更耗时。

3.4.1 优化2 使用监听

对于这个算法有个极大的优化点:假如当前有1000个节点在等待锁,如果获得锁的客户端释放锁时,这1000个客户端都会被唤醒,这种情况称为“羊群效应”;在这种羊群效应中,zookeeper需要通知1000个客户端,这会阻塞其他的操作,最好的情况应该只唤醒新的最小节点对应的客户端。应该怎么做呢?在设置事件监听时,每个客户端应该对刚好在它之前的子节点设置事件监听,例如子节点列表为/locks/lock-0000000000、/locks/lock-0000000001、/locks/lock-0000000002,序号为1的客户端监听序号为0的子节点删除消息,序号为2的监听序号为1的子节点删除消息

所以调整后的分布式锁算法流程如下:

  • 客户端连接zookeeper,并在/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/locks/lock-0000000000,第二个为/locks/lock-0000000001,以此类推;
  • 客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁;
  • 执行业务代码;
  • 完成业务流程后,删除对应的子节点释放锁。

3.4.3. 实现阻塞锁

public class ZkDistributedLock implements Lock {

    private String lockName;

    private String currentNodePath;

    private ZooKeeper zooKeeper;

    private final String ROOT_PATH = "/locks";

    public ZkDistributedLock(String lockName, ZooKeeper zooKeeper) {
        this.lockName = lockName;
        this.zooKeeper = zooKeeper;
        // 判断锁的根节点是否存在,如果不存在,则创建根节点
        try {
            if (zooKeeper.exists(ROOT_PATH, false) == null) {
                zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void lock() {
        // 创建znode节点
        this.tryLock();

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }


    @Override
    public boolean tryLock() {
        // 【注意】创建zookeeper创建临时节点 !! ===》 为什么 ? 防止死锁
        try {
            currentNodePath = this.zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 获取前置节点,如果前置节点为空,则获取锁成功,否则监听前置节点
            String preNode = this.getpreNode();
            if (preNode != null) {
                // 利用闭锁思想,实现阻塞功能
                CountDownLatch countDownLatch = new CountDownLatch(1);
                // 当前节点有前置节点
                // 因为获取前置节点这个操作,不具备原子性。需要再次判断zk中的前置节点是否存在
                if(this.zooKeeper.exists(lockName + "/" + preNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        countDownLatch.countDown();
                    }
                }) == null) {
                    return  true;
                }
                countDownLatch.await();
            }
            // 当前节点没有前置节点,获取锁成功
            return true;
        } catch (Exception e) {
           // e.printStackTrace();
        }
        return false;
    }

    private String getpreNode() throws InterruptedException, KeeperException {
        // 获取根节点下的所有节点
        List<String> children = this.zooKeeper.getChildren(ROOT_PATH, false);
        if (CollectionUtils.isEmpty(children)) {
            throw new IllegalMonitorStateException("非法操作!");
        }

        // 获取和当前节点同一资源的锁
        List<String> nodes = children.stream().filter(item -> StringUtils.startsWith(item, lockName + "-"))
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(nodes)) {
            throw new IllegalMonitorStateException("非法操作!");
        }
        // 排好队
        Collections.sort(nodes);
        // 获取当前节点下标
        int index = Collections.binarySearch(nodes, StringUtils.substringAfterLast(currentNodePath, "/"));
        if (index < 0) {
            throw new IllegalMonitorStateException("非法操作!");
        } else if (index > 0) {
            return nodes.get(index - 1);
        }
        // 如果当前节点就是第一个节点,返回null
        return null;
    }

     /*   @Override
    public boolean tryLock() {
        // 【注意】创建zookeeper创建临时节点 !! ===》 为什么 ? 防止死锁
        try {
            this.zooKeeper.create(ROOT_PATH + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            // 获取锁失败了,则睡眠一段时间后,进行重试
            try {
                Thread.sleep(80);
                this.tryLock();
            } catch (InterruptedException ex) {
                e.printStackTrace();
            }
        }
        return false;
    }*/


    @Override
    public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        try {
            this.zooKeeper.delete(ROOT_PATH + "/" + lockName, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

3.5 优化-可重入锁

有两种实现方式:

  • 1、在节点中记录服务器,线程以及可重入信息
  • 2、使用ThreadLocal 变量
public class ZkDistributedLock implements Lock {

    private String lockName;

    private String currentNodePath;

    private ZooKeeper zooKeeper;

    private final String ROOT_PATH = "/locks";
    private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();

    public ZkDistributedLock(String lockName, ZooKeeper zooKeeper) {
        this.lockName = lockName;
        this.zooKeeper = zooKeeper;
        // 判断锁的根节点是否存在,如果不存在,则创建根节点
        try {
            if (zooKeeper.exists(ROOT_PATH, false) == null) {
                zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void lock() {
        // 创建znode节点
        this.tryLock();

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }


    @Override
    public boolean tryLock() {
        // 【注意】创建zookeeper创建临时节点 !! ===》 为什么 ? 防止死锁
        try {
            // 判断当前线程是不是持有锁,持有锁的话,则将重入次数 + 1
            Integer flag = THREAD_LOCAL.get();
            if (flag != null && flag > 0) {
                THREAD_LOCAL.set(flag + 1);
                return true;
            }

            currentNodePath = this.zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 获取前置节点,如果前置节点为空,则获取锁成功,否则监听前置节点
            String preNode = this.getpreNode();
            if (preNode != null) {
                // 利用闭锁思想,实现阻塞功能
                CountDownLatch countDownLatch = new CountDownLatch(1);
                // 当前节点有前置节点
                // 因为获取前置节点这个操作,不具备原子性。需要再次判断zk中的前置节点是否存在
                if(this.zooKeeper.exists(lockName + "/" + preNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        countDownLatch.countDown();
                    }
                }) == null) {
                    // 获取锁成功 重入次数 + 1
                    THREAD_LOCAL.set(1);
                    return  true;
                }
                countDownLatch.await();
            }
            // 当前节点没有前置节点,获取锁成功
            // 获取锁成功 重入次数 + 1
            THREAD_LOCAL.set(1);
            return true;
        } catch (Exception e) {
           // e.printStackTrace();
        }
        return false;
    }

    private String getpreNode() throws InterruptedException, KeeperException {
        // 获取根节点下的所有节点
        List<String> children = this.zooKeeper.getChildren(ROOT_PATH, false);
        if (CollectionUtils.isEmpty(children)) {
            throw new IllegalMonitorStateException("非法操作!");
        }

        // 获取和当前节点同一资源的锁
        List<String> nodes = children.stream().filter(item -> StringUtils.startsWith(item, lockName + "-"))
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(nodes)) {
            throw new IllegalMonitorStateException("非法操作!");
        }
        // 排好队
        Collections.sort(nodes);
        // 获取当前节点下标
        int index = Collections.binarySearch(nodes, StringUtils.substringAfterLast(currentNodePath, "/"));
        if (index < 0) {
            throw new IllegalMonitorStateException("非法操作!");
        } else if (index > 0) {
            return nodes.get(index - 1);
        }
        // 如果当前节点就是第一个节点,返回null
        return null;
    }

     /*   @Override
    public boolean tryLock() {
        // 【注意】创建zookeeper创建临时节点 !! ===》 为什么 ? 防止死锁
        try {
            this.zooKeeper.create(ROOT_PATH + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            // 获取锁失败了,则睡眠一段时间后,进行重试
            try {
                Thread.sleep(80);
                this.tryLock();
            } catch (InterruptedException ex) {
                e.printStackTrace();
            }
        }
        return false;
    }*/


    @Override
    public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        try {
            THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
            if (THREAD_LOCAL.get() == 0) {
                this.zooKeeper.delete(ROOT_PATH + "/" + lockName, -1);
                THREAD_LOCAL.remove();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}


3.6 zk分布式锁小结

参照redis分布式锁的特点:文章来源地址https://www.toymoban.com/news/detail-605676.html

  1. 互斥 排他:zk节点的不可重复性,以及序列化节点的有序性
  2. 防死锁:
    1. 可自动释放锁:临时节点
    2. 可重入锁:借助于ThreadLocal
  3. 防误删:临时节点
  4. 加锁/解锁要具备原子性
  5. 单点问题:使用Zookeeper可以有效的解决单点问题,ZK一般是集群部署的。
  6. 集群问题:zookeeper集群是强一致性的,只要集群中有半数以上的机器存活,就可以对外提供服务。
  7. 公平锁:有序性节点

3.7 Curator中的分布式锁

到了这里,关于3、基于Zookeeper实现分布式锁的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Zookeeper实现分布式锁

    Zookeeper实现分布式锁

    ZooKeeper是一个分布式协调服务,其中提供的序列化、持久化、有层次的目录结构使得它非常适合用于实现分布式锁。在ZooKeeper中,分布式锁通常通过临时有序节点实现。以下是ZooKeeper分布式锁的详细介绍:  实现方式: 临时有序节点: 当一个客户端需要获取锁时,它在ZooK

    2024年02月02日
    浏览(14)
  • 在Spring中,可以使用不同的方式来实现分布式锁,例如基于数据库、Redis、ZooKeeper等

    在Spring中,可以使用不同的方式来实现分布式锁,例如基于数据库、Redis、ZooKeeper等

    在Spring中,可以使用不同的方式来实现分布式锁,例如基于数据库、Redis、ZooKeeper等。下面是两种常见的实现方式: 使用Redis实现分布式锁: 使用自定义注解实现本地锁: 以上是两种常见的在Spring中实现分布式锁的方式。第一种方式使用Redis作为分布式锁的存储介质,通过

    2024年03月17日
    浏览(14)
  • 使用ZooKeeper实现分布式锁

    目录 引言 1. ZooKeeper简介 2. 分布式锁实现原理 3. 分布式锁实现步骤 步骤一:创建ZooKeeper客户端 步骤二:创建分布式锁类 步骤三:使用分布式锁 4. 总结 在分布式系统中,实现分布式锁是一项常见的任务,可以用于保证同一时间只有一个客户端可以访问共享资源,从而避免竞

    2024年02月21日
    浏览(15)
  • 分布式锁解决方案_Zookeeper实现分布式锁

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 分布式锁解决方案_Zookeeper实现分布式锁 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: Zookeeper 是一个开源的分布式协调服务,它

    2024年02月03日
    浏览(10)
  • springboot 使用zookeeper实现分布式队列

    一.添加ZooKeeper依赖:在pom.xml文件中添加ZooKeeper客户端的依赖项。例如,可以使用Apache Curator作为ZooKeeper客户端库: 二.创建ZooKeeper连接:在应用程序的配置文件中,配置ZooKeeper服务器的连接信息。例如,在application.properties文件中添加以下配置:  三.创建分布式队列:使用Z

    2024年02月13日
    浏览(13)
  • springboot 使用zookeeper实现分布式锁

    一.添加ZooKeeper依赖:在pom.xml文件中添加ZooKeeper客户端的依赖项。例如,可以使用Apache Curator作为ZooKeeper客户端库: 二.创建ZooKeeper连接:在应用程序的配置文件中,配置ZooKeeper服务器的连接信息。例如,在application.properties文件中添加以下配置: 三.创建分布式锁:使用ZooKee

    2024年02月13日
    浏览(15)
  • Zookeeper实战——分布式锁实现以及原理

    Zookeeper实战——分布式锁实现以及原理

    分布式锁是控制分布式系统之间同步访问共享资源的一种方式。分布式锁的实现方式有很多种,比如 Redis 、数据库 、 zookeeper 等。这篇文章主要介绍用 Zookeeper 实现分布式锁。 先说结论: Zookeeper 是基于临时顺序节点以及 Watcher 监听器机制实现分布式锁的 。 (1)ZooKeeper 的每

    2023年04月08日
    浏览(8)
  • 【分布式锁】06-Zookeeper实现分布式锁:可重入锁源码分析

    前言 前面已经讲解了Redis的客户端Redission是怎么实现分布式锁的,大多都深入到源码级别。 在分布式系统中,常见的分布式锁实现方案还有Zookeeper,接下来会深入研究Zookeeper是如何来实现分布式锁的。 Zookeeper初识 文件系统 Zookeeper维护一个类似文件系统的数据结构 image.png 每

    2024年02月22日
    浏览(13)
  • 实现分布式锁:Zookeeper vs Redis

    目录 引言 1. Zookeeper分布式锁 1.1特点和优势: 强一致性 顺序节点 Watch机制 1.2 Zookeeper分布式锁代码示例 2. Redis分布式锁 2.1特点和优势: 简单高效 可续租性 灵活性 2.2Redis分布式锁代码示例 3.对比和选择 3.1  一致性要求 3.2  适用场景 3.3 性能和复杂度 结论 在分布式系统中,

    2024年01月22日
    浏览(15)
  • springboot 使用zookeeper实现分布式ID

    添加ZooKeeper依赖:在pom.xml文件中添加ZooKeeper客户端的依赖项。例如,可以使用Apache Curator作为ZooKeeper客户端库: 创建ZooKeeper连接:在应用程序的配置文件中,配置ZooKeeper服务器的连接信息。例如,在application.properties文件中添加以下配置: 创建分布式ID生成器:使用ZooKeeper客

    2024年02月13日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包