好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

从操作系统层面分析Java IO演进之路

 前言

 

本文从 操作系统 实际调用角度(以CentOS Linux release 7.5操作系统为示例),力求追根溯源看 IO 的每一步操作到底发生了什么。

关于如何查看系统调用,Linux可以使用 strace 来查看任何软件的系统调动(这是个很好的分析学习方法):strace -ff -o ./out java Test Java

一 BIO

 

/**   * Alipay.com Inc. Copyright (c) 2004-2020  All  Rights Reserved.   */  package io;     import java.io.*;  import java.net.ServerSocket;  import java.net.Socket;    /**   * @author xiangyong.ding   * @version $Id: TestSocket.java, v 0.1 2020年08月02日 20:56 xiangyong.ding Exp $   */  public  class BIOSocket {       public   static  void main(String[] args) throws IOException {          ServerSocket serverSocket = new ServerSocket(8090);          System. out .println( "step1: new ServerSocket " );          while ( true ) {              Socket client = serverSocket.accept();              System. out .println( "step2: client\t"  + client.getPort());              new Thread(() -> {                  try {                      InputStream  in  = client.getInputStream();                      BufferedReader reader = new BufferedReader(new InputStreamReader( in ));                      while ( true ) {                          System. out .println(reader.readLine());                      }                  } catch (IOException e) {                      e.printStackTrace();                  }              }).start();          }      }  } 

1 发生的系统调用

启动时

socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 5  bind(5, {sa_family=AF_INET, sin_port=htons(8090), sin_addr=inet_addr( "0.0.0.0" )}, 16) = 0  listen(5, 50)                           = 0  poll([{fd=5, events=POLLIN|POLLERR}], 1, -1) = 1 ([{fd=5, revents=POLLIN}]) 

poll函数会阻塞直到其中任何一个fd发生事件。

有客户端连接后

accept(5, {sa_family=AF_INET, sin_port=htons(10253), sin_addr=inet_addr( "42.120.74.252" )}, [16]) = 6  clone(child_stack=0x7f013e5c4fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f013e5c59d0,         tls=0x7f013e5c5700, child_tidptr=0x7f013e5c59d0) = 13168  poll([{fd=5, events=POLLIN|POLLERR}], 1, -1 

抛出线程(即我们代码里的 new Thread() )后,继续poll阻塞等待连接。

clone出来的线程

recvfrom(6,  "hello,bio\n" , 8192, 0,  NULL ,  NULL ) = 

关于对recvfrom函数的说明,其中第四个参数0 表示这是一个阻塞调用。

客户端发送数据后

recvfrom(6,  "hello,bio\n" , 8192, 0,  NULL ,  NULL ) = 10 

2 优缺点

优点

代码简单,逻辑清晰。

缺点

由于stream的read操作是阻塞读,面对多个连接时 每个连接需要每线程。无法处理大量连接(C10K问题)。 误区:可见JDK1.8中对于最初的BIO,在Linux OS下仍然使用的poll,poll本身也是相对比较高效的多路复用函数(支持非阻塞、多个socket同时检查event),只是限于JDK最初的stream API限制,无法支持非阻塞读取。

二 NIO(non block)

 

改进:使用NIO API,将阻塞变为非阻塞, 不需要大量线程。

/**   * Alipay.com Inc. Copyright (c) 2004-2020  All  Rights Reserved.   */  package io;    import java.io.IOException;  import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.ServerSocketChannel;  import java.nio.channels.SocketChannel;  import java.util.LinkedList;    /**   * @author xiangyong.ding   * @version $Id: NioSocket.java, v 0.1 2020年08月09日 11:25 xiangyong.ding Exp $   */  public  class NIOSocket {      private  static  LinkedList<SocketChannel> clients = new LinkedList<>();        private  static  void startClientChannelHandleThread(){          new Thread(() -> {              while ( true ){                  ByteBuffer buffer = ByteBuffer.allocateDirect(4096);                    //处理客户端连接                   for  (SocketChannel c : clients) {                      // 非阻塞, >0 表示读取到的字节数量, 0或-1表示未读取到或读取异常                       int  num = 0;                      try {                          num = c. read (buffer);                      } catch (IOException e) {                          e.printStackTrace();                      }                        if (num > 0) {                          buffer.flip();                          byte[] clientBytes = new byte[buffer.limit()];                          //从缓冲区 读取到内存中                          buffer.get(clientBytes);                            System. out .println(c.socket().getPort() +  ":"  + new String(clientBytes));                            //清空缓冲区                          buffer.clear();                      }                  }              }          }).start();      }         public   static  void main(String[] args) throws IOException {          //new socket,开启监听          ServerSocketChannel socketChannel = ServerSocketChannel. open ();          socketChannel.bind(new InetSocketAddress(9090));          //设置阻塞接受客户端连接          socketChannel.configureBlocking( true );            //开始client处理线程          startClientChannelHandleThread();            while ( true ) {              //接受客户端连接; 非阻塞,无客户端返回 null (操作系统返回-1)              SocketChannel client = socketChannel.accept();                if (client ==  null ) {                  //System. out .println( "no client" );              }  else  {                  //设置读非阻塞                  client.configureBlocking( false );                     int  port = client.socket().getPort();                  System. out .println( "client port :"  + port);                    clients. add (client);              }          }      }  } 

1 发生的系统调用

主线程

socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4  bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr( "0.0.0.0" )}, 16) = 0  listen(4, 50)                           = 0  fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0  accept(4, 0x7fe26414e680, 0x7fe26c376710) = -1 EAGAIN (Resource temporarily unavailable) 

有连接后,子线程

read (6, 0x7f3f415b1c50, 4096)           = -1 EAGAIN (Resource temporarily unavailable)  read (6, 0x7f3f415b1c50, 4096)           = -1 EAGAIN (Resource temporarily unavailable)  ... 

资源使用情况:

2 优缺点

优点

线程数大大减少。

缺点

需要程序自己扫描 每个连接read,需要 O(n)时间复杂度系统调用 (此时可能只有一个连接发送了数据),高频系统调用(导致CPU 用户态内核态切换)高。导致CPU消耗很高。

三 多路复用器(select、poll、epoll)

 

改进:不需要用户扫描所有连接,由kernel 给出哪些连接有数据,然后应用从有数据的连接读取数据。

1 epoll

import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.ServerSocketChannel;  import java.nio.channels.SocketChannel;  import java.util.Iterator;  import java.util.LinkedList;  import java.util. Set ;    /**   * 多路复用socket   *   * @author xiangyong.ding   * @version $Id: MultiplexingSocket.java, v 0.1 2020年08月09日 12:19 xiangyong.ding Exp $   */  public  class MultiplexingSocket {         static  ByteBuffer buffer = ByteBuffer.allocateDirect(4096);         public   static  void main(String[] args) throws Exception {            LinkedList<SocketChannel> clients = new LinkedList<>();            //1.启动server          //new socket,开启监听          ServerSocketChannel socketChannel = ServerSocketChannel. open ();          socketChannel.bind(new InetSocketAddress(9090));          //设置非阻塞,接受客户端          socketChannel.configureBlocking( false );            //多路复用器(JDK包装的代理, select  /poll/epoll/kqueue)          Selector selector = Selector. open (); //java自动代理,默认为epoll          //Selector selector = PollSelectorProvider.provider().openSelector();//指定为poll            //将服务端socket 注册到 多路复用器          socketChannel.register(selector, SelectionKey.OP_ACCEPT);            //2. 轮训多路复用器          // 先询问有没有连接,如果有则返回数量以及对应的对象(fd)          while (selector. select () > 0) {              System. out .println();               Set <SelectionKey> selectionKeys = selector.selectedKeys();              Iterator<SelectionKey> iter = selectionKeys.iterator();                while (iter.hasNext()) {                  SelectionKey  key  = iter. next ();                  iter.remove();                    //2.1 处理新的连接                  if ( key .isAcceptable()) {                      //接受客户端连接; 非阻塞,无客户端返回 null (操作系统返回-1)                      SocketChannel client = socketChannel.accept();                      //设置读非阻塞                      client.configureBlocking( false );                        //同样,把client也注册到selector                      client.register(selector, SelectionKey.OP_READ);                      System. out .println( "new client : "  + client.getRemoteAddress());                  }                  //2.2 处理读取数据                   else  if ( key .isReadable()) {                      readDataFromSocket( key );                  }              }          }      }        protected  static  void readDataFromSocket(SelectionKey  key ) throws Exception {          SocketChannel socketChannel = (SocketChannel)  key .channel();          // 非阻塞, >0 表示读取到的字节数量, 0或-1表示未读取到或读取异常          // 请注意:这个例子降低复杂度,不考虑报文大于buffer  size 的情况           int  num = socketChannel. read (buffer);            if (num > 0) {              buffer.flip();              byte[] clientBytes = new byte[buffer.limit()];              //从缓冲区 读取到内存中              buffer.get(clientBytes);                System. out .println(socketChannel.socket().getPort() +  ":"  + new String(clientBytes));                //清空缓冲区              buffer.clear();          }      }  } 

2 发生的系统调用

启动

socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4  bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr( "0.0.0.0" )}, 16) = 0  listen(4, 50)  fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0  epoll_create(256)                       = 7  epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=4324783852322029573}}) = 0  epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=158913789956}}) = 0  epoll_wait(7 

关于对epoll_create(对应着Java的 Selector selector = Selector.open()) 的说明,本质上是在内存的操作系统保留区,创建一个epoll数据结构。用于后面当有client连接时,向该epoll区中添加监听。

有连接

epoll_wait(7,[{EPOLLIN, {u32=4, u64=158913789956}}], 8192, -1) = 1  accept(4, {sa_family=AF_INET, sin_port=htons(29597), sin_addr=inet_addr( "42.120.74.252" )}, [16]) = 8  fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK)    = 0  epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=3212844375897800712}}) = 0 

关于epoll_ctl (对应着Java的 client.register(selector, SelectionKey.OP_READ) )。其中 EPOLLIN 恰好对应着Java的 SelectionKey.OP_READ 即监听数据到达读取事件。

客户端发送数据

epoll_wait(7,[{EPOLLIN, {u32=8, u64=3212844375897800712}}], 8192, -1) = 1  read (8,  "hello,multiplex\n" , 4096)      = 16  epoll_wait(7, 

note:epoll_wait第四个参数-1表示block。

poll 和 epoll 对比

根据[1.BIO]中的poll函数调用和epoll函数对比如下:

poll和epoll本质上都是同步IO, 区别于BIO的是 多路复用充分降低了 system call,而epoll更进一步,再次降低了system call的时间复杂度。

3 优缺点

优点

线程数同样很少,甚至可以把acceptor线程和worker线程使用同一个。 时间复杂度低,Java实现的Selector(在Linux OS下使用的epoll函数)支持多个clientChannel事件的一次性获取,且时间复杂度维持在O(1)。 CPU使用低:得益于Selector,我们不用向 [2.NIO]中需要自己一个个ClientChannel手动去检查事件,因此使得CPU使用率大大降低。

缺点

数据处理麻烦:目前socketChannel.read 读取数据完全是基于字节的,当我们需要需要作为HTTP服务网关时,对于HTTP协议的处理完全需要自己解析,这是个庞大、烦杂、容易出错的工作。 性能 现有socket数据的读取(socketChannel.read(buffer))全部通过一个buffer 缓冲区来接受,一旦连接多起来,这无疑是一个单线程读取,性能无疑是个问题。 那么此时buffer我们每次读取都重新new出来呢?如果每次都new出来,这样的内存碎片对于GC无疑是一场灾难。如何平衡地协调好buffer的共享,既保证性能,又保证线程安全,这是个难题。

四 Netty

 

1 研究的目标源码(netty提供的入门example)

TelnetServer

package telnet;    import io.netty.bootstrap.ServerBootstrap;  import io.netty.channel.EventLoopGroup;  import io.netty.channel.nio.NioEventLoopGroup;  import io.netty.channel.socket.nio.NioServerSocketChannel;  import io.netty.handler.logging.LogLevel;  import io.netty.handler.logging.LoggingHandler;  import io.netty.handler.ssl.SslContext;  import io.netty.handler.ssl.SslContextBuilder;  import io.netty.handler.ssl.util.SelfSignedCertificate;    /**   * Simplistic telnet server.   */  public  final class TelnetServer {         static  final boolean SSL = System.getProperty( "ssl" ) !=  null ;       static  final  int  PORT =  Integer .parseInt(System.getProperty( "port" , SSL?  "8992"  :  "8023" ));         public   static  void main(String[] args) throws Exception {          // Configure SSL.          final SslContext sslCtx;          if (SSL) {              SelfSignedCertificate ssc = new SelfSignedCertificate();              sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();          }  else  {              sslCtx =  null ;          }            EventLoopGroup bossGroup = new NioEventLoopGroup(1);          EventLoopGroup workerGroup = new NioEventLoopGroup();          try {              ServerBootstrap b = new ServerBootstrap();              b. group (bossGroup, workerGroup)               .channel(NioServerSocketChannel.class)               .handler(new LoggingHandler(LogLevel.INFO))               .childHandler(new TelnetServerInitializer(sslCtx));                b.bind(PORT).sync().channel().closeFuture().sync();          } finally {              bossGroup.shutdownGracefully();              workerGroup.shutdownGracefully();          }      }  } 

TelnetServerHandler

package telnet;    import io.netty.channel.ChannelFuture;  import io.netty.channel.ChannelFutureListener;  import io.netty.channel.ChannelHandler.Sharable;  import io.netty.channel.ChannelHandlerContext;  import io.netty.channel.SimpleChannelInboundHandler;    import java.net.InetAddress;  import java.util. Date ;    /**   * Handles a server-side channel.   */  @Sharable  public  class TelnetServerHandler extends SimpleChannelInboundHandler<String> {        @Override       public  void channelActive(ChannelHandlerContext ctx) throws Exception {          // Send greeting  for  a new  connection .          ctx.write( "Welcome to "  + InetAddress.getLocalHost().getHostName() +  "!\r\n" );          ctx.write( "It is "  + new  Date () +  " now.\r\n" );          ctx.flush();      }        @Override       public  void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {          // Generate  and  write a response.          String response;          boolean  close  =  false ;          if (request.isEmpty()) {              response =  "Please type something.\r\n" ;          }  else  if ( "bye" .equals(request.toLowerCase())) {              response =  "Have a good day!\r\n" ;               close  =  true ;          }  else  {              response =  "Did you say '"  + request +  "'?\r\n" ;          }            // We do  not  need  to  write a ChannelBuffer here.          // We know the encoder inserted  at  TelnetPipelineFactory will do the conversion.          ChannelFuture future = ctx.write(response);            //  Close  the  connection   after  sending  'Have a good day!'           // if the client has sent  'bye' .          if ( close ) {              future.addListener(ChannelFutureListener. CLOSE );          }      }        @Override       public  void channelReadComplete(ChannelHandlerContext ctx) {          ctx.flush();      }        @Override       public  void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {          cause.printStackTrace();          ctx. close ();      }  } 

TelnetServerInitializer

package telnet;    import io.netty.channel.ChannelInitializer;  import io.netty.channel.ChannelPipeline;  import io.netty.channel.socket.SocketChannel;  import io.netty.handler.codec.DelimiterBasedFrameDecoder;  import io.netty.handler.codec.Delimiters;  import io.netty.handler.codec.string.StringDecoder;  import io.netty.handler.codec.string.StringEncoder;  import io.netty.handler.ssl.SslContext;    /**   * Creates a newly configured {@link ChannelPipeline}  for  a new channel.   */  public  class TelnetServerInitializer extends ChannelInitializer<SocketChannel> {        private  static  final StringDecoder DECODER = new StringDecoder();      private  static  final StringEncoder ENCODER = new StringEncoder();        private  static  final TelnetServerHandler SERVER_HANDLER = new TelnetServerHandler();        private final SslContext sslCtx;         public  TelnetServerInitializer(SslContext sslCtx) {          this.sslCtx = sslCtx;      }        @Override       public  void initChannel(SocketChannel ch) throws Exception {          ChannelPipeline pipeline = ch.pipeline();            if (sslCtx !=  null ) {              pipeline.addLast(sslCtx.newHandler(ch.alloc()));          }            //  Add  the text line codec combination  first ,          pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));          // the encoder  and  decoder are  static   as  these are sharable          pipeline.addLast(DECODER);          pipeline.addLast(ENCODER);            //  and   then  business logic.          pipeline.addLast(SERVER_HANDLER);      }  } 

2 启动后的系统调用

主线程(23109)

## 256无实际作用,这里只为了兼容旧版kernel api  epoll_create(256)                       = 7epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=5477705356928876549}}) = 0    epoll_create(256)                       = 10epoll_ctl(10, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=17041805914081853448}}) = 0    epoll_create(256)                       = 13  epoll_ctl(13, EPOLL_CTL_ADD, 11, {EPOLLIN, {u32=11, u64=17042151607409573899}}) = 0    epoll_create(256)                       = 16  epoll_ctl(16, EPOLL_CTL_ADD, 14, {EPOLLIN, {u32=14, u64=17042497300737294350}}) = 0    epoll_create(256)                       = 19  epoll_ctl(19, EPOLL_CTL_ADD, 17, {EPOLLIN, {u32=17, u64=17042561450368827409}}) = 0    epoll_create(256)                       = 10  socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 20  clone(child_stack=0x7fc3c509afb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c509b9d0, tls=0x7fc3c509b700, child_tidptr=0x7fc3c509b9d0) = 23130 

概括为:

向OS新建socket,并开启clone boss线程23130。 为BOSS创建了一个epoll(论证参见下面[boss]),每个worker创建一个epoll数据结构(本质上是在kernel内存区创建了一个数据结构,用于后续监听)。 创建boss线程监听的socket(本质上在kernel中创建一个数据结构)。

boss(23130)

bind(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr( "0.0.0.0" )}, 16) = 0  listen(20, 128)                         = 0  getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr( "0.0.0.0" )}, [16]) = 0  getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr( "0.0.0.0" )}, [16]) = 0     ##将fd为7号epoll和fd为20号的socket绑定,事件:epoll_ctl_add和epoll_ctl_mod  epoll_ctl(7, EPOLL_CTL_ADD, 20, {EPOLLIN, {u32=20, u64=14198059139132817428}}) = 0  epoll_ctl(7, EPOLL_CTL_MOD, 20, {EPOLLIN, {u32=20, u64=20}}) = 0  epoll_wait(7, [{EPOLLIN, {u32=5, u64=17295150779149058053}}], 8192, 1000) = 1  epoll_wait(7, [], 8192, 1000)           = 0(不断轮训,1S超时一次) 

概括为:

将上一步中main线程创建的fd:20绑定端口8023,并开启监听(网卡负责监听和接受连接和数据,kernel则负责路由到具体进程,具体参见:关于socket和bind和listen,TODO )。 将7号socket对应的fd绑定到20号对应的epoll数据结构上去(都是操作kernel中的内存)。 开始1S中一次阻塞等待epoll有任何连接或数据到达。

3 客户端连接

boss (23130)

accept(20, {sa_family=AF_INET, sin_port=htons(11144), sin_addr=inet_addr( "42.120.74.122" )}, [16]) = 24  getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr( "192.168.0.120" )}, [16]) = 0  getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr( "192.168.0.120" )}, [16]) = 0  setsockopt(24, SOL_TCP, TCP_NODELAY, [1], 4) = 0  getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0  getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0  ##抛出  work 线程  clone(child_stack=0x7fc3c4c98fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c4c999d0, tls=0x7fc3c4c99700, child_tidptr=0x7fc3c4c999d0) = 2301 

worker (2301)

writev(24, [{ "Welcome to iZbp14e1g9ztpshfrla9m" ..., 37}, { "It is Sun Aug 23 15:44:14 CST 20" ..., 41}], 2) = 78  epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0  epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=14180008216221450264}}) = 0  epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1   read (11,  "\1" , 128)                     = 1  ##开始无限loop  epoll_wait(13, [], 8192, 1000)          = 0  epoll_wait(13, [{EPOLLIN, {u32=24, u64=24}}], 8192, 1000) = 1 

概括:

当BOSS轮训epoll_wait等到了连接后,首先accept得到该socket对应的fd。 连接建立后 BOSS立马抛出一个线程(clone函数)。 worker(即新建的线程)写入了一段数据(这里是业务逻辑)。 worker将该client对应的fd绑定到了13号epoll上。 worker继续轮训监听13号epoll。

4 客户端主动发送数据

worker(2301)

read (24,  "i am daojian\r\n" , 1024)      = 14  write(24,  "Did you say 'i am daojian'?\r\n" , 29) = 29  ##继续无限loop  epoll_wait(13, [], 8192, 1000)          = 0 

概括为:

wait到数据后,立即read到用户控件内存中(读取1024个字节到 用户控件某个buff中)。 写入数据(业务逻辑,不必太关注)。 继续轮训等待13号epoll。

5 客户端发送bye报文,服务器断开TCP连接

worker(2301)

read (24,  "bye\r\n" , 1024)               = 5  write(24,  "Have a good day!\r\n" , 18)   = 18  getsockopt(24, SOL_SOCKET, SO_LINGER, {onoff=0, linger=0}, [8]) = 0  dup2(25, 24)                            = 24  ##从epoll数据结构中(OS)中删除fd为24的socket  epoll_ctl(13, EPOLL_CTL_DEL, 24, 0x7f702dd531e0) = -1 ENOENT  ##关闭24 socket  close (24)                               = 0  ##继续等待13 epoll数据  epoll_wait(13, [], 8192, 1000)          = 0 

断开客户端连接概括为:

从epoll中删除该客户端对应的fd(这里触发源头没找到,可能是boss)。 close关闭客户端24号fd。 继续轮训epoll。

6 五个客户端同时连接

boss线程(23130)

accept(20, {sa_family=AF_INET, sin_port=htons(1846), sin_addr=inet_addr( "42.120.74.122" )}, [16]) = 24  clone(child_stack=0x7f702cc51fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cc529d0, tls=0x7f702cc52700, child_tidptr=0x7f702cc529d0) = 10035    accept(20, {sa_family=AF_INET, sin_port=htons(42067), sin_addr=inet_addr( "42.120.74.122" )}, [16]) = 26  clone(child_stack=0x7f702cb50fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cb519d0, tls=0x7f702cb51700, child_tidptr=0x7f702cb519d0) = 10067    ... 

woker线程(10035,第一个连接)

epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0  epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=3226004877247250456}}) = 0  epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1                  = 1  epoll_wait(13, [], 8192, 1000)          = 0 

worker线程(10067,第二个连接)

epoll_ctl(16, EPOLL_CTL_ADD, 26, {EPOLLIN, {u32=26, u64=26}}) = 0  epoll_ctl(16, EPOLL_CTL_MOD, 26, {EPOLLIN, {u32=26, u64=3221483685433835546}}) = 0  epoll_wait(16, [{EPOLLIN, {u32=14, u64=17042497300737294350}}], 8192, 1000) = 1  epoll_wait(16, [], 8192, 1000)          = 0  epoll_wait(16, [], 8192, 1000)          = 0 

worker线程(10067,第二个连接)

epoll_ctl(19, EPOLL_CTL_ADD, 27, {EPOLLIN, {u32=27, u64=27}}) = 0  epoll_ctl(19, EPOLL_CTL_MOD, 27, {EPOLLIN, {u32=27, u64=3216966479350071323}}) = 0 

worker线程(8055,第四个连接)

epoll_ctl(10, EPOLL_CTL_ADD, 28, {EPOLLIN, {u32=28, u64=28}}) = 0  epoll_ctl(10, EPOLL_CTL_MOD, 28, {EPOLLIN, {u32=28, u64=3302604828697427996}}) = 0 

worker线程(10035,第五个连接,不在clone线程,而是复用了第一个epoll对应的worker)

epoll_ctl(13, EPOLL_CTL_ADD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0  epoll_ctl(13, EPOLL_CTL_MOD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0 

概括为:

epoll和boss、worker之间的关系:一共有4个worker对应着4个epoll对象,boss和每个worker都有对应自己的epoll。 boss根据epoll数量,平衡分配连接到每个worker对应的epoll中。

7 总结

下图通过对系统调用的调查得出 netty 和 kernel 交互图:

初始化直接创建5个epoll,其中7号为boss使用,专门用于处理和客户端连接;其余4个用来给worker使用,用户处理和客户端的数据交互。

work的线程数量,取决于初始化时创建了几个epoll,worker的复用本质上是epoll的复用。

work之间为什么要独立使用epoll?为什么不共享?

为了避免各个worker之间发生争抢连接处理,netty直接做了物理隔离,避免竞争。各个worker只负责处理自己管理的连接,并且后续该worker中的每个client的读写操作完全由 该线程单独处理,天然避免了资源竞争,避免了锁。 worker单线程,性能考虑:worker不仅仅要epoll_wait,还是处理read、write逻辑,加入worker处理了过多的连接,势必造成这部分消耗时间片过多,来不及处理更多连接,性能下降。

8 优缺点

优点

数据处理:netty提供了大量成熟的数据处理组件(ENCODER、DECODER),HTTP、POP3拿来即用。 编码复杂度、可维护性:netty充分使得业务逻辑与网络处理解耦,只需要少量的BootStrap配置即可,更多的集中在业务逻辑处理上。 性能:netty提供了的ByteBuf(底层Java原生的ByteBuffer),提供了池化的ByteBuf,兼顾读取性能和ByteBuf内存分配(在后续文档中会再做详解)。

缺点

入门有一定难度。

五 AIO

 

1 启动

main线程

epoll_create(256)                       = 5  epoll_ctl(5, EPOLL_CTL_ADD, 6, {EPOLLIN, {u32=6, u64=11590018039084482566}}) = 0    ##创建BOSS 线程(Proactor)  clone(child_stack=0x7f340ac06fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f340ac079d0, tls=0x7f340ac07700, child_tidptr=0x7f340ac079d0) = 22704    socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 8  setsockopt(8, SOL_IPV6, IPV6_V6ONLY, [0], 4) = 0  setsockopt(8, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0  bind(8, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INET6,  "::" , &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, 28) = 0  listen(8, 50)    accept(8, 0x7f67d01b3120, 0x7f67d9246690) = -1  epoll_ctl(5, EPOLL_CTL_MOD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=15380749440025362440}}) = -1 ENOENT ( No  such file  or  directory)  epoll_ctl(5, EPOLL_CTL_ADD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=15380749440025362440}}) = 0  read (0, 

22704(BOSS 线程(Proactor))

epoll_wait(5,  <unfinished ...> 

2 请求连接

22704(BOSS 线程(Proactor))处理连接

epoll_wait(5,[{EPOLLIN, {u32=9, u64=4294967305}}], 512, -1) = 1  accept(8, {sa_family=AF_INET6, sin6_port=htons(55320), inet_pton(AF_INET6,  "::ffff:36.24.32.140" , &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 9  clone(child_stack=0x7ff35c99ffb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7ff35c9a09d0, tls=0x7ff35c9a0700, child_tidptr=0x7ff35c9a09d0) = 26241  epoll_wait(5,  <unfinished ...> 

26241

#将client 连接的FD加入到BOSS的epoll中,以便BOSS线程监听网络事件  epoll_ctl(5, EPOLL_CTL_MOD, 9, {EPOLLIN|EPOLLONESHOT, {u32=9, u64=4398046511113}}) = -1 ENOENT ( No  such file  or  directory)  epoll_ctl(5, EPOLL_CTL_ADD, 9, {EPOLLIN|EPOLLONESHOT, {u32=9, u64=4398046511113}}) = 0  accept(8, 0x7ff3440008c0, 0x7ff35c99f4d0) = -1 EAGAIN (Resource temporarily unavailable)  epoll_ctl(5, EPOLL_CTL_MOD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=8}}) = 0 

3 客户端发送数据

22704(BOSS 线程(Proactor))处理连接

epoll_wait(5,[{EPOLLIN, {u32=9, u64=4294967305}}], 512, -1) = 1  ##数据读出  read (9,  "daojian111\r\n" , 1024)         = 12  ##数据处理交给其他线程,这里由于线程池为空,需要先clone线程  clone(child_stack=0x7ff35c99ffb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7ff35c9a09d0, tls=0x7ff35c9a0700, child_tidptr=0x7ff35c9a09d0) = 26532 

复制线程处理,线程号26532

write(1,  "pool-1-thread-2-10received : dao" ..., 41) = 41  write(1,  "\n" , 1)  accept(8, 0x7f11c400b5f0, 0x7f11f42fd4d0) = -1 EAGAIN (Resource temporarily unavailable)  epoll_ctl(5, EPOLL_CTL_MOD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=8}}) = 0 

4 总结

从系统调用角度,Java的AIO事实上是以多路复用(Linux上为epoll)等同步IO为基础,自行实现了异步事件分发。 BOSS Thread负责处理连接,并分发事件。 WORKER Thread只负责从BOSS接收的事件执行,不负责任何网络事件监听。

5 优缺点

优点

相比于前面的BIO、NIO,AIO已经封装好了任务调度,使用时只需关心任务处理。

缺点

事件处理完全由Thread Pool完成,对于同一个channel的多个事件可能会出现并发问题。 相比netty,buffer API不友好容易出错;编解码工作复杂。

原文链接:https://zhuanlan.51cto.com/art/202106/669120.htm

查看更多关于从操作系统层面分析Java IO演进之路的详细内容...

  阅读:14次