背景
公司项目有个需求: 前端上传 Excel 文件, 后端读取数据、处理数据、返回错误数据。最简单的方式是同步处理, 即客户端上传文件后一直阻塞等待响应, 但这样用户体验无疑很差: 处理数据可能十分耗时, 没人愿意傻等。由于项目暂未使用 ActiveMQ 等消息队列中间件, 而 Redis 的 lpush 和 rpop 很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发。
一、本文涉及知识点
- Excel 文件读写 -- 阿里 EasyExcel SDK
- 文件上传、下载 -- 腾讯云对象存储 (COS)
- 远程服务调用 -- RestTemplate
- 生产者、消费者 -- RedisTemplate 的 leftPush 和 rightPop 操作
- 异步处理数据 -- Executors 线程池
- 读取网络文件流 -- HttpClient
- 自定义注解实现用户身份认证 -- JWT token 认证, 拦截器拦截标注有 @LoginRequired 注解的请求入口

当然, 用 Java 实现咯。
涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习
二、项目目录结构
说明: 数据库dao层放到另一个模块了, 不是本文重点
三、主要maven依赖
1、easyexcel
1 2 3 4 5 6 7 |
<easyexcel-latestversion>1.1.2-beta4</easyexcel-latestversion>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>easyexcel</artifactId>
    <version>${easyexcel-latestversion}</version>
</dependency>
jwt
1 2 3 4 5 |
<dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt</artifactId>
    <version>0.7.0</version>
</dependency>
redis
1 2 3 4 5 |
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>
腾讯cos
1 2 3 4 5 |
<dependency>
    <groupId>com.qcloud</groupId>
    <artifactId>cos_api</artifactId>
    <version>5.4.5</version>
</dependency>
四、流程
1. 用户上传文件
2. 将文件存储到腾讯 COS
3. 将上传后的文件 id 及上传记录保存到数据库
4. Redis 生产一条导入消息, 即保存文件 id 到 Redis
5. 请求结束, 返回"处理中"状态
6. Redis 消费消息
7. 读取 COS 文件, 异步处理数据
8. 将错误数据以 Excel 形式上传至 COS, 以供用户下载, 并更新处理状态为"处理完成"
9. 客户端轮询查询处理状态, 并可以下载错误文件
10. 结束

五、实现效果
上传文件
数据库导入记录
导入的数据
下载错误文件
错误数据提示
查询导入记录
六、代码实现
1、导入excel控制层
1 2 3 4 5 6 |
@loginrequired @requestmapping (value = "doimport" , method = requestmethod.post) public jsonresponse doimport( @requestparam ( "file" ) multipartfile file, httpservletrequest request) { pluser user = getuser(request); return orderimportservice.doimport(file, user.getid()); } |
2、service层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
/**
 * Validates the uploaded file, stores it, records the upload and queues it for processing.
 *
 * <p>The method returns as soon as the message has been pushed; the import itself is not
 * performed here.
 *
 * @param file   the uploaded file; must be non-empty and have an .xlsx suffix
 * @param userId id of the uploading user, stored with the upload record
 * @return an OK response telling the client that processing has started
 * @throws ServiceException if the file is empty, has the wrong suffix, or could not be stored
 */
@Override
public JsonResponse doImport(MultipartFile file, Integer userId) {
    if (null == file || file.isEmpty()) {
        throw new ServiceException("文件不能为空");
    }

    String filename = file.getOriginalFilename();
    if (!checkFileSuffix(filename)) {
        throw new ServiceException("当前仅支持xlsx格式的excel");
    }

    // Store the file; a blank id signals that the upload failed.
    String fileId = saveToOss(file);
    if (StringUtils.isBlank(fileId)) {
        throw new ServiceException("文件上传失败, 请稍后重试");
    }

    // Persist the upload record.
    saveRecordToDb(userId, fileId, filename);

    // Produce one order-import message carrying the file id.
    redisProducer.produce(RedisKey.orderImportKey, fileId);

    return JsonResponse.ok("导入成功, 处理中...");
}
/**
 * Checks whether a file name ends with the ".xlsx" suffix (case-insensitive).
 *
 * <p>A blank name, a name without a dot, or a name whose only dot is its first character is
 * rejected.
 *
 * @param filename the file name to check, may be null
 * @return true only when the text from the last dot onward equals ".xlsx" ignoring case
 */
private static boolean checkFileSuffix(String filename) {
    if (StringUtils.isBlank(filename)) {
        return false;
    }
    int pointIndex = filename.lastIndexOf(".");
    if (pointIndex <= 0) {
        return false;
    }
    // Locale.ROOT keeps the lowercasing independent of the server's default locale.
    String suffix = filename.substring(pointIndex).toLowerCase(java.util.Locale.ROOT);
    return ".xlsx".equals(suffix);
}
/** * 将文件存储到腾讯oss * @param file * @return */ private string savetooss(multipartfile file) { inputstream ins = null ; try { ins = file.getinputstream(); } catch (ioexception e) { e.printstacktrace(); }
string fileid; try { string originalfilename = file.getoriginalfilename(); file f = new file(originalfilename); inputstreamtofile(ins, f); filesystemresource resource = new filesystemresource(f);
multivaluemap<string, object> param = new linkedmultivaluemap<>(); param.add( "file" , resource);
responseresult responseresult = resttemplate.postforobject(txossuploadurl, param, responseresult. class ); fileid = (string) responseresult.getdata(); } catch (exception e) { fileid = null ; }
return fileid; } |
3、redis生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@service public class redisproducerimpl implements redisproducer {
@autowired private redistemplate redistemplate;
@override public jsonresponse produce(string key, string msg) { map<string, string> map = maps.newhashmap(); map.put( "fileid" , msg); redistemplate.opsforlist().leftpush(key, map); return jsonresponse.ok(); }
} |
4、redis消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
@service public class redisconsumer {
@autowired public redistemplate redistemplate;
@value ( "${txossfileurl}" ) private string txossfileurl;
@value ( "${txossuploadurl}" ) private string txossuploadurl;
@postconstruct public void init() { processorderimport(); }
/** * 处理订单导入 */ private void processorderimport() { executorservice executorservice = executors.newcachedthreadpool(); executorservice.execute(() -> { while ( true ) { object object = redistemplate.opsforlist().rightpop(rediskey.orderimportkey, 1 , timeunit.seconds); if ( null == object) { continue ; } string msg = json.tojsonstring(object); executorservice.execute( new orderimporttask(msg, txossfileurl, txossuploadurl)); } }); }
} |
5、处理任务线程类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
public class orderimporttask implements runnable { public orderimporttask(string msg, string txossfileurl, string txossuploadurl) { this .msg = msg; this .txossfileurl = txossfileurl; this .txossuploadurl = txossuploadurl; } }
/**
 * Looks up the collaborating beans from the application context.
 *
 * <p>The task is created with {@code new}, so its dependencies are fetched through
 * BeanContext rather than injected.
 */
private void autowireBean() {
    this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
    this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
    this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
}
/**
 * Resolves the file URL for the queued file id, downloads the file, reads its rows and
 * passes them to process.
 *
 * <p>Returns without processing when the storage service gives no URL or the download
 * yields no stream. The download stream is always closed.
 */
@Override
public void run() {
    // Fetch collaborating beans.
    autowireBean();

    JSONObject jsonObject = JSON.parseObject(msg);
    String fileId = jsonObject.getString("fileid");

    MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
    param.add("id", fileId);

    ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
    String fileUrl = responseResult == null ? null : (String) responseResult.getData();
    if (StringUtils.isBlank(fileUrl)) {
        return;
    }

    // readFileFromUrl returns null when the download fails.
    InputStream inputStream = HttpClientUtil.readFileFromUrl(fileUrl);
    if (inputStream == null) {
        return;
    }
    try {
        List<Object> list = ExcelUtil.read(inputStream);
        process(list, fileId);
    } finally {
        try {
            inputStream.close();
        } catch (java.io.IOException ignored) {
            // Best-effort close; the data has already been read.
        }
    }
}
/** * 将文件上传至oss * @param file * @return */ private string savetooss(file file) { string fileid; try { filesystemresource resource = new filesystemresource(file); multivaluemap<string, object> param = new linkedmultivaluemap<>(); param.add( "file" , resource);
responseresult responseresult = resttemplate.postforobject(txossuploadurl, param, responseresult. class ); fileid = (string) responseresult.getdata(); } catch (exception e) { fileid = null ; } return fileid; } |
说明: 处理数据的业务逻辑代码就不用贴了
6、上传文件到cos
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
@requestmapping ( "/txossupload" ) @responsebody public responseresult txossupload( @requestparam ( "file" ) multipartfile file) throws unsupportedencodingexception { if ( null == file || file.isempty()) { return responseresult.fail( "文件不能为空" ); }
string originalfilename = file.getoriginalfilename(); originalfilename = mimeutility.decodetext(originalfilename); // 解决中文乱码问题 string contenttype = getcontenttype(originalfilename); string key;
inputstream ins = null ; file f = null ;
try { ins = file.getinputstream(); f = new file(originalfilename); inputstreamtofile(ins, f); key = ifilestorageclient.txossupload( new fileinputstream(f), originalfilename, contenttype); } catch (exception e) { return responseresult.fail(e.getmessage()); } finally { if ( null != ins) { try { ins.close(); } catch (ioexception e) { e.printstacktrace(); } } if (f.exists()) { // 删除临时文件 f.delete(); } }
return responseresult.ok(key); }
/**
 * Copies everything from the input stream into the given file, then closes the stream.
 *
 * <p>Both streams are closed even when the copy fails. A failure is rethrown as
 * {@link java.io.UncheckedIOException} instead of being printed and ignored, so callers do
 * not carry on with an empty or truncated file.
 *
 * @param ins  the source stream; closed by this method
 * @param file the destination file; created or overwritten
 * @throws java.io.UncheckedIOException if reading or writing fails
 * @throws NullPointerException         if {@code ins} or {@code file} is null
 */
public static void inputStreamToFile(InputStream ins, File file) {
    java.util.Objects.requireNonNull(ins, "ins");
    java.util.Objects.requireNonNull(file, "file");
    try (InputStream in = ins; OutputStream os = new FileOutputStream(file)) {
        byte[] buffer = new byte[8192];
        int bytesRead;
        while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
            os.write(buffer, 0, bytesRead);
        }
    } catch (IOException e) {
        throw new java.io.UncheckedIOException("failed to copy stream to " + file, e);
    }
}
/**
 * Stores the stream under a generated key and returns that key.
 *
 * <p>The key is a generated unique id, a dash, then the key passed in. The stream is closed
 * whether or not the upload succeeds.
 *
 * <p>NOTE(review): {@code UUID.getUUID()} is the restored casing of the project helper
 * {@code uuid.getuuid()}; confirm the real class name.
 *
 * @param inputStream the content to store; closed by this method
 * @param key         the name appended to the generated prefix
 * @param contentType the content type recorded with the object
 * @return the full storage key
 */
public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
    key = UUID.getUUID() + "-" + key;
    try (FileInputStream in = inputStream) {
        OssUtil.txOssUpload(in, key, contentType);
    } catch (IOException e) {
        // Only a failure to close the stream reaches here.
        e.printStackTrace();
    }
    return key;
}
public static void txossupload(fileinputstream inputstream, string key, string contenttype) { objectmetadata objectmetadata = new objectmetadata(); try { int length = inputstream.available(); objectmetadata.setcontentlength(length); } catch (exception e){ logger.info(e.getmessage()); } objectmetadata.setcontenttype(contenttype); cosclient.putobject(txbucketname, key, inputstream, objectmetadata); } |
7、下载文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
/**
 * Tencent Cloud file download endpoint: streams the stored object with the given id to the
 * response.
 *
 * @param response the HTTP response the file is written to
 * @param id       the storage key; also used as the download file name and to derive the
 *                 content type
 * @return always null; the body is written directly to the response
 */
@RequestMapping("/txossdownload")
public Object txOssDownload(HttpServletResponse response, String id) {
    COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
    String contentType = getContentType(id);
    FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
    return null;
}
public static void txossdownload(httpservletresponse response, string contenttype, inputstream filestream, string filename) { fileoutputstream fos = null ; response.reset(); outputstream os = null ; try { response.setcontenttype(contenttype + "; charset=utf-8" ); if (!contenttype.equals(plconstans.filecontenttype.image)){ try { response.setheader( "content-disposition" , "attachment; filename=" + new string(filename.getbytes( "utf-8" ), "iso8859-1" )); } catch (unsupportedencodingexception e) { response.setheader( "content-disposition" , "attachment; filename=" + filename); logger.error( "encoding file name failed" , e); } }
os = response.getoutputstream();
byte [] b = new byte [ 1024 * 1024 ]; int len; while ((len = filestream.read(b)) > 0 ) { os.write(b, 0 , len); os.flush(); try { if (fos != null ) { fos.write(b, 0 , len); fos.flush(); } } catch (exception e) { logger.error(e.getmessage()); } } } catch (ioexception e) { ioutils.closequietly(fos); fos = null ; } finally { ioutils.closequietly(os); ioutils.closequietly(filestream); if (fos != null ) { ioutils.closequietly(fos); } } } |
8、读取网络文件流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
/** * 读取网络文件流 * @param url * @return */ public static inputstream readfilefromurl(string url) { if (stringutils.isblank(url)) { return null ; }
httpclient httpclient = new defaulthttpclient(); httpget methodget = new httpget(url); try { httpresponse response = httpclient.execute(methodget); if (response.getstatusline().getstatuscode() == 200 ) { httpentity entity = response.getentity(); return entity.getcontent(); } } catch (exception e) { e.printstacktrace(); } return null ; } |
9、excelutil
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
/**
 * Reads an Excel workbook through EasyExcel using {@code new Sheet(1, 1)}.
 *
 * @param inputStream the workbook content
 * @return the rows returned by EasyExcelFactory.read
 */
public static List<Object> read(InputStream inputStream) {
    return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
}
/** * 写excel * @param data list数据 * @param clazz * @param savefilepath 文件保存路径 * @throws ioexception */ public static void write(list<? extends baserowmodel> data, class <? extends baserowmodel> clazz, string savefilepath) throws ioexception { file tempfile = new file(savefilepath); outputstream out = new fileoutputstream(tempfile); excelwriter writer = easyexcelfactory.getwriter(out); sheet sheet = new sheet( 1 , 3 , clazz, "sheet1" , null ); writer.write(data, sheet); writer.finish(); out.close(); } |
说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考
七、其他
1、@loginrequired注解
1 2 3 4 5 6 7 |
/**
 * Marks a controller method as requiring a logged-in user.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}
2、mycontrolleradvice
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
@controlleradvice public class mycontrolleradvice {
@responsebody @exceptionhandler (tokenvalidationexception. class ) public jsonresponse tokenvalidationexceptionhandler() { return jsonresponse.logininvalid(); }
@responsebody @exceptionhandler (serviceexception. class ) public jsonresponse serviceexceptionhandler(serviceexception se) { return jsonresponse.fail(se.getmsg()); }
@responsebody @exceptionhandler (exception. class ) public jsonresponse exceptionhandler(exception e) { e.printstacktrace(); return jsonresponse.fail(e.getmessage()); }
} |
3、authenticationinterceptor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
public class authenticationinterceptor implements handlerinterceptor {
private static final string current_user = "user" ;
@autowired private userservice userservice;
@override public boolean prehandle(httpservletrequest request, httpservletresponse response, object handler) { // 如果不是映射到方法直接通过 if (!(handler instanceof handlermethod)) { return true ; } handlermethod handlermethod = (handlermethod) handler; method method = handlermethod.getmethod();
// 判断接口是否有@loginrequired注解, 有则需要登录 loginrequired methodannotation = method.getannotation(loginrequired. class ); if (methodannotation != null ) { // 验证token integer userid = jwtutil.verifytoken(request); pluser pluser = userservice.selectbyprimarykey(userid); if ( null == pluser) { throw new runtimeexception( "用户不存在,请重新登录" ); } request.setattribute(current_user, pluser); return true ; } return true ; }
@override public void posthandle(httpservletrequest httpservletrequest, httpservletresponse httpservletresponse, object o, modelandview modelandview) throws exception { }
@override public void aftercompletion(httpservletrequest httpservletrequest, httpservletresponse httpservletresponse, object o, exception e) throws exception { } } |
4、jwtutil
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
public static final long expiration_time = 2592_000_000l; // 有效期30天 public static final string secret = "pl_token_secret" ; public static final string header = "token" ; public static final string user_id = "userid" ;
/** * 根据userid生成token * @param userid * @return */ public static string generatetoken(string userid) { hashmap<string, object> map = new hashmap<>(); map.put(user_id, userid); string jwt = jwts.builder() .setclaims(map) .setexpiration( new date(system.currenttimemillis() + expiration_time)) .signwith(signaturealgorithm.hs512, secret) 测试数据pact(); return jwt; }
/** * 验证token * @param request * @return 验证通过返回userid */ public static integer verifytoken(httpservletrequest request) { string token = request.getheader(header); if (token != null ) { try { map<string, object> body = jwts.parser() .setsigningkey(secret) .parseclaimsjws(token) .getbody();
for (map.entry entry : body.entryset()) { object key = entry.getkey(); object value = entry.getvalue(); if (key.tostring().equals(user_id)) { return integer.valueof(value.tostring()); // userid } } return null ; } catch (exception e) { logger.error(e.getmessage()); throw new tokenvalidationexception( "unauthorized" ); } } else { throw new tokenvalidationexception( "missing token" ); } } |
结语: ok, 搞定,睡了, 好困
总结
以上就是这篇文章的全部内容了, 希望本文的内容对大家的学习或者工作具有一定的参考价值, 谢谢大家的支持。
原文链接: https://www.cnblogs.com/wangzaiplus/p/10660520.html
查看更多关于Spring boot项目redisTemplate实现轻量级消息队列的方法的详细内容...