11.定时任务&定时线程池详解

这篇具有很好参考价值的文章主要介绍了11.定时任务&定时线程池详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

3.1 新增定时任务池

11.定时任务&定时线程池详解

​ 当我们不用任务框架时,我们想自己写一个定时任务时,我们能想起那个工具类呢?Timer ?还有吗?不知道了,下面我们要讲下ScheduledThreadPoolExecutor,定时任务线程池,可以执行一次任务,还可以执行周期性任务。

1.0 ScheduledThreadPoolExecutor的用法

定时线程池的类的结构图如下:
11.定时任务&定时线程池详解

从结构图上可以看出定时线程池ScheduledThreadPoolExecutor继承了线程池ThreadPoolExecutor,也就是说它们之间肯定有相同的行为和属性。

ScheduledThreadPoolExecutor常用发的方法如下

1)schedule():一次行任务,延迟执行,任务只执行一次。

2)scheduleAtFixedRate():周期性任务,不不等待任务结束,每隔周期时间执行一次,新任务放进队列中.

3)scheduleWithFixedDelay():周期性任务,等待任务结束,每隔周期时间执行一次.

代码样例入下:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestScheduledThreadPoolExecutor {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        //无返回值 延迟5秒返回
        scheduledThreadPoolExecutor.schedule(()->{
            System.out.println("我要延迟5秒执行,只执行一次 ");
        },5000, TimeUnit.MICROSECONDS);
    }
}

可以用在启动项目时需要等待对象的加载,延迟执行一个任务。

带返回值的延迟执行任务如下

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestScheduledThreadPoolExecutor {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        //有返回值任务  可以用作异步处理任务不用等待结果
       ScheduledFuture<Integer> future =  scheduledThreadPoolExecutor.schedule(()->{
            System.out.println("我要延迟5秒执行,只执行一次 ");
            return 1;
        },5000, TimeUnit.MICROSECONDS);
        System.out.println(future.get());
    }
}

待返回值的任务,可以用于异步处理一个任务,等主线任务执行完,主要任务要知道异步任务的执行状态。

周期性任务:参数一样,方法名字不一样 例子如下

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestScheduledThreadPoolExecutor {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        //周期性的任务  发心跳 service1-service2 每次5s,发送一个心跳 下面的例子是不管任务是否执行完,一直想队列中放。 一个任务占一个线程。
        //scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
        //等待任务执行结束,在间隔2秒执行。
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
            System.out.println("send heart beat");
            long startTime = System.currentTimeMillis(),nowTime = startTime;
            while((nowTime-startTime)<5000){
                nowTime = System.currentTimeMillis();
                try{
                    Thread.sleep(100);
                }catch (InterruptedException e ){
                    e.printStackTrace();
                }
            }
            System.out.println("task over .....");

            //任务启动多久之后   ,周期 每2s执行一次,时间单位
        },1000,2000,TimeUnit.MILLISECONDS);
    }
}
2.0 定时线程池使用场景
2.1 分布式锁-redis

​ 当使用setnx获取分布式锁(锁是有失效时间的),但是害怕任务没有执行完成锁失效了,怎么办呢?可以在任务的开始用一个定时线程池每隔一段时间看下锁是否失效如果没失效延长失效时间,如果失效不做处理。这样可以保证任务执行完成。

2.2 服务注册中心

服务注册客户端每隔多久向服务中心发送下自己的ip,端口,服务名字及服务状态。

2.3 和Timer的不同
import java.util.Timer;
import java.util.TimerTask;

public class TestTimer {
    public static void main(String[] args) throws InterruptedException {
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                System.out.println("send star  -----");
                throw new RuntimeException("2134    243");
            }
        }, 1000, 2000);
    }
}

上面是的使用方法,从使用方法上和定时线程池的使用方法类似,都是周期性的执行任务;

不同的地方是:

Timer:单线程,线程挂了,不会再创建线程执行任务;

ScheduledThreadPoolExecutor:线程挂了,再提交任务,线程池会创建新的线程执行任务。

3.0 定时任务线程池实现原理

线程池执行过程:调用sechedule相关方法时,会先把任务添加到队列中,再又线程从队列中取出执行。

11.定时任务&定时线程池详解

它接收SchduledFutureTask类型的任务,是线程调度的最小单位,有三种提交方法:

1)schedule():一次行任务,延迟执行,任务只执行一次。

2)scheduleAtFixedRate():周期性任务,不不等待任务结束,每隔周期时间执行一次,新任务放进队列中.

3)scheduleWithFixedDelay():周期性任务,等待任务结束,每隔周期时间执行一次.

它采用DelayedWorkQueue存储等待的任务:

1)DelayedWorkQueue内部封装了一个PriorityQueue,根据它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;

2)DelayedWorkQueue是一个无界队列;

3.1 SchduledFutureTask

SchduledFutureTask 接收的参数(成员变量):

1)private long time :任务开始的时间;

2)private final long sequenceNumber:任务的序号;

3)private final long period:任务执行的间隔;

工作线程的执行 过程:

  • 工作线程会 从DelayedQueue取已经到期的任务去执行;
  • 执行结束后重新设置任务的到期时间,再次放回DelayedQueue

ScheduledThreadPoolExecutor会把执行的任务放到工作队列DelayedQueue中,DelayedQueue封装了一个PriorityQueue,PriorityQueue会对队列中的SchduledFutureTask 进行排序,具体的排序算法如下:

 public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

1)首先按照time排序,time小的排在前面,time大的排在后面;

2)如果time相同,按照sequenceNumber排序,sequenceNumber小的排在前面,sequenceNumber大的排在后面。如果两个task的执行时间相同,优先执行先提交的task.

ScheduledFutureTaskn的run方法实现:

run方法是调度task的核心,task 的执行实际是run方法的执行。

 public void run() {
            boolean periodic = isPeriodic();
     //如果当前线程池已经不支持执行任务,则取消
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
     //如果不需要周期性执行,则直接执行run方法
            else if (!periodic)
                ScheduledFutureTask.super.run();
     //如果需要周期性执行,先执行,后设置下次执行时间
            else if (ScheduledFutureTask.super.runAndReset()) {
                //计算下次执行时间
                setNextRunTime();
                //再次将执行任务添加到队列中,重复执行。
                reExecutePeriodic(outerTask);
            }
        }
    }

reExecutePeriodic 源码如下:

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

该方法和delayExecute方法类似,不同的是:

1)由于调用reExecutePeriodic 方法时已经执行过一次周期性任务了,所以不会reject当前任务;

2)传入的任务一定是周期性任务

3.2 线程池任务提交

首先是schedule方法,该方法指任务在指定延迟时间到达后触发,只会执行一次。

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        //参数校验
        if (callable == null || unit == null)
            throw new NullPointerException();
        //这是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask
        //然后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个空方法。
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

scheduleWithFixedDelay周期性执行任务:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        // 将任务包装成 ScheduledFutureTask 类型
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        // 再次装饰任务,可以复写 decorateTask 方法,定制化任务 
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        // 放入延时队列中,ScheduledFutureTask 是接口 RunnableScheduledFuture 的一个实现类 
        // 所以放入队列还是 ScheduledFutureTask 类型的
        delayedExecute(t);
        return t;
    }

任务提交方法delayedExecute源码如下:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        //如果线程池已经关闭,则 使用决绝策略把提交任务拒绝掉
        if (isShutdown())
            reject(task);
        else {
            //与ThreadPoolExecutor不同的,这里直接把任务加入延迟队列
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                //如果当前状态无法执行,则取消
                remove(task))
                task.cancel(false);
            else
                //这里增加了一个worker线程,避免提交的任务没有worker去执行
                //原因就是该类没有像ThreadPoolExecutor 一样,核心worker满了,才放入队列
                ensurePrestart();
        }
    }
3.3 DelayedWorkerQueue

ScheduledThreadPoolExecutor之所以要在自己实现阻塞的工作队列,是因为ScheduledThreadPoolExecutor要求的工作队列有些特殊。

DelayedWorkerQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务执行时间都不同,所以DelayedWorkerQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的qianmian(注意:这里的顺序并不是绝对的,堆中的排序只保证了自己的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不是顺序的。)

堆结构图如下

11.定时任务&定时线程池详解

可知,DelayedWorkerQueue是一个基于最小堆结构的队列。堆结构可以使用数组表示,可以转换成如下的数组

11.定时任务&定时线程池详解

在这种结构中,可以发下如下特点:

假设索引值从0开始,子节点的索引值为K,父节点的索引值为P,则:

  1. 一个节点的左节点的索引为:k=p*2+1;
  2. 一个节点的右节点的索引为:k=(p+1)*2;
  3. 一个节点的父节点的索引为:p=(k-1)/2;

为什么要使用DelayedWorkerQueue呢?

定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时,一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。

DelayedWorkerQueue是一个优先级队列,它可以保证每次出队列的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是O(logN).

DelayedWorkerQueue的属性

//队列初始化容量
private static final int INITIAL_CAPACITY = 16;
//根据初始化容量创建RunnableScheduledFuture 类型的数组;
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader 线程
private Thread leader = null;
// 当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
private final Condition available = lock.newCondition();

注意:这里的leader,它是Leader-Follower模式的变体,用于减少不必要的定时等待。什么意思呢?对于多线程的网络模型来说 所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基木原则就是,永远最多只有一个leader,而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。文章来源地址https://www.toymoban.com/news/detail-403149.html

到了这里,关于11.定时任务&定时线程池详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • @Scheduled Cron定时任务——表达式详解

    Cron表达式是一种用于定时任务调度的字符串表达式,它由6个或7个字段组成,分别表示秒、分、时、日、月、周和年。每个字段用空格分隔,字段之间用逗号分隔。 秒(0-59) 分(0-59) 时(0-23) 日(1-31) 月(1-12) 周(0-7,其中0和7都表示周日) 年(可选字段,1970-2099) Cron表达式的语法规则如

    2024年02月09日
    浏览(12)
  • python实现定时任务的8种方式详解

    python实现定时任务的8种方式详解

            在日常工作中,常常会用到需要周期性执行的任务,一种方式是采用 Linux 系统自带的 crond 结合命令行实现。另外一种方式是直接使用Python。                 当每隔一段时间就要执行一段程序,或者往复循环执行某一个任务,这就需要使用定时任务来执行

    2023年04月09日
    浏览(12)
  • 【多线程】定时器,详解定时器原理,让大家更深刻的理解多线程

    【多线程】定时器,详解定时器原理,让大家更深刻的理解多线程

    前言: 大家好,我是 良辰丫 ,今天我们一起了解一下定时器,通过定时器来熟悉一下线程安全等相关知识点.💞💞💞 🧑个人主页:良辰针不戳 📖所属专栏:javaEE初阶 🍎励志语句:生活也许会让我们遍体鳞伤,但最终这些伤口会成为我们一辈子的财富。 💦期待大家三连,关注

    2024年02月01日
    浏览(11)
  • 微服务系列文章 之SpringBoot之定时任务详解

    微服务系列文章 之SpringBoot之定时任务详解

    使用SpringBoot创建定时任务非常简单,目前主要有以下三种创建方式: 一、基于注解(@Scheduled) 二、基于接口(SchedulingConfigurer) 前者相信大家都很熟悉,但是实际使用中我们往往想从数据库中读取指定时间来动态执行定时任务,这时候基于接口的定时任务就派上用场了。 三、

    2024年02月16日
    浏览(12)
  • kettle工具下载、安装、数据迁移、定时任务详解

    kettle工具下载、安装、数据迁移、定时任务详解

    篇幅有点长,将这五个内容放在了一篇文章里,其中最主要的是数据迁移和定时任务 目录 目录 一、简单介绍 二、下载 三、安装 四、数据迁移:包括单表整体数据迁移,单表存在字段不同情况的数据迁移,简单批量数据迁移 五、定时任务 kettle 是一个ETL工具,ETL(Extract-T

    2024年02月06日
    浏览(10)
  • Python - 定时任务框架【APScheduler】基本使用详解(一)

    一个网页会有很多数据是不需要经常变动的,比如说首页,变动频率低而访问量大,我们可以把它静态化,这样就不需要每次有请求都要查询数据库再返回,可以减少服务器压力 我们可以使用Django的模板渲染功能完成页面渲染 APScheduler的全称是Advanced Python Scheduler。它是一个

    2024年02月09日
    浏览(12)
  • 用Spring Boot轻松实现定时任务--原理详解

      在现代化的web开发中,定时任务是一个非常常见的功能。Spring Boot为我们提供了一个简便的方式来处理这些任务,我们只需加入一些注解和配置即可完成。本文将介绍 Spring Boot 定时任务的基本概念和原理,以及如何在具体业务场景中使用和优化配置。   定时任务是指在

    2024年02月06日
    浏览(16)
  • Linux的定时任务--CronTab 命令详解及使用教程

    Linux的定时任务--CronTab 命令详解及使用教程

    Linux crontab 是用来定期执行程序的命令。(Nginx 日志切割,数据库备份等) 当安装完成操作系统之后,默认便会启动此任务调度命令。 crond 命令每分钟会定期检查是否有要执行的工作,如果有要执行的工作便会自动执行该工作。 注意:新创建的 cron 任务,不会马上执行,至

    2024年02月16日
    浏览(10)
  • 【独家解密】Java中定时任务的解决方案详解

    【独家解密】Java中定时任务的解决方案详解

    目录 1、前言 2、定时任务的概述 2.1 什么是定时任务 2.2 定时任务的应用场景 3、使用Timer类和TimerTask类 3.1 Timer类的使用方法 3.2 TimerTask类的使用方法 4、使用ScheduledThreadPoolExecutor类 4.1 ScheduledThreadPoolExecutor类的使用方法 5、使用Spring框架的定时任务 5.1 配置XML方式的定时任务

    2024年02月02日
    浏览(8)
  • 【Linux】Linux中Crontab(定时任务)命令详解及使用教程

    【Linux】Linux中Crontab(定时任务)命令详解及使用教程

    Crontab介绍: Linux crontab是用来crontab命令常见于Unix和类Unix的操作系统之中,用于设置周期性被执行的指令。该命令从标准输入设备读取指令,并将其存放于“crontab”文件中,以供之后读取和执行。该词来源于希腊语 chronos(χρ?νο?),原意是时间。通常,crontab储存的指令被守护

    2024年02月08日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包