好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

BlockingQueue队列处理高并发下的日志

前言

当系统流量负载比较高时,业务日志的写入操作也要纳入系统性能考量之内,如若处理不当,将影响系统的正常业务操作,之前写过一篇《spring boot通过MQ消费log4j2的日志》的博文,采用了RabbitMQ消息中间件来存储抗高并发下的日志,因为引入了中间件,操作使用起来可能没那么简便,今天分享使用多线程消费阻塞队列的方式来处理我们的海量日志

what阻塞队列?

阻塞队列(BlockingQueue)是区别于普通队列多了两个附加操作的线程安全的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

1.声明存储固定消息的队列

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

/**

  * Created by kl on 2017/3/20.

  * Content :销售操作日志队列

  */

public class SalesLogQueue{

     //队列大小

     public static final int QUEUE_MAX_SIZE    = 1000 ;

     private static SalesLogQueue alarmMessageQueue = new SalesLogQueue();

     //阻塞队列

     private BlockingQueueblockingQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);

     private SalesLogQueue(){}

     public static SalesLogQueue getInstance() {

         return alarmMessageQueue;

     }

     /**

      * 消息入队

      * @param salesLog

      * @return

      */

     public boolean push(SalesLog salesLog) {

         return this .blockingQueue.add(salesLog); //队列满了就抛出异常,不阻塞

     }

     /**

      * 消息出队

      * @return

      */

     public SalesLog poll() {

         SalesLog result = null ;

         try {

             result = this .blockingQueue.take();

         } catch (InterruptedException e) {

             e.printStackTrace();

         }

         return result;

     }

     /**

      * 获取队列大小

      * @return

      */

     public int size() {

         return this .blockingQueue.size();

     }

}

ps:因为业务原因,采用add的方式入队,队列满了就抛异常,不阻塞

2.消息入队

消息入队可以在任何需要保存日志的地方操作,如aop统一拦截日志处理,filter过滤请求日志处理,或者耦合的业务日志,记住,不阻塞入队操作,不然将影响正常的业务操作,如下为filter统一处理请求日志:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

/**

  * Created by kl on 2017/3/20.

  * Content :访问请求拦截,保存操作日志

  */

public class SalesLogFilter implements Filter {

     private RoleResourceService resourceService;

     @Override

     public void init(FilterConfig filterConfig) throws ServletException {

         ServletContext context = filterConfig.getServletContext();

         ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(context);

         resourceService = ctx.getBean(RoleResourceService. class );

     }

     @Override

     public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {

         try {

             HttpServletRequest request = (HttpServletRequest) servletRequest;

             String requestUrl = request.getRequestURI();

             String requestType=request.getMethod();

             String ipAddress = HttpClientUtil.getIpAddr(request);

             Map resource=resourceService.getResource();

             String context=resource.get(requestUrl);

             //动态url正则匹配

             if (StringUtil.isNull(context)){

                 for (Map.Entry entry:resource.entrySet()){

                     String resourceUrl= entry.getKey();

                     if (requestUrl.matches(resourceUrl)){

                         context=entry.getValue();

                         break ;

                     }

                 }

             }

             SalesLog log= new SalesLog();

             log.setCreateDate( new Timestamp(System.currentTimeMillis()));

             log.setContext(context);

             log.setOperateUser(UserTokenUtil.currentUser.get().get( "realname" ));

             log.setRequestIp(ipAddress);

             log.setRequestUrl(requestUrl);

             log.setRequestType(requestType);

             SalesLogQueue.getInstance().push(log);

         } catch (Exception e){

             e.printStackTrace();

         }

         filterChain.doFilter(servletRequest, servletResponse);

     }

     @Override

     public void destroy() {

     }

}

3.消息出队被消费

BlockingQueue是线程安全的,所以可以放心的在多个线程中去处理队列中的消息,如下代码声明了一个两个大小的固定线程池,并添加了两个线程去处理队列中的消息

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

/**

  * Created by kl on 2017/3/20.

  * Content :启动消费操作日志队列的线程

  */

@Component

public class ConsumeSalesLogQueue {

     @Autowired

     SalesLogService salesLogService;

     @PostConstruct

     public void startrtThread() {

         ExecutorService e = Executors.newFixedThreadPool( 2 ); //两个大小的固定线程池

         e.submit( new PollSalesLog(salesLogService));

         e.submit( new PollSalesLog(salesLogService));

     }

     class PollSalesLog implements Runnable {

         SalesLogService salesLogService;

         public PollSalesLog(SalesLogService salesLogService) {

             this .salesLogService = salesLogService;

         }

         @Override

         public void run() {

             while ( true ) {

                 try {

                     SalesLog salesLog = SalesLogQueue.getInstance().poll();

                     if (salesLog!= null ){

                         salesLogService.saveSalesLog(salesLog);

                     }

                 } catch (Exception e) {

                     e.printStackTrace();

                 }

             }

         }

     }

}

参考博文如下,对BlockingQueue队列更多了解,可读一读如下的博文:

详细分析Java并发集合ArrayBlockingQueue的用法

详解Java阻塞队列(BlockingQueue)的实现原理

Java并发之BlockingQueue的使用

以上就是BlockingQueue队列处理高并发下的日志的详细内容,更多关于BlockingQueue队列处理高并发日志的资料请关注其它相关文章!

原文链接:http://HdhCmsTestkailing.pub/article/index/arcid/153.html

查看更多关于BlockingQueue队列处理高并发下的日志的详细内容...

  阅读:19次