好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Spring boot项目redisTemplate实现轻量级消息队列的方法

背景

公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用activemq等 消息队列 中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发

一、本文涉及知识点

excel文件读写--阿里easyexcel sdk 文件上传、下载--腾讯云对象存储 远程服务调用--resttemplate 生产者、消费者--redistemplate leftpush和rightpop操作 异步处理数据--executors线程池 读取网络文件流--httpclient 自定义注解实现用户身份认证--jwt token认证, 拦截器拦截标注有@loginrequired注解的请求入口

当然, java实现咯

涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习

二、项目目录结构

说明: 数据库dao层放到另一个模块了, 不是本文重点

三、主要maven依赖

1、easyexcel

?

1

2

3

4

5

6

7

<easyexcel-latestversion> 1.1 . 2 -beta4</easyexcel-latestversion>

 

   <dependency>

    <groupid>com.alibaba</groupid>

    <artifactid>easyexcel</artifactid>

    <version>${easyexcel-latestversion}</version>

   </dependency>

jwt

?

1

2

3

4

5

<dependency>

  <groupid>io.jsonwebtoken</groupid>

  <artifactid>jjwt</artifactid>

  <version> 0.7 . 0 </version>

</dependency>

redis

?

1

2

3

4

5

<dependency>

  <groupid>org.springframework.boot</groupid>

  <artifactid>spring-boot-starter-redis</artifactid>

  <version> 1.3 . 5 .release</version>

</dependency>

腾讯cos

?

1

2

3

4

5

<dependency>

  <groupid>com.qcloud</groupid>

  <artifactid>cos_api</artifactid>

  <version> 5.4 . 5 </version>

</dependency>

四、流程

用户上传文件 将文件存储到腾讯cos 将上传后的文件id及上传记录保存到数据库 redis生产一条导入消息, 即保存文件id到redis 请求结束, 返回"处理中"状态 redis消费消息 读取cos文件, 异步处理数据 将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成" 客户端轮询查询处理状态, 并可以下载错误文件 结束

五、实现效果

上传文件

数据库导入记录

导入的数据

下载错误文件

错误数据提示

查询导入记录

六、代码实现

1、导入excel控制层

?

1

2

3

4

5

6

@loginrequired

@requestmapping (value = "doimport" , method = requestmethod.post)

public jsonresponse doimport( @requestparam ( "file" ) multipartfile file, httpservletrequest request) {

   pluser user = getuser(request);

   return orderimportservice.doimport(file, user.getid());

}

2、service层

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

@override

public jsonresponse doimport(multipartfile file, integer userid) {

   if ( null == file || file.isempty()) {

     throw new serviceexception( "文件不能为空" );

   }

 

   string filename = file.getoriginalfilename();

   if (!checkfilesuffix(filename)) {

     throw new serviceexception( "当前仅支持xlsx格式的excel" );

   }

 

   // 存储文件

   string fileid = savetooss(file);

   if (stringutils.isblank(fileid)) {

     throw new serviceexception( "文件上传失败, 请稍后重试" );

   }

 

   // 保存记录到数据库

   saverecordtodb(userid, fileid, filename);

 

   // 生产一条订单导入消息

   redisproducer.produce(rediskey.orderimportkey, fileid);

 

   return jsonresponse.ok( "导入成功, 处理中..." );

}

 

/**

  * 校验文件格式

  * @param filename

  * @return

  */

private static boolean checkfilesuffix(string filename) {

   if (stringutils.isblank(filename) || filename.lastindexof( "." ) <= 0 ) {

     return false ;

   }

 

   int pointindex = filename.lastindexof( "." );

   string suffix = filename.substring(pointindex, filename.length()).tolowercase();

   if ( ".xlsx" .equals(suffix)) {

     return true ;

   }

 

   return false ;

}

 

/**

  * 将文件存储到腾讯oss

  * @param file

  * @return

  */

private string savetooss(multipartfile file) {

   inputstream ins = null ;

   try {

     ins = file.getinputstream();

   } catch (ioexception e) {

     e.printstacktrace();

   }

 

   string fileid;

   try {

     string originalfilename = file.getoriginalfilename();

     file f = new file(originalfilename);

     inputstreamtofile(ins, f);

     filesystemresource resource = new filesystemresource(f);

 

     multivaluemap<string, object> param = new linkedmultivaluemap<>();

     param.add( "file" , resource);

 

     responseresult responseresult = resttemplate.postforobject(txossuploadurl, param, responseresult. class );

     fileid = (string) responseresult.getdata();

   } catch (exception e) {

     fileid = null ;

   }

 

   return fileid;

}

3、redis生产者

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

@service

public class redisproducerimpl implements redisproducer {

 

   @autowired

   private redistemplate redistemplate;

 

   @override

   public jsonresponse produce(string key, string msg) {

     map<string, string> map = maps.newhashmap();

     map.put( "fileid" , msg);

     redistemplate.opsforlist().leftpush(key, map);

     return jsonresponse.ok();

   }

 

}

4、redis消费者

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

@service

public class redisconsumer {

 

   @autowired

   public redistemplate redistemplate;

 

   @value ( "${txossfileurl}" )

   private string txossfileurl;

 

   @value ( "${txossuploadurl}" )

   private string txossuploadurl;

 

   @postconstruct

   public void init() {

     processorderimport();

   }

 

   /**

    * 处理订单导入

    */

   private void processorderimport() {

     executorservice executorservice = executors.newcachedthreadpool();

     executorservice.execute(() -> {

       while ( true ) {

         object object = redistemplate.opsforlist().rightpop(rediskey.orderimportkey, 1 , timeunit.seconds);

         if ( null == object) {

           continue ;

         }

         string msg = json.tojsonstring(object);

         executorservice.execute( new orderimporttask(msg, txossfileurl, txossuploadurl));

       }

     });

   }

 

}

5、处理任务线程类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

public class orderimporttask implements runnable {

   public orderimporttask(string msg, string txossfileurl, string txossuploadurl) {

     this .msg = msg;

     this .txossfileurl = txossfileurl;

     this .txossuploadurl = txossuploadurl;

   }

}

 

   /**

    * 注入bean

    */

   private void autowirebean() {

     this .resttemplate = beancontext.getapplicationcontext().getbean(resttemplate. class );

     this .transactiontemplate = beancontext.getapplicationcontext().getbean(transactiontemplate. class );

     this .orderimportservice = beancontext.getapplicationcontext().getbean(orderimportservice. class );

   }

 

   @override

   public void run() {

     // 注入bean

     autowirebean();

 

     jsonobject jsonobject = json.parseobject(msg);

     string fileid = jsonobject.getstring( "fileid" );

 

     multivaluemap<string, object> param = new linkedmultivaluemap<>();

     param.add( "id" , fileid);

 

     responseresult responseresult = resttemplate.postforobject(txossfileurl, param, responseresult. class );

     string fileurl = (string) responseresult.getdata();

     if (stringutils.isblank(fileurl)) {

       return ;

     }

 

     inputstream inputstream = httpclientutil.readfilefromurl(fileurl);

     list<object> list = excelutil.read(inputstream);

     process(list, fileid);

   }

 

   /**

    * 将文件上传至oss

    * @param file

    * @return

    */

   private string savetooss(file file) {

     string fileid;

     try {

       filesystemresource resource = new filesystemresource(file);

       multivaluemap<string, object> param = new linkedmultivaluemap<>();

       param.add( "file" , resource);

 

       responseresult responseresult = resttemplate.postforobject(txossuploadurl, param, responseresult. class );

       fileid = (string) responseresult.getdata();

     } catch (exception e) {

       fileid = null ;

     }

     return fileid;

   }

说明: 处理数据的业务逻辑代码就不用贴了

6、上传文件到cos

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

@requestmapping ( "/txossupload" )

@responsebody

public responseresult txossupload( @requestparam ( "file" ) multipartfile file) throws unsupportedencodingexception {

   if ( null == file || file.isempty()) {

     return responseresult.fail( "文件不能为空" );

   }

 

   string originalfilename = file.getoriginalfilename();

   originalfilename = mimeutility.decodetext(originalfilename); // 解决中文乱码问题

   string contenttype = getcontenttype(originalfilename);

   string key;

 

   inputstream ins = null ;

   file f = null ;

 

   try {

     ins = file.getinputstream();

     f = new file(originalfilename);

     inputstreamtofile(ins, f);

     key = ifilestorageclient.txossupload( new fileinputstream(f), originalfilename, contenttype);

   } catch (exception e) {

     return responseresult.fail(e.getmessage());

   } finally {

     if ( null != ins) {

       try {

         ins.close();

       } catch (ioexception e) {

         e.printstacktrace();

       }

     }

     if (f.exists()) { // 删除临时文件

       f.delete();

     }

   }

 

   return responseresult.ok(key);

}

 

public static void inputstreamtofile(inputstream ins,file file) {

   try {

     outputstream os = new fileoutputstream(file);

     int bytesread = 0 ;

     byte [] buffer = new byte [ 8192 ];

     while ((bytesread = ins.read(buffer, 0 , 8192 )) != - 1 ) {

       os.write(buffer, 0 , bytesread);

     }

     os.close();

     ins.close();

   } catch (exception e) {

     e.printstacktrace();

   }

}

 

public string txossupload(fileinputstream inputstream, string key, string contenttype) {

   key = uuid.getuuid() + "-" + key;

   ossutil.txossupload(inputstream, key, contenttype);

   try {

     if ( null != inputstream) {

       inputstream.close();

     }

   } catch (ioexception e) {

     e.printstacktrace();

   }

   return key;

}

 

public static void txossupload(fileinputstream inputstream, string key, string contenttype) {

   objectmetadata objectmetadata = new objectmetadata();

   try {

     int length = inputstream.available();

     objectmetadata.setcontentlength(length);

   } catch (exception e){

     logger.info(e.getmessage());

   }

   objectmetadata.setcontenttype(contenttype);

   cosclient.putobject(txbucketname, key, inputstream, objectmetadata);

}

7、下载文件

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

/**

  * 腾讯云文件下载

  * @param response

  * @param id

  * @return

  */

@requestmapping ( "/txossdownload" )

public object txossdownload(httpservletresponse response, string id) {

   cosobjectinputstream cosobjectinputstream = ifilestorageclient.txossdownload(id, response);

   string contenttype = getcontenttype(id);

   fileutil.txossdownload(response, contenttype, cosobjectinputstream, id);

   return null ;

}

 

public static void txossdownload(httpservletresponse response, string contenttype, inputstream filestream, string filename) {

   fileoutputstream fos = null ;

   response.reset();

   outputstream os = null ;

   try {

     response.setcontenttype(contenttype + "; charset=utf-8" );

     if (!contenttype.equals(plconstans.filecontenttype.image)){

       try {

         response.setheader( "content-disposition" , "attachment; filename=" + new string(filename.getbytes( "utf-8" ), "iso8859-1" ));

       } catch (unsupportedencodingexception e) {

         response.setheader( "content-disposition" , "attachment; filename=" + filename);

         logger.error( "encoding file name failed" , e);

       }

     }

 

     os = response.getoutputstream();

 

     byte [] b = new byte [ 1024 * 1024 ];

     int len;

     while ((len = filestream.read(b)) > 0 ) {

       os.write(b, 0 , len);

       os.flush();

       try {

         if (fos != null ) {

           fos.write(b, 0 , len);

           fos.flush();

         }

       } catch (exception e) {

         logger.error(e.getmessage());

       }

     }

   } catch (ioexception e) {

     ioutils.closequietly(fos);

     fos = null ;

   } finally {

     ioutils.closequietly(os);

     ioutils.closequietly(filestream);

     if (fos != null ) {

       ioutils.closequietly(fos);

     }

   }

}

8、读取网络文件流

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

/**

  * 读取网络文件流

  * @param url

  * @return

  */

public static inputstream readfilefromurl(string url) {

   if (stringutils.isblank(url)) {

     return null ;

   }

 

   httpclient httpclient = new defaulthttpclient();

   httpget methodget = new httpget(url);

   try {

     httpresponse response = httpclient.execute(methodget);

     if (response.getstatusline().getstatuscode() == 200 ) {

       httpentity entity = response.getentity();

       return entity.getcontent();

     }

   } catch (exception e) {

     e.printstacktrace();

   }

   return null ;

}

9、excelutil

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

/**

  * 读excel

  * @param inputstream 文件输入流

  * @return list集合

  */

public static list<object> read(inputstream inputstream) {

   return easyexcelfactory.read(inputstream, new sheet( 1 , 1 ));

}

 

/**

  * 写excel

  * @param data list数据

  * @param clazz

  * @param savefilepath 文件保存路径

  * @throws ioexception

  */

public static void write(list<? extends baserowmodel> data, class <? extends baserowmodel> clazz, string savefilepath) throws ioexception {

   file tempfile = new file(savefilepath);

   outputstream out = new fileoutputstream(tempfile);

   excelwriter writer = easyexcelfactory.getwriter(out);

   sheet sheet = new sheet( 1 , 3 , clazz, "sheet1" , null );

   writer.write(data, sheet);

   writer.finish();

   out.close();

}

说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考

七、其他

1、@loginrequired注解

?

1

2

3

4

5

6

7

/**

  * 在需要登录验证的controller的方法上使用此注解

  */

@target ({elementtype.method})

@retention (retentionpolicy.runtime)

public @interface loginrequired {

}

2、mycontrolleradvice

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

@controlleradvice

public class mycontrolleradvice {

 

   @responsebody

   @exceptionhandler (tokenvalidationexception. class )

   public jsonresponse tokenvalidationexceptionhandler() {

     return jsonresponse.logininvalid();

   }

 

   @responsebody

   @exceptionhandler (serviceexception. class )

   public jsonresponse serviceexceptionhandler(serviceexception se) {

     return jsonresponse.fail(se.getmsg());

   }

 

   @responsebody

   @exceptionhandler (exception. class )

   public jsonresponse exceptionhandler(exception e) {

     e.printstacktrace();

     return jsonresponse.fail(e.getmessage());

   }

 

}

3、authenticationinterceptor

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

public class authenticationinterceptor implements handlerinterceptor {

 

   private static final string current_user = "user" ;

 

   @autowired

   private userservice userservice;

 

   @override

   public boolean prehandle(httpservletrequest request, httpservletresponse response, object handler) {

     // 如果不是映射到方法直接通过

     if (!(handler instanceof handlermethod)) {

       return true ;

     }

     handlermethod handlermethod = (handlermethod) handler;

     method method = handlermethod.getmethod();

 

     // 判断接口是否有@loginrequired注解, 有则需要登录

     loginrequired methodannotation = method.getannotation(loginrequired. class );

     if (methodannotation != null ) {

       // 验证token

       integer userid = jwtutil.verifytoken(request);

       pluser pluser = userservice.selectbyprimarykey(userid);

       if ( null == pluser) {

         throw new runtimeexception( "用户不存在,请重新登录" );

       }

       request.setattribute(current_user, pluser);

       return true ;

     }

     return true ;

   }

 

   @override

   public void posthandle(httpservletrequest httpservletrequest, httpservletresponse httpservletresponse, object o, modelandview modelandview) throws exception {

   }

 

   @override

   public void aftercompletion(httpservletrequest httpservletrequest, httpservletresponse httpservletresponse, object o, exception e) throws exception {

   }

}

4、jwtutil

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

public static final long expiration_time = 2592_000_000l; // 有效期30天

public static final string secret = "pl_token_secret" ;

public static final string header = "token" ;

public static final string user_id = "userid" ;

 

/**

  * 根据userid生成token

  * @param userid

  * @return

  */

public static string generatetoken(string userid) {

   hashmap<string, object> map = new hashmap<>();

   map.put(user_id, userid);

   string jwt = jwts.builder()

       .setclaims(map)

       .setexpiration( new date(system.currenttimemillis() + expiration_time))

       .signwith(signaturealgorithm.hs512, secret)

       测试数据pact();

   return jwt;

}

 

/**

  * 验证token

  * @param request

  * @return 验证通过返回userid

  */

public static integer verifytoken(httpservletrequest request) {

   string token = request.getheader(header);

   if (token != null ) {

     try {

       map<string, object> body = jwts.parser()

           .setsigningkey(secret)

           .parseclaimsjws(token)

           .getbody();

 

       for (map.entry entry : body.entryset()) {

         object key = entry.getkey();

         object value = entry.getvalue();

         if (key.tostring().equals(user_id)) {

           return integer.valueof(value.tostring()); // userid

         }

       }

       return null ;

     } catch (exception e) {

       logger.error(e.getmessage());

       throw new tokenvalidationexception( "unauthorized" );

     }

   } else {

     throw new tokenvalidationexception( "missing token" );

   }

}

结语: ok, 搞定,睡了, 好困

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。

原文链接:https://HdhCmsTestcnblogs测试数据/wangzaiplus/p/10660520.html

查看更多关于Spring boot项目redisTemplate实现轻量级消息队列的方法的详细内容...

  阅读:14次