异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析

这篇具有很好参考价值的文章主要介绍了异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析,【异步编程】,Completable,Future,源码解析


CompletableFuture 类图结构

异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析,【异步编程】,Completable,Future,源码解析

CompletionStage接口

CompletableFuture实现了CompletionStage接口 。

  • 1)一个CompletionStage代表着一个异步计算节点,当另外一个CompletionStage计算节点完成后,当前CompletionStage会执行或者计算一个值;一个节点在计算终止时完成,可能反过来触发其他依赖其结果的节点开始计算。

  • 2)一个节点(CompletionStage)的计算执行可以被表述为一个函数、消费者、可执行的Runable(例如使用apply、accept、run方法),

具体取决于这个节点是否需要参数或者产生结果。例如:

stage.thenApply(x -> square(x))//计算平方和
     .thenAccept(x -> System.out.print(x))//输出计算结果
     .thenRun(() -> System.out.println());//然后执行异步任务
  • 3)CompletionStage节点可以使用3种模式来执行:默认执行、默认异步执行(使用async后缀的方法)和用户自定义的线程执行器执行(通过传递一个Executor方式)。

  • 4)一个节点的执行可以通过一到两个节点的执行完成来触发。一个节点依赖的其他节点通常使用then前缀的方法来进行组织。

属性

result

volatile Object result;       // Either the result or boxed AltResult

result字段用来存放任务执行的结果,如果不为null,则标识任务已经执行完成。而计算任务本身也可能需要返回null值,所以使用AltResult(如下代码)来包装计算任务返回null的情况(ex等于null的时候),AltResult也被用来存放当任务执行出现异常时候的异常信息(ex不为null的时候):

static final class AltResult { // See above
    final Throwable ex;        // null only for NIL
    AltResult(Throwable x) { this.ex = x; }
}

stack

 volatile Completion stack;    // Top of Treiber stack of dependent actions

stack字段是当前任务执行完毕后要触发的一系列行为的入口,由于一个任务执行后可以触发多个行为,所以所有行为被组织成一个链表结构,并且使用Treiber stack实现了无锁基于CAS的链式栈,其中stack存放栈顶行为节点,stack是Completion类型的,定义如下所示。

 abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;      // Treiber stack下一个节点
  ...
}

asyncPool

/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

asyncPool是用来执行异步任务的线程池,如果支持并发则默认为Fork-JoinPool.commonPool(),否则是ThreadPerTaskExecutor


方法

CompletableFuture<Void>runAsync(Runnable runnable)

该方法返回一个新的CompletableFuture对象,其结果值会在给定的runnable行为使用ForkJoinPool.commonPool()异步执行完毕后被设置为null,代码如下所示。

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

如上代码中,默认情况下asyncPool为ForkJoinPool.commonPool(),其中asyncRunStage代码如下所示。

static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    //1.任务或者行为为null,则抛出NPE异常
    if (f == null) throw new NullPointerException();
    //2.创建一个future对象
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    //3.包装f和d为异步任务后,投递到线程池执行
    e.execute(new AsyncRun(d, f));
    //4.返回创建的future对象
    return d;
}
  • 代码1判断行为是否为null,如果是则抛出异常。

  • 代码2创建一个CompletableFuture对象。

  • 代码3首先创建一个AsyncRun任务,里面保存了创建的future对象和要执行的行为,然后投递到ForkJoinPool.commonPool()线程池执行。

  • 代码4直接返回创建的CompletableFuture对象。

可知runAsync方法会马上返回一个CompletableFuture对象,并且当前线程不会被阻塞;代码3投递AsyncRun任务到线程池后,线程池线程会执行其run方法。

下面我们看看在AsyncRun中是如何执行我们设置的行为,并把结果设置到创建的future对象中的。

static final class AsyncRun extends ForkJoinTask<Void>
       implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<Void> dep; Runnable fn;
        //保存创建的future和要执行的行为
        AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
            this.dep = dep; this.fn = fn;
        }
       ...
        public void run() {
            CompletableFuture<Void> d; Runnable f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                //5.如果future的result等于null,说明任务还没完成
                if (d.result == null) {
                    try {
                        //5.1执行传递的行为
                        f.run();
                        //5.2设置future的结果为null
                        d.completeNull();
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                //6弹出当前future中依赖当前结果的行为并执行
                d.postComplete();
            }
        }
}
  • 这里代码5如果发现future的result不为null,说明当前future还没开始执行,则代码5.1执行我们传递的runnable方法,然后执行代码5.2将future对象的结果设置为null,这时候其他因调用future的get()方法而被阻塞的线程就会从get()处返回null。

  • 当代码6的future任务结束后,看看其stack栈里面是否有依赖其结果的行为,如果有则从栈中弹出来,并执行。

其实上面代码中的runAsync实现可以用我们自己编写的简单代码来模拟。

public static CompletableFuture runAsync(Runnable runnable) {
    CompletableFuture<String> future = new CompletableFuture<String>();

    // 2.开启线程计算任务结果,并设置
    POOL_EXECUTOR.execute(() -> {

        // 2.1模拟任务计算
        try {
            runnable.run();
            future.complete(null);

        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    });

    return future;
}

CompletableFuture<U> supplyAsync(Supplier<U>supplier)

该方法返回一个新的CompletableFuture对象,其结果值为入参supplier行为执行的结果,代码如下所示。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                             Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

如上代码与runAsync类似,不同点在于,其提交到线程池的是AsyncSupply类型的任务,下面我们来看其代码。

static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
        this.dep = dep; this.fn = fn;
    }

   ...

    public void run() {
        CompletableFuture<T> d; Supplier<T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            //1.如果future的result等于null,说明任务还没完成
            if (d.result == null) {
                try {
                    //1.1 f.get()执行行为f的方法,并获取结果
                    //1.2 把f.get()执行结果设置到future对象
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            //2.弹出当前future中依赖当前结果的行为并执行

            d.postComplete();
        }
    }
}

如上代码与runAsync的不同点在于,这里的行为方法是Supplier,其get()方法有返回值,且返回值会被设置到future中,然后调用future的get()方法的线程就会获取到该值。


CompletableFuture<U> supplyAsync(Supplier<U>supplier,Executor executor)

该方法返回一个新的CompletableFuture对象,其结果值为入参supplier行为执行的结果,需要注意的是,supplier行为的执行不再是ForkJoinPool.commonPool(),而是业务自己传递的executor,其代码如下所示。

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
static Executor screenExecutor(Executor e) {
    //如果使用commonpool并且传递的e本身就是commonpool
    if (!useCommonPool && e == ForkJoinPool.commonPool())
        return asyncPool;
    //如果传递的线程池为null,则抛出NPE异常
    if (e == null) throw new NullPointerException();

    //返回业务传递的线程池e
    return e;
}

如上代码通过使用screenExecutor方法来判断传入的线程池是否是一个可用的线程池,如果不是则抛出异常。

异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析,【异步编程】,Completable,Future,源码解析文章来源地址https://www.toymoban.com/news/detail-698427.html

到了这里,关于异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • CompletableFuture:Java中的异步编程利器

    CompletableFuture:Java中的异步编程利器

    前言: 在秋招的面试中,面试官问了很多关于异步编程相关的知识点,朋友最近也和我聊到了这个话题,因此今天咱们来讨论讨论这个知识点! 随着现代软件系统的日益复杂,对于非阻塞性和响应性的需求也在不断增加。Java为我们提供了多种工具和技术来满足这些需求,其

    2024年02月04日
    浏览(12)
  • CompletableFuture与线程池:Java 8中的高效异步编程搭配

    摘要:在Java 8中,CompletableFuture和线程池的结合使用为程序员提供了一种高效、灵活的异步编程解决方案。本文将深入探讨CompletableFuture和线程池结合使用的优势、原理及实际应用案例,帮助读者更好地理解并掌握这一技术。 随着多核处理器的普及,应用程序的性能和响应能

    2024年02月07日
    浏览(16)
  • Java组合式异步编程CompletableFuture

    CompletableFuture是Java 8中引入的一个功能强大的Future实现类,它的字面翻译是“可完成的Future”。 CompletableFuture对并发编程进行了增强,可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发。 CompletableFuture实现了两个接口,一个

    2024年04月09日
    浏览(13)
  • CompletableFuture异步编程事务及多数据源配置详解(含gitee源码)

    CompletableFuture异步编程事务及多数据源配置详解(含gitee源码)

    仓库地址: buxingzhe: 一个多数据源和多线程事务练习项目 小伙伴们在日常编码中经常为了提高程序运行效率采用多线程编程,在不涉及事务的情况下,使用dou.lea大神提供的CompletableFuture异步编程利器,它提供了许多优雅的api,我们可以很方便的进行异步多线程编程,速度杠杠

    2024年01月22日
    浏览(13)
  • CompletableFuture异步编程事务及多数据源配置问题(含gitee源码)

    CompletableFuture异步编程事务及多数据源配置问题(含gitee源码)

    仓库地址: buxingzhe: 一个多数据源和多线程事务练习项目 小伙伴们在日常编码中经常为了提高程序运行效率采用多线程编程,在不涉及事务的情况下,使用dou.lea大神提供的CompletableFuture异步编程利器,它提供了许多优雅的api,我们可以很方便的进行异步多线程编程,速度杠杠

    2024年02月05日
    浏览(10)
  • Future和CompletableFuture区别

    Future :获取异步返回的结果需要使用轮询的方式,消耗cup CompletableFuture:采用观察者模式,阻塞获取异步返回的结果,性能得到优化 CompletableFuture的使用明细 官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio 例子 注意事项 Timeout :定义请求

    2024年02月12日
    浏览(8)
  • 模拟实现.net中的Task机制:探索异步编程的奥秘

    .net中使用Task可以方便地编写异步程序,为了更好地理解Task及其调度机制,接下来模拟Task的实现,目的是搞清楚: Task是什么 Task是如何被调度的 从最基本的Task用法开始 这个命令的作用是将action作为一项任务提交给调度器,调度器会安排空闲线程来处理。 我们使用Job来模拟

    2024年02月06日
    浏览(10)
  • CompletableFuture异步回调

    CompletableFuture简介 CompletableFuture被用于异步编程,异步通常意味着非阻塞,可以使得任务单独允许在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常信息。 CompletableFuture实现了Future,CompletionStage接口,实现了Future接

    2024年02月05日
    浏览(12)
  • CompletableFuture异步任务编排使用

    runAsync(runnable):无返回值 runAsync(runnable, executor):无返回值,可自定义线程池 supplyAsync(runnable):有返回值 supplyAsync(runnable, executor):有回值,可自定义线程池 相关代码演示: 解析:oneFuture.join()获取的执行结果为null,因为runAsync是没有返回结果的。 allOf(future1,future2,future3…):

    2024年02月04日
    浏览(10)
  • CompletableFuture异步优化代码

    CompletableFuture异步优化代码

    我们在项目开发中,有可能遇到一个接口需要调用N个服务的接口。比如用户请求获取订单信息,需要调用用户信息、商品信息、物流信息等接口,最后再汇总数据统一返回。如果使用串行的方法按照顺序挨个调用接口,这样接口的响应的速度就很慢。如果并行调用接口,同时

    2024年02月08日
    浏览(9)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包