好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

详解SpringBoot和SpringBatch 使用

什么是spring batch

spring batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。spring batch是spring的一个子项目,使用java语言并基于spring框架为基础开发,使的已经使用 spring 框架的开发者或者企业更容易访问和利用企业服务。

spring batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,spring batch 同样提供了高级功能和特性来支持,比如分区功能、远程功能。总之,通过 spring batch 能够支持简单的、复杂的和大数据量的批处理作业。

spring batch 使用

我们首先配置spring batch 在spring boot 中的使用,数据库用的是mysql,pom文件如下,因为spring boot 中的spring batch 包含 hsqsldb 所以我们将其去除

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

<dependency>

       <groupid>org.springframework.boot</groupid>

       <artifactid>spring-boot-starter-batch</artifactid>

       <exclusions> <!-- 注意这里-->

         <exclusion>

           <groupid>org.hsqldb</groupid>

           <artifactid>hsqldb</artifactid>

         </exclusion>

       </exclusions>

     </dependency>

     <dependency>

       <groupid>org.springframework.boot</groupid>

       <artifactid>spring-boot-starter-jdbc</artifactid>

     </dependency>

     <dependency>

       <groupid>org.springframework.boot</groupid>

       <artifactid>spring-boot-starter-web</artifactid>

     </dependency>

<dependency>

     <groupid>org.hibernate</groupid>

     <artifactid>hibernate-validator</artifactid>

   </dependency>

   <dependency>

     <groupid>mysql</groupid>

     <artifactid>mysql-connector-java</artifactid>

     <version> 5.1 . 21 </version>

   </dependency>

   <dependency>

     <groupid>org.springframework.boot</groupid>

     <artifactid>spring-boot-starter-test</artifactid>

     <scope>test</scope>

   </dependency>

配置好我们需要的实体类。页面就不展示了。

如果有数据校验添加的话那么我们需要配置自定义的检验器。若果没有课略过该步骤

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

public class csvbeanvalidator<t> implements validator<t>,initializingbean {

   private javax.validation.validator  validator;

   @override

   public void validate(t value) throws validationexception {

     set<constraintviolation<t >> constraintviolations=validator.validate(value);

     if (constraintviolations.size()> 0 ){

       stringbuilder message= new stringbuilder();

       for (constraintviolation<t> constraintviolation:constraintviolations){

         message.append(constraintviolation.getmessage() + "\n" );

       }

       throw new validationexception(message.tostring());

     }

   }

   //在这里我们使用的是jsr-303校验数据,在此进行初始化

   @override

   public void afterpropertiesset() throws exception {

     validatorfactory validatorfactory= validation.builddefaultvalidatorfactory();

     validator=validatorfactory.usingcontext().getvalidator();

   }

}

public class csvitemprocessor extends validatingitemprocessor<person> {

   @override

   public person process(person item) throws validationexception {

      super .process(item); // 在这里启动 然后才会调用我们自定义的校验器,否则不能通过 。

      if (item.getnation().equals( "汉族" )){

        item.setname( "01" );

      } else {

        item.setnation( "02" );

      }

      return item;

   }

}

进行job任务监听 自定义类实现jobexecutionlistener 即可

?

1

2

3

4

5

6

7

8

9

10

11

12

13

long starttime;

  long endtime;

  @override

  public void beforejob(jobexecution jobexecution) {

    starttime = system.currenttimemillis();

    system.out.println( "任务处理开始" );

  }

  @override

  public void afterjob(jobexecution jobexecution) {

    endtime = system.currenttimemillis();

    system.out.println( "耗时多长时间:" + (endtime - starttime) + "ms" );

    system.out.println( "任务处理结束" );

  }

进行spring batch 的注入 方法有xml文件注入bean ,在这里选择java注入

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

@configuration

@enablebatchprocessing //开启批处理

public class csvbatchconfig {

   /**1 首先我们通过 flatfileitemreader 读取我们需要的文件 通过setresource来实现

    * 2 设置map 在这里通过先设置解析器 setlinetokenizer 来解析我们csv文件中的数   据

    * 3 setfieldsetmapper 将我们需要的数据转化为我们的实体对象 存储

    * 4 如果想 跳过前面的几行 需要使用setlinestoskip就可以实现

    */

  @bean

  public itemreader<person> reader() throws exception {

    flatfileitemreader<person> reader = new flatfileitemreader<person>(); //1

    reader.setresource( new classpathresource( "people.csv" )); //2

      reader.setlinemapper( new defaultlinemapper<person>() {{ //3

        setlinetokenizer( new delimitedlinetokenizer() {{

          setnames( new string[] { "name" , "age" , "nation" , "address" });

        }});

        setfieldsetmapper( new beanwrapperfieldsetmapper<person>() {{

          settargettype(person. class );

        }});

      }});

        reader.setlinestoskip( 3 );

      return reader;

  }

  @bean

  public itemprocessor<person, person> processor() {

    csvitemprocessor processor = new csvitemprocessor(); //1

    processor.setvalidator(csvbeanvalidator()); //2

    return processor;

  }

    /**

     *写入数据到数据库中

     * 1执行的sql 语句 2 设置数据源

      */

  @bean

  public itemwriter<person> writer(datasource datasource) { //1

    jdbcbatchitemwriter<person> writer = new jdbcbatchitemwriter<person>(); //2

    writer.setitemsqlparametersourceprovider( new beanpropertyitemsqlparametersourceprovider<person>());

    string sql = "insert into person " + "(id,name,age,nation,address) "

        + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)" ;

    writer.setsql(sql); //3

    writer.setdatasource(datasource);

    return writer;

  }

   // 作业的仓库 就是设置数据源

  @bean

  public jobrepository jobrepository(datasource datasource, platformtransactionmanager transactionmanager)

      throws exception {

    jobrepositoryfactorybean jobrepositoryfactorybean = new jobrepositoryfactorybean();

    jobrepositoryfactorybean.setdatasource(datasource);

    jobrepositoryfactorybean.settransactionmanager(transactionmanager);

    jobrepositoryfactorybean.setdatabasetype( "mysql" );

    return jobrepositoryfactorybean.getobject();

  }

    //调度器 使用它来执行 我们的批处理

  @bean

  public simplejoblauncher joblauncher(datasource datasource, platformtransactionmanager transactionmanager)

      throws exception {

    simplejoblauncher joblauncher = new simplejoblauncher();

    joblauncher.setjobrepository(jobrepository(datasource, transactionmanager));

    return joblauncher;

  }

    //将监听器加入到job中

  @bean

  public job importjob(jobbuilderfactory jobs, step s1) {

    return jobs.get( "importjob" )

        .incrementer( new runidincrementer())

        .flow(s1) //1

        .end()

        .listener(csvjoblistener()) //2

        .build();

  }

    //步骤绑定 reader 与writer 一次性处理65000条记录

  @bean

  public step step1(stepbuilderfactory stepbuilderfactory, itemreader<person> reader, itemwriter<person> writer,

      itemprocessor<person,person> processor) {

    return stepbuilderfactory

        .get( "step1" )

        .<person, person>chunk( 65000 ) //1

        .reader(reader) //2

        .processor(processor) //3

        .writer(writer) //4

        .build();

  }

  @bean

  public csvjoblistener csvjoblistener() {

    return new csvjoblistener();

  }

  @bean

  public validator<person> csvbeanvalidator() {

    return new csvbeanvalidator<person>();

  }

}

在配置文件中 启动自动执行批处理

spring.batch.job.names = job1,job2 #启动时要执行的job,默认执行全部job

spring.batch.job.enabled=true #是否自动执行定义的job,默认是

spring.batch.initializer.enabled=true #是否初始化spring batch的数据库,默认为是

spring.batch.schema=

spring.batch.table-prefix= #设置springbatch的数据库表的前缀

项目汇总

从 项目中我们可以看到 总的步骤就是 首先读取我们需要实现的文件进行解析,然后转换成需要的实体类并且绑定到reader中,二 实现我们需要的writer 并且帮到到数据库上,三实现job监听器将其绑定到步骤中 。最后开启批处理 自动执行入库即可 。这个简单步骤主要是配置中用到的 理解流程 自己也可以方便实现 批处理的流程。

总结

以上所述是小编给大家介绍的springboot和springbatch 使用,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!

原文链接:https://yq.aliyun.com/articles/619189

查看更多关于详解SpringBoot和SpringBatch 使用的详细内容...

  阅读:54次