Java组合式异步编程CompletableFuture

这篇具有很好参考价值的文章主要介绍了Java组合式异步编程CompletableFuture。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

CompletableFuture是Java 8中引入的一个功能强大的Future实现类,它的字面翻译是“可完成的Future”。 CompletableFuture对并发编程进行了增强,可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发。

CompletableFuture实现了两个接口,一个是Future,另一个是CompletionStage,Future表示异步任务的结果,而CompletionStage字面意思是完成阶段,多个CompletionStage可以以流水线的方式组合起来,对于其中一个CompletionStage,它有一个计算任务,但可能需要等待其他一个或多个阶段完成才能开始,它完成后,可能会触发其他阶段开始运行。

CompletableFuture的设计主要是为了解决Future的阻塞问题,并提供了丰富的API来支持函数式编程和流式编程,可以更方便地组合多个异步任务,并处理它们的依赖关系和异常。这使得它在处理并发编程和异步编程时非常有用。

在使用CompletableFuture时,可以创建它的实例,并通过其提供的各种方法(如thenApply、thenCompose、thenAccept等)来定义任务之间的依赖关系和执行顺序。同时,CompletableFuture还提供了异常处理机制,可以更方便地处理任务执行过程中可能出现的异常。


一、CompletableFuture基本用法

  • 静态方法supplyAsync
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)

方法接受两个参数supplier和executor,使用executor执行supplier表示的任务,返回一个CompletableFuture,调用后,任务被异步执行,这个方法立即返回。
supplyAsync还有一个不带executor参数的方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

没有executor,任务被谁执行呢?与系统环境和配置有关,一般来说,如果可用的CPU核数大于2,会使用Java 7引入的Fork/Join任务执行服务,即ForkJoinPool.commonPool(),该任务执行服务背后的工作线程数一般为CPU核数减1,即Runtime.getRuntime().availableProcessors()-1,否则,会使用ThreadPerTaskExecutor,它会为每个任务创建一个线程。

对于CPU密集型的运算任务,使用Fork/Join任务执行服务是合适的,但对于一般的调用外部服务的异步任务,Fork/Join可能是不合适的,因为它的并行度比较低,可能会让本可以并发的多任务串行运行,这时,应该提供Executor参数。


import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Supplier;


public class CompletableFutureDemo {
    private static ExecutorService executor = Executors.newFixedThreadPool(10);
    private static Random rnd = new Random();
    static int delayRandom(int min, int max) {
        int milli = max > min ? rnd.nextInt(max - min) : 0;
        try {
            Thread.sleep(min + milli);
        } catch (InterruptedException e) {
        }
        return milli;
    }
    static Callable<Integer> externalTask = () -> {
        int time = delayRandom(20, 2000);
        return time;
    };
    public static void master() {
        Future<Integer> asyncRet = callExternalService();
        try {
            Integer ret = asyncRet.get();
            System.out.println(ret);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    public static CompletableFuture<Integer> callExternalService(){
        Supplier<Integer> supplierTask = () -> {
            int time = delayRandom(20, 2000);
            return time;
        };
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(supplierTask);
        return future;
    }
    public static void main(String[] args) throws Exception{
//        master();
        CompletableFuture<Integer> integerFuture = callExternalService();
        boolean done = integerFuture.isDone();
        Integer now = integerFuture.getNow(0);
        System.out.println(now);
        Integer result = integerFuture.get();
        System.out.println(result);
        now = integerFuture.getNow(0);
        System.out.println(now);
//        executor.shutdown();
    }
}

二、使用CompletableFuture来调度执行由JSON串定义的DAG

在这个例子中,我们创建了四个任务:A、B、C 和 D。任务B依赖于任务A的结果,而任务D依赖于任务B和任务C的结果。我们使用thenApplyAsync来创建依赖链,并使用CompletableFuture.allOf来等待多个任务的完成。最后,我们使用get方法来获取结果。文章来源地址https://www.toymoban.com/news/detail-845619.html


import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;  
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.concurrent.CompletableFuture;  
import java.util.concurrent.ExecutionException;  
import java.util.function.Supplier;  
  
public class DagScheduler {  
  
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String dagJson = "{\"nodes\": [{\"id\": \"A\", \"task\": \"printA\"}, {\"id\": \"B\", \"task\": \"printB\", \"dependencies\": [\"A\"]}, {\"id\": \"C\", \"task\": \"printC\", \"dependencies\": [\"A\"]}, {\"id\": \"D\", \"task\": \"printD\", \"dependencies\": [\"B\", \"C\"]}]}";
        Map<String, CompletableFuture<Void>> futures = new HashMap<>();  
  
        // 解析JSON串  
        JSONObject dagObject = JSONObject.parseObject(dagJson);
        JSONArray nodesArray = dagObject.getJSONArray("nodes");
  
        // 创建一个映射,用于存储每个节点的CompletableFuture  
        Map<String, Node> nodeMap = new HashMap<>();  
        for (int i = 0; i < nodesArray.size(); i++) {
            JSONObject nodeObj = nodesArray.getJSONObject(i);  
            String id = nodeObj.getString("id");  
            List<String> dependencies = new ArrayList<>();  
            if (nodeObj.containsKey("dependencies")) {
                dependencies = nodeObj.getJSONArray("dependencies").toJavaList(String.class);
            }  
            Node node = new Node(id, () -> executeTask(id), dependencies);  
            nodeMap.put(id, node);  
        }  
  
        // 构建依赖关系并启动任务  
        for (Node node : nodeMap.values()) {  
            node.start(futures, nodeMap);  
        }  
  
        // 等待所有任务完成  
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).get();  
        System.out.println("All tasks completed.");  
    }  
  
    private static Void executeTask(String taskId) {
        // 执行任务的具体逻辑  
        System.out.println("Executing task: " + taskId);  
        // 模拟任务执行时间  
        try {  
            Thread.sleep((long) (Math.random() * 1000));  
        } catch (InterruptedException e) {  
            Thread.currentThread().interrupt();  
            throw new IllegalStateException(e);  
        }
        return null;
    }  
  
    static class Node {  
        private final String id;  
        private final Supplier<Void> task;
        private final List<String> dependencies;  
        private CompletableFuture<Void> future;
  
        public Node(String id, Supplier<Void> task, List<String> dependencies) {
            this.id = id;  
            this.task = task;  
            this.dependencies = dependencies;  
        }  
  
        public void start(Map<String, CompletableFuture<Void>> futures, Map<String, Node> nodeMap) {
            List<CompletableFuture<Void>> depFutures = new ArrayList<>();
            for (String depId : dependencies) {  
                CompletableFuture<Void> depFuture = futures.get(depId);
                if (depFuture == null) {  
                    throw new IllegalStateException("Unknown dependency: " + depId);  
                }  
                depFutures.add(depFuture);  
            }  
  
            if (depFutures.isEmpty()) {  
                // 没有依赖,直接执行任务  
                future = CompletableFuture.supplyAsync( task);
            } else {  
                // 等待所有依赖完成后执行任务  
                future = CompletableFuture.allOf(depFutures.toArray(new CompletableFuture[0])).thenRunAsync(()->executeTask(id));
            }  
  
            futures.put(id, future);  
        }  
    }  
}

到了这里,关于Java组合式异步编程CompletableFuture的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Vue3组合式API

    Vue3组合式API

    目录 composition API vs options API 体验 composition API setup 函数 reactive 函数 ref 函数 script setup 语法 计算属性 computed 函数 监听器 watch 函数 生命周期 模板中 ref 的使用 组件通讯 - 父传子 组件通讯 - 子传父 依赖注入 - provide 和 inject 保持响应式 - toRefs 函数 vue2 采用的就是 options API (

    2024年02月07日
    浏览(12)
  • js继承的几种方式(原型链继承、构造函数继承、组合式继承、寄生组合式继承、ES6的Class类继承)

    js继承的几种方式(原型链继承、构造函数继承、组合式继承、寄生组合式继承、ES6的Class类继承)

    实现原理: 子类的原型指向父类实例 。子类在自身实例上找不到属性和方法时去它父类实例(父类实例和实例的原型对象)上查找,从而实现对父类属性和方法的继承 缺点: 子类创建时不能传参(即没有实现super()的功能); 父类实例的修改会影响子类所有实例 实现原理:

    2024年02月07日
    浏览(18)
  • vue3组合式API介绍

    根据官方的说法,vue3.0的变化包括性能上的改进、更小的 bundle 体积、对 TypeScript 更好的支持、用于处理大规模用例的全新 API,全新的api指的就是本文主要要说的组合式api。 在 vue3 版本之前,我们复用组件(或者提取和重用多个组件之间的逻辑),通常有以下几种方式: M

    2023年04月22日
    浏览(15)
  • 组合式升降压PFC的分析方法

    组合式升降压PFC的分析方法

      组合式升降压PFC采用两组储能元件,基本单元为Cuk,Sepic和Zeta。参考论文《New Efficient Bridgeless Cuk Rectifiers for PFC Applications》中的三种拓扑进行分析。   Cuk型PFC的TypeI如下图所示,正半周Dp一直导通,Vc1=Vac+Vo。其中Vac=Vacm sinwt,Vo=mVacm。此根据回路Vac,L1,C1,C2,L2可知,

    2023年04月17日
    浏览(13)
  • 快速入门vue3组合式API

    快速入门vue3组合式API

    (创作不易,感谢有你,你的支持,就是我前行的最大动力,如果看完对你有帮助,请留下您的足迹) 目录 使用create-vue创建项目 熟悉项目目录和关键文件  组合式API  setup选项 setup选项的写法和执行时机 script setup 语法糖 reactive和ref函数 reactive() ref() computed watch 侦听单个数据

    2024年02月12日
    浏览(11)
  • Vue3 组合式函数,实现minxins

    Vue3 组合式函数,实现minxins

    截至目前,组合式函数应该是在VUE 3应用程序中组织业务逻辑最佳的方法。它让我们可以把一些小块的通用逻辑进行抽离、复用,使我们的代码更易于编写、阅读和维护。 根据官方文档说明,在 Vue 应用的概念中, “组合式函数”是一个利用 Vue 组合式 API 来封装和复用有状态

    2024年02月08日
    浏览(10)
  • 基于Vue组合式API的实用工具集

    基于Vue组合式API的实用工具集

    今天,给大家分享一个很实用的工具库 VueUse,它是基于 Vue Composition Api,也就是组合式API。支持在Vue2和Vue3项目中进行使用,据说是目前世界上Star最高的同类型库之一。 图片 官方地址: https://vueuse.org/ 中文地址: https://www.vueusejs.com/ github: https://github.com/vueuse/vueuse 图片 链接

    2024年01月23日
    浏览(8)
  • vue3 组合式 api 单文件组件写法

    Vue3 中的 Composition API 是一种新的编写组件逻辑的方式,它提供了更好的代码组织、类型推导、测试支持和复用性。相比于 Vue2 的 Options API,Composition API 更加灵活和可扩展。 在 Composition API 中,我们使用 setup 函数来定义组件的逻辑部分。setup 函数是一个特殊的函数,在创建组

    2024年02月12日
    浏览(12)
  • vue3组合式api单文件组件写法

    一,模板部分  二,js逻辑部分 

    2024年02月13日
    浏览(17)
  • 带你了解vue3组合式api基本写法

    本文的目的,是为了让已经有 Vue2 开发经验的 人 ,快速掌握 Vue3 的写法。 因此, 本篇假定你已经掌握 Vue 的核心内容 ,只为你介绍编写 Vue3 代码,需要了解的内容。 一、Vue3 里 script 的三种写法 首先,Vue3 新增了一个叫做组合式 api 的东西,英文名叫 Composition API。因此 Vu

    2024年02月01日
    浏览(12)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包