好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

java 注解实现一个可配置线程池的方法示例

前言

项目需要多线程执行一些task,为了方便各个服务使用,特意封装了一个公共工具类,下面直接上代码:

poolconfig(线程池核心配置参数):

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

/**
 * Core sizing parameters of the thread pool.
 *
 * <table border="1">
 *   <tr><th>Property</th><th>Meaning</th></tr>
 *   <tr><td>count</td><td>base (core) number of pool threads; starts at 0</td></tr>
 *   <tr><td>maxcount</td><td>maximum number of pool threads; starts at 0</td></tr>
 *   <tr><td>queuecapacity</td><td>initial capacity of the task queue; starts at 200</td></tr>
 *   <tr><td>alivesec</td><td>number of seconds a thread is kept alive; no initializer here, so it
 *       starts at 0 (the original note mentions a 60 s default - presumably supplied by whatever
 *       consumes this value when it is left unset; TODO confirm)</td></tr>
 * </table>
 */
public class poolconfig {

  /** Base (core) number of pool threads. */
  private int count = 0;

  /** Maximum number of pool threads. */
  private int maxcount = 0;

  /** Initial capacity of the task queue. */
  private int queuecapacity = 200;

  /** Keep-alive time in seconds. */
  private int alivesec;

  /** @return the base (core) number of pool threads */
  public int getcount() {
    return this.count;
  }

  /** @param count the base (core) number of pool threads */
  public void setcount(int count) {
    this.count = count;
  }

  /** @return the maximum number of pool threads */
  public int getmaxcount() {
    return this.maxcount;
  }

  /** @param maxcount the maximum number of pool threads */
  public void setmaxcount(int maxcount) {
    this.maxcount = maxcount;
  }

  /** @return the initial capacity of the task queue */
  public int getqueuecapacity() {
    return this.queuecapacity;
  }

  /** @param queuecapacity the initial capacity of the task queue */
  public void setqueuecapacity(int queuecapacity) {
    this.queuecapacity = queuecapacity;
  }

  /** @return the keep-alive time in seconds */
  public int getalivesec() {
    return this.alivesec;
  }

  /** @param alivesec the keep-alive time in seconds */
  public void setalivesec(int alivesec) {
    this.alivesec = alivesec;
  }
}

threadpoolconfig(线程池配置 yml配置项以thread开头):

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

 

/**

  * <h1>线程池配置(<b style="color:#cd0000">线程池核心配置、各个业务处理的任务数量</b>)</h1>

  *

  * <blockquote><code>

  * <table border="1px" style="border-color:gray;" width="100%"><tbody>

  * <tr><th style="color:green;text-align:left;">

  * 属性名称

  * </th><th style="color:green;text-align:left;">

  * 属性含义

  * </th></tr>

  * <tr><td>

  * pool

  * </td><td>

  * 线程池核心配置

  * 【{@link poolconfig}】

  * </td></tr>

  * <tr><td>

  * count

  * </td><td>

  * 线程池各个业务任务初始的任务数

  * </td></tr>

  * </tbody></table>

  * </code></blockquote>

 

  */

@component

@configurationproperties (prefix= "thread" )

public class threadpoolconfig {

 

  private poolconfig pool = new poolconfig();

 

  map<string, integer> count = new hashmap<>();

 

  public poolconfig getpool() {

  return pool;

  }

 

  public void setpool(poolconfig pool) {

  this .pool = pool;

  }

 

  public map<string, integer> getcount() {

  return count;

  }

 

}

定义task注解,方便使用:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

@target (elementtype.type)

@retention (retentionpolicy.runtime)

@documented

@component

public @interface excutortask {

 

  /**

  * the value may indicate a suggestion for a logical excutortask name,

  * to be turned into a spring bean in case of an autodetected excutortask .

  * @return the suggested excutortask name, if any

  */

  string value() default "" ;

 

}

通过类路径扫描(配合反射读取注解)获取使用task注解的任务集合:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

public class beans {

 

  private static final char prefix = '.' ;

 

  public static concurrentmap<string, string> scanbeanclassnames(){

  concurrentmap<string, string> beanclassnames = new concurrenthashmap<>();

  classpathscanningcandidatecomponentprovider provider = new classpathscanningcandidatecomponentprovider( false );

    provider.addincludefilter( new annotationtypefilter(excutortask. class ));

    for ( package pkg : package .getpackages()){

    string basepackage = pkg.getname();

      set<beandefinition> components = provider.findcandidatecomponents(basepackage);

      for (beandefinition component : components) {

      string beanclassname = component.getbeanclassname();

      try {

     class <?> clazz = class .forname(component.getbeanclassname());

     boolean isannotationpresent = clazz.isannotationpresent(zimatask. class );

     if (isannotationpresent){

      zimatask task = clazz.getannotation(excutortask. class );

      string aliasname = task.value();

      if (aliasname != null && ! "" .equals(aliasname)){

      beanclassnames.put(aliasname, component.getbeanclassname());

      }

     }

     } catch (classnotfoundexception e) {

     e.printstacktrace();

     }

      beanclassnames.put(beanclassname.substring(beanclassname.lastindexof(prefix) + 1 ), component.getbeanclassname());

      }

    }

    return beanclassnames;

   }

}

 线程执行类taskpool:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

@component

public class taskpool {

 

  public threadpooltaskexecutor pooltaskexecutor;

 

  @autowired

  private threadpoolconfig threadpoolconfig;

 

  @autowired

  private applicationcontext context;

 

  private final integer max_pool_size = 2000 ;

 

  private poolconfig poolcfg;

 

  private map<string, integer> taskscount;

 

  private concurrentmap<string, string> beanclassnames;

 

  @postconstruct

   public void init() {

 

  beanclassnames = beans.scanbeanclassnames();

   

    pooltaskexecutor = new threadpooltaskexecutor();

   

    poolcfg = threadpoolconfig.getpool();

 

  taskscount = threadpoolconfig.getcount();

 

  int corepoolsize = poolcfg.getcount(),

   maxpoolsize = poolcfg.getmaxcount(),

   queuecapacity = poolcfg.getqueuecapacity(),

   minpoolsize = 0 , maxcount = (corepoolsize << 1 );

 

  for (string taskname : taskscount.keyset()){

   minpoolsize += taskscount.get(taskname);

  }

 

  if (corepoolsize > 0 ){

   if (corepoolsize <= minpoolsize){

   corepoolsize = minpoolsize;

   }

  } else {

   corepoolsize = minpoolsize;

  }

 

  if (queuecapacity > 0 ){

   pooltaskexecutor.setqueuecapacity(queuecapacity);

  }

 

  if (corepoolsize > 0 ){

   if (max_pool_size < corepoolsize){

   corepoolsize = max_pool_size;

   }

   pooltaskexecutor.setcorepoolsize(corepoolsize);

  }

 

  if (maxpoolsize > 0 ){

   if (maxpoolsize <= maxcount){

   maxpoolsize = maxcount;

   }

   if (max_pool_size < maxpoolsize){

   maxpoolsize = max_pool_size;

   }

   pooltaskexecutor.setmaxpoolsize(maxpoolsize);

  }

 

  if (poolcfg.getalivesec() > 0 ){

   pooltaskexecutor.setkeepaliveseconds(poolcfg.getalivesec());

  }

 

  pooltaskexecutor.initialize();

   }

  

  public void execute( class <?>... clazz){

  int i = 0 , len = taskscount.size();

  for (; i < len; i++){

   integer taskcount = taskscount.get(i);

   for ( int t = 0 ; t < taskcount; t++){

   try {

    object taskobj = context.getbean(clazz[i]);

    if (taskobj != null ){

    pooltaskexecutor.execute((runnable) taskobj);

    }

   } catch (exception ex){

    ex.printstacktrace();

   }

   }

  }

   }

  

  public void execute(string... args){

    int i = 0 , len = taskscount.size();

  for (; i < len; i++){

   integer taskcount = taskscount.get(i);

   for ( int t = 0 ; t < taskcount; t++){

   try {

    object taskobj = null ;

    if (context.containsbean(args[i])){

    taskobj = context.getbean(args[i]);

    } else {

    if (beanclassnames.containskey(args[i].tolowercase())){

     class <?> clazz = class .forname(beanclassnames.get(args[i].tolowercase()));

     taskobj = context.getbean(clazz);

    }

    }

    if (taskobj != null ){

    pooltaskexecutor.execute((runnable) taskobj);

    }

   } catch (exception ex){

    ex.printstacktrace();

   }

   }

  }

   }

 

  public void execute(){

  for (string taskname : taskscount.keyset()){

   integer taskcount = taskscount.get(taskname);

   for ( int t = 0 ; t < taskcount; t++){

   try {

    object taskobj = null ;

    if (context.containsbean(taskname)){

    taskobj = context.getbean(taskname);

    } else {

    if (beanclassnames.containskey(taskname)){

     class <?> clazz = class .forname(beanclassnames.get(taskname));

     taskobj = context.getbean(clazz);

    }

    }

    if (taskobj != null ){

    pooltaskexecutor.execute((runnable) taskobj);

    }

   } catch (exception ex){

    ex.printstacktrace();

   }

   }

  }

   }

  

}

如何使用?(做事就要做全套 ^_^)

1.因为使用的是springboot项目,需要在application.properties 或者 application.yml 中添加如下配置:

?

1

2

3

4

5

#配置执行的task线程数

thread.count.needexcutortask= 4

#线程空闲存活时间,单位为秒(该值会传给 setKeepAliveSeconds)

thread.pool.alivesec= 300000

#其他配置同理

2.将我们写的线程配置进行装载到我们的项目中

?

1

2

3

4

5

6

7

8

9

10

11

@configuration

public class taskmanager {

 

  @resource

  private taskpool taskpool;

 

  @postconstruct

  public void executor(){

  taskpool.execute();

  }

}

3.具体使用

?

1

2

3

4

5

6

7

8

@excutortask

public class needexcutortask implements runnable{

   @override

  public void run() {

     thread.sleep(1000l);

     log.info( "====== 任务执行 =====" )

   }

}

以上就是创建一个可扩展的线程池相关的配置(望指教~~~)。希望对大家的学习有所帮助,也希望大家多多支持。

原文链接:https://blog.csdn.net/u011663149/article/details/86497456

查看更多关于java 注解实现一个可配置线程池的方法示例的详细内容...

  阅读:9次