好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)推荐

一、在java开发领域,目前可以通过以下几种方式进行 定时任务

1、单机部署模式

timer:jdk中自带的一个定时调度类,可以简单的实现按某一频度进行任务执行。提供的功能比较单一,无法实现复杂的调度任务。
scheduledexecutorservice:也是jdk自带的一个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的一个线程执行,所以其任务是并发执行的,互不影响。
spring task:spring提供的一个任务调度工具,支持注解和配置文件形式,支持cron表达式,使用简单但功能强大。
quartz:一款功能强大的任务调度器,可以实现较为复杂的调度功能,如每月一号执行、每天凌晨执行、每周五执行等等,还支持分布式调度,就是配置稍显复杂。

2、分布式集群模式(不多介绍,简单提一下)

问题:

i、如何解决定时任务的多次执行? ii、如何解决任务的单点问题,实现任务的故障转移?

问题i的简单思考:

1、固定执行定时任务的机器(可以有效避免多次执行的情况 ,缺点就是单点故障问题)。 2、借助redis的过期机制和分布式锁。 3、借助mysql的锁机制等。

成熟的解决方案:

1、Quartz:可以去看看这篇文章[Quartz分布式](  http://HdhCmsTesttuohang.net/article/88680.html )。
2、elastic-job:( https://github测试数据/elasticjob/elastic-job-lite )当当开发的弹性分布式任务调度系统,采用zookeeper实现分布式协调,实现任务高可用以及分片。
3、xxl-job:( https://github测试数据/xuxueli/xxl-job )是大众点评员发布的分布式任务调度平台,是一个轻量级分布式任务调度框架。
4、saturn:( https://github测试数据/vipshop/Saturn ) 是唯品会提供一个分布式、容错和高可用的作业调度服务框架。

二、springtask实现定时任务(这里是基于springboot)

1、简单的定时任务实现

使用方式:

使用@enablescheduling注解开启对定时任务的支持。 使用@scheduled 注解即可,基于corn、fixedrate、fixeddelay等一些定时策略来实现定时任务。

使用缺点:

1、多个定时任务使用的是同一个调度线程,所以任务是阻塞执行的,执行效率不高。 2、其次如果出现任务阻塞,导致一些场景的定时计算没有实际意义,比如每天12点的一个计算任务被阻塞到1点去执行,会导致结果并非我们想要的。

使用优点:

1、配置简单 2、适用于单个后台线程执行周期任务,并且保证顺序一致执行的场景   

 源码分析:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

//默认使用的调度器

if ( this .taskscheduler == null ) {

  this .localexecutor = executors.newsinglethreadscheduledexecutor();

  this .taskscheduler = new concurrenttaskscheduler( this .localexecutor);

}

//可以看到singlethreadscheduledexecutor指定的核心线程为1,说白了就是单线程执行

public static scheduledexecutorservice newsinglethreadscheduledexecutor() {

  return new delegatedscheduledexecutorservice

  ( new scheduledthreadpoolexecutor( 1 ));

}

//利用了delayedworkqueue延时队列作为任务的存放队列,这样便可以实现任务延迟执行或者定时执行

public scheduledthreadpoolexecutor( int corepoolsize) {

  super (corepoolsize, integer.max_value, 0 , nanoseconds,

   new delayedworkqueue());

}

2、实现并发的定时任务

使用方式:

方式一:由1中我们知道之所以定时任务是阻塞执行,是配置的线程池决定的,那就好办了,换一个不就行了!直接上代码:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

@configuration

  public class scheduledconfig implements schedulingconfigurer {

 

  @autowired

  private taskscheduler mythreadpooltaskscheduler;

 

  @override

  public void configuretasks(scheduledtaskregistrar scheduledtaskregistrar) {

   //简单粗暴的方式直接指定

   //scheduledtaskregistrar.setscheduler(executors.newscheduledthreadpool(5));

   //也可以自定义的线程池,方便线程的使用与维护,这里不多说了

   scheduledtaskregistrar.settaskscheduler(mythreadpooltaskscheduler);

  }

  }

 

  @bean (name = "mythreadpooltaskscheduler" )

  public taskscheduler getmythreadpooltaskscheduler() {

  threadpooltaskscheduler taskscheduler = new threadpooltaskscheduler();

  taskscheduler.setpoolsize( 10 );

  taskscheduler.setthreadnameprefix( "haina-scheduled-" );

  taskscheduler.setrejectedexecutionhandler( new threadpoolexecutor.callerrunspolicy());

  //调度器shutdown被调用时等待当前被调度的任务完成

  taskscheduler.setwaitfortaskstocompleteonshutdown( true );

  //等待时长

  taskscheduler.setawaitterminationseconds( 60 );

  return taskscheduler;

  }

方式二:方式一的本质改变了任务调度器默认使用的线程池,接下来这种是不改变调度器的默认线程池,而是把当前任务交给一个异步线程池去执行

首先使用@enableasync 启用异步任务
然后在定时任务的方法加上@async即可,默认使用的线程池为simpleasynctaskexecutor(该线程池默认来一个任务创建一个线程,就会不断创建大量线程,极有可能压爆服务器内存。当然它有自己的限流机制,这里就不多说了,有兴趣的自己翻翻源码~)
项目中为了更好的控制线程的使用,我们可以自定义我们自己的线程池,使用方式@async("mythreadpool")

废话太多,直接上代码:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

@scheduled (fixedrate = 1000 * 10 ,initialdelay = 1000 * 20 )

@async ( "mythreadpooltaskexecutor" )

//@async

public void scheduledtest02(){

  system.out.println(thread.currentthread().getname()+ "--->xxxxx--->" +thread.currentthread().getid());

}

 

//自定义线程池

@bean (name = "mythreadpooltaskexecutor" )

public taskexecutor getmythreadpooltaskexecutor() {

  threadpooltaskexecutor taskexecutor = new threadpooltaskexecutor();

  taskexecutor.setcorepoolsize( 20 );

  taskexecutor.setmaxpoolsize( 200 );

  taskexecutor.setqueuecapacity( 25 );

  taskexecutor.setkeepaliveseconds( 200 );

  taskexecutor.setthreadnameprefix( "haina-threadpool-" );

  // 线程池对拒绝任务(无线程可用)的处理策略,目前只支持abortpolicy、callerrunspolicy;默认为后者

  taskexecutor.setrejectedexecutionhandler( new threadpoolexecutor.callerrunspolicy());

  //调度器shutdown被调用时等待当前被调度的任务完成

  taskexecutor.setwaitfortaskstocompleteonshutdown( true );

  //等待时长

  taskexecutor.setawaitterminationseconds( 60 );

  taskexecutor.initialize();

  return taskexecutor;

}

线程池的使用心得(后续有专门文章来探讨)

java中提供了threadpoolexecutor和scheduledthreadpoolexecutor,对应与spring中的threadpooltaskexecutor和threadpooltaskscheduler,但是在原有的基础上增加了新的特性,在spring环境下更容易使用和控制。
使用自定义的线程池能够避免一些默认线程池造成的内存溢出、阻塞等等问题,更贴合自己的服务特性
使用自定义的线程池便于对项目中线程的管理、维护以及监控。
即便在非spring环境下也不要使用java默认提供的那几种线程池,坑很多,阿里代码规约不说了吗,得相信大厂!!!

三、动态定时任务的实现

问题:
使用@scheduled注解来完成设置定时任务,但是有时候我们往往需要对周期性的时间的设置会做一些改变,或者要动态的启停一个定时任务,那么这个时候使用此注解就不太方便了,原因在于这个注解中配置的cron表达式必须是常量,那么当我们修改定时参数的时候,就需要停止服务,重新部署。

解决办法:

方式一 :实现schedulingconfigurer接口,重写configuretasks方法,重新制定trigger,核心方法就是addtriggertask(runnable task, trigger trigger) ,不过需要注意的是,此种方式修改了配置值后,需要在下一次调度结束后,才会更新调度器,并不会在修改配置值时实时更新,实时更新需要在修改配置值时额外增加相关逻辑处理。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

@configuration

  public class scheduledconfig implements schedulingconfigurer {

 

  @autowired

  private taskscheduler mythreadpooltaskscheduler;

 

  @override

  public void configuretasks(scheduledtaskregistrar scheduledtaskregistrar) {

   //scheduledtaskregistrar.setscheduler(executors.newscheduledthreadpool(5));

   scheduledtaskregistrar.settaskscheduler(mythreadpooltaskscheduler);

   //可以实现动态调整定时任务的执行频率

   scheduledtaskregistrar.addtriggertask(

     //1.添加任务内容(runnable)

     () -> system.out.println( "cccccccccccccccc--->" + thread.currentthread().getid()),

     //2.设置执行周期(trigger)

     triggercontext -> {

      //2.1 从数据库动态获取执行周期

      string cron = "0/2 * * * * ? " ;

      //2.2 合法性校验.

  //     if (stringutils.isempty(cron)) {

  //      // omitted code ..

  //     }

       //2.3 返回执行周期(date)

       return new crontrigger(cron).nextexecutiontime(triggercontext);

      }

    );

  }

  }

方式二:使用threadpooltaskscheduler类可实现动态添加删除功能,当然也可实现执行频率的调整

首先,我们要认识下这个调度类,它其实是对java中scheduledthreadpoolexecutor的一个封装改进后的产物,主要改进有以下几点:

1、提供默认配置,因为是scheduledthreadpoolexecutor,所以只有poolsize这一个默认参数。 2、支持自定义任务,通过传入trigger参数。 3、对任务出错处理进行优化,如果是重复性的任务,不抛出异常,通过日志记录下来,不影响下次运行,如果是只执行一次的任务,将异常往上抛。

顺便说下threadpooltaskexecutor相对于threadpoolexecutor的改进点:

1、提供默认配置,原生的threadpoolexecutor的除了threadfactory和rejectedexecutionhandler其他没有默认配置 2、实现asynclistenabletaskexecutor接口,支持对futuretask添加success和fail的回调,任务成功或失败的时候回执行对应回调方法。 3、因为是spring的工具类,所以抛出的rejectedexecutionexception也会被转换为spring框架的taskrejectedexception异常(这个无所谓) 4、提供默认threadfactory实现,直接通过参数重载配置

扯了这么多,还是直接上代码:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

@component

public class dynamictimedtask {

 

  private static final logger logger = loggerfactory.getlogger(dynamictimedtask. class );

 

  //利用创建好的调度类统一管理

  //@autowired

  //@qualifier("mythreadpooltaskscheduler")

  //private threadpooltaskscheduler mythreadpooltaskscheduler;

 

 

  //接受任务的返回结果

  private scheduledfuture<?> future;

 

  @autowired

  private threadpooltaskscheduler threadpooltaskscheduler;

 

  //实例化一个线程池任务调度类,可以使用自定义的threadpooltaskscheduler

  @bean

  public threadpooltaskscheduler threadpooltaskscheduler() {

   threadpooltaskscheduler executor = new threadpooltaskscheduler();

   return new threadpooltaskscheduler();

  }

 

 

  /**

  * 启动定时任务

  * @return

  */

  public boolean startcron() {

   boolean flag = false ;

   //从数据库动态获取执行周期

   string cron = "0/2 * * * * ? " ;

   future = threadpooltaskscheduler.schedule( new checkmodelfile(),cron);

   if (future!= null ){

    flag = true ;

    logger.info( "定时check训练模型文件,任务启动成功!!!" );

   } else {

    logger.info( "定时check训练模型文件,任务启动失败!!!" );

   }

   return flag;

  }

 

  /**

  * 停止定时任务

  * @return

  */

  public boolean stopcron() {

   boolean flag = false ;

   if (future != null ) {

    boolean cancel = future.cancel( true );

    if (cancel){

     flag = true ;

     logger.info( "定时check训练模型文件,任务停止成功!!!" );

    } else {

     logger.info( "定时check训练模型文件,任务停止失败!!!" );

    }

   } else {

    flag = true ;

    logger.info( "定时check训练模型文件,任务已经停止!!!" );

   }

   return flag;

  }

 

 

  class checkmodelfile implements runnable{

 

   @override

   public void run() {

    //编写你自己的业务逻辑

    system.out.print( "模型文件检查完毕!!!" )

   }

  }

 

}

四、总结

到此基于springtask下的定时任务的简单使用算是差不多了,其中不免有些错误的地方,或者理解有偏颇的地方欢迎大家提出来!
基于分布式集群下的定时任务使用,后续有时间再继续!!!

以上所述是小编给大家介绍的springboot并发定时任务动态定时任务实现详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!

原文链接:https://HdhCmsTestcnblogs测试数据/baixianlong/archive/2019/04/05/10659045.html

查看更多关于SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)推荐的详细内容...

  阅读:10次