好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

一次搞懂Java三种IO模型

大家好,我是老三,上一节我们讨论了Linux的五种IO模型,接下来,我们从Java语言层面,来看看对IO的实现。

在Java中,一共有三种IO模型,分别是阻塞IO(BIO)、非阻塞IO(NIO)和异步IO(AIO)。

Linux五种IO模型和Java三种IO模型

Java BIO

Java BIO就是Java的传统IO模型,对应了操作系统IO模型里的阻塞IO。

Java BIO相关的实现都位于java.io包下,其通信原理是客户端、服务端之间通过Socket套接字建立管道连接,然后从管道中获取对应的输入/输出流,最后利用输入/输出流对象实现发送/接收信息。

我们来看个Demo:

BioServer:

/**    * @Author 三分恶    * @Date 2023/4/30    * @Description BIO服务端    */   public   class  BioServer {         public   static   void  main(String[] args)  throws  IOException {           //定义一个ServerSocket服务端对象,并为其绑定端口号           ServerSocket server =  new  ServerSocket(8888);          System.out.println( "===========BIO服务端启动================" );           //对BIO来讲,每个Socket都需要一个Thread            while  ( true ) {               //监听客户端Socket连接               Socket socket = server.accept();               new  BioServerThread(socket).start();          }        }         /**        * BIO Server线程        */        static   class  BioServerThread  extends  Thread{           //socket连接            private  Socket socket;           public  BioServerThread(Socket socket){               this .socket=socket;          }            @Override           public   void  run() {               try  {                   //从socket中获取输入流                   InputStream inputStream=socket.getInputStream();                   //转换为                   BufferedReader bufferedReader= new  BufferedReader( new  InputStreamReader(inputStream));                  String msg;                   //从Buffer中读取信息,如果读取到信息则输出                    while ((msg=bufferedReader.readLine())!= null ){                      System.out.println( "收到客户端消息:" +msg);                  }                     //从socket中获取输出流                   OutputStream outputStream=socket.getOutputStream();                  PrintStream printStream= new  PrintStream(outputStream);                   //通过输出流对象向客户端传递信息                   printStream.println( "你好,吊毛!" );                   //清空输出流                   printStream.flush();                   //关闭socket                   socket.shutdownOutput();              }  catch  (IOException e) {                  e.printStackTrace();              }          }      }  } 

BioClient

/**    * @Author 三分恶    * @Date 2023/4/30    * @Description BIO客户端    */   public   class  BioClient {         public   static   void  main(String[] args)  throws  IOException {          List<String> names= Arrays.asList( "帅哥" , "靓仔" , "坤坤" );           //通过循环创建多个多个client            for  (String name:names){               //创建socket并根据IP地址与端口连接服务端               Socket socket= new  Socket( "127.0.0.1" ,8888);              System.out.println( "===========BIO客户端启动================" );               //从socket中获取字节输出流               OutputStream outputStream=socket.getOutputStream();               //通过输出流向服务端传递信息               String hello= "你好," +name+ "!" ;              outputStream.write(hello.getBytes());               //清空流,关闭socket输出               outputStream.flush();              socket.shutdownOutput();                 //从socket中获取字节输入流               InputStream inputStream=socket.getInputStream();              BufferedReader bufferedReader= new  BufferedReader( new  InputStreamReader(inputStream));               //读取服务端消息               String msg;               while ((msg=bufferedReader.readLine())!= null ){                  System.out.println( "收到服务端消息:" +msg);              }              inputStream.close();              outputStream.close();              socket.close();          }      }  } 

先启动BioServer,再启动BioClient,运行结果

===========BIO服务端启动================  收到客户端消息:你好,帅哥!  收到客户端消息:你好,靓仔!  收到客户端消息:你好,坤坤! 

===========BIO客户端启动================  收到服务端消息:你好,吊毛!  ===========BIO客户端启动================  收到服务端消息:你好,吊毛!  ===========BIO客户端启动================  收到服务端消息:你好,吊毛! 

在上述Java-BIO的通信过程中,如果客户端一直没有发送消息过来,服务端则会一直等待下去,从而服务端陷入阻塞状态。同理,由于客户端也一直在等待服务端的消息,如果服务端一直未响应消息回来,客户端也会陷入阻塞状态。

在BioServer定义了一个类BioServerThread,继承了Thread类,run方法里主要是通过socket和流来读取客户端的消息,以及发送消息给客户端,每处理一个客户端的Socket连接,就得新建一个线程。

同时,IO读写操作也是阻塞的,如果客户端一直没有发送消息过来,线程就会进入阻塞状态,一直等待下去。

在BioClient里,循环创建Socket,向服务端收发消息,客户端的读写也是阻塞的。

在这个Demo里就体现了BIO的两个特点:

一个客户端连接对应一个处理线程 读写操作都是阻塞的

Java BIO

毫无疑问,不管是创建太多线程,还是阻塞读写,都会浪费服务器的资源。

Java NIO

那么我们就进入Java的下一种IO模型——Java NIO,它对应操作系统IO模型中的多路复用IO,底层采用了epoll实现。

Java-NIO则是JDK1.4中新引入的API,它在BIO功能的基础上实现了非阻塞式的特性,其所有实现都位于java.nio包下。NIO是一种基于通道、面向缓冲区的IO操作,相较BIO而言,它能够更为高效的对数据进行读写操作,同时与原先的BIO使用方式也大有不同。

我们还是先来看个Demo:

NioServer

/**    * @Author 三分恶    * @Date 2023/4/30    * @Description NIO服务端    */   public   class  NioServer {         public   static   void  main(String[] args)  throws  IOException {           //创建一个选择器selector           Selector selector= Selector.open();           //创建serverSocketChannel           ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();           //绑定端口           serverSocketChannel.socket().bind( new  InetSocketAddress(8888));           //必须得设置成非阻塞模式           serverSocketChannel.configureBlocking( false );           //将channel注册到selector并设置监听事件为ACCEPT           serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);          System.out.println( "===========NIO服务端启动============" );           while ( true ){               //超时等待                if (selector.select(1000)==0){                  System.out.println( "===========NIO服务端超时等待============" );                   continue ;              }               // 有客户端请求被轮询监听到,获取返回的SelectionKey集合               Iterator<SelectionKey> iterator=selector.selectedKeys().iterator();               //迭代器遍历SelectionKey集合                while  (iterator.hasNext()){                  SelectionKey key=iterator.next();                   // 判断是否为ACCEPT事件                    if  (key.isAcceptable()){                       // 处理接收请求事件                       SocketChannel socketChannel=((ServerSocketChannel) key.channel()).accept();                       //非阻塞模式                       socketChannel.configureBlocking( false );                       // 注册到Selector并设置监听事件为READ                       socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));                      System.out.println( "成功连接客户端" );                  }                   //判断是否为READ事件                    if  (key.isReadable()){                      SocketChannel socketChannel = (SocketChannel) key.channel();                         try  {                           // 获取以前设置的附件对象,如果没有则新建一个                           ByteBuffer buffer = (ByteBuffer) key.attachment();                           if  (buffer ==  null ) {                              buffer = ByteBuffer.allocate(1024);                              key.attach(buffer);                          }                           // 清空缓冲区                           buffer.clear();                           // 将通道中的数据读到缓冲区                            int  len = socketChannel.read(buffer);                           if  (len > 0) {                              buffer.flip();                              String message =  new  String(buffer.array(), 0, len);                              System.out.println( "收到客户端消息:"  + message);                          }  else   if  (len < 0) {                               // 接收到-1,表示连接已关闭                               key.cancel();                              socketChannel.close();                               continue ;                          }                           // 注册写事件,下次向客户端发送消息                           socketChannel.register(selector, SelectionKey.OP_WRITE, buffer);                      }  catch  (IOException e) {                           // 取消SelectionKey并关闭对应的SocketChannel                           key.cancel();                          socketChannel.close();                      }                  }                   //判断是否为WRITE事件                    if  (key.isWritable()){                      SocketChannel socketChannel = (SocketChannel) key.channel();                       //获取buffer                       ByteBuffer buffer = (ByteBuffer) key.attachment();                      String hello =  "你好,坤坤!" ;                       //清空buffer                       buffer.clear();                       //buffer中写入消息                       buffer.put(hello.getBytes());                      buffer.flip();                       //向channel中写入消息                       socketChannel.write(buffer);                      buffer.clear();                      System.out.println( "向客户端发送消息:"  + hello);                       // 设置下次读写操作,向 Selector 进行注册                       socketChannel.register(selector, SelectionKey.OP_READ, buffer);                  }                   // 移除本次处理的SelectionKey,防止重复处理                   iterator.remove();              }          }        }  } 

NioClient

public   class  NioClient {         public   static   void  main(String[] args)  throws  IOException {           // 创建SocketChannel并指定ip地址和端口号           SocketChannel socketChannel = SocketChannel.open( new  InetSocketAddress( "127.0.0.1" , 8888));          System.out.println( "==============NIO客户端启动================" );           // 非阻塞模式           socketChannel.configureBlocking( false );          String hello= "你好,靓仔!" ;          ByteBuffer buffer = ByteBuffer.wrap(hello.getBytes());           // 向通道中写入数据           socketChannel.write(buffer);          System.out.println( "发送消息:"  + hello);          buffer.clear();           // 将channel注册到Selector并监听READ事件           socketChannel.register(Selector.open(), SelectionKey.OP_READ, buffer);           while  ( true ) {               // 读取服务端数据                if  (socketChannel.read(buffer) > 0) {                  buffer.flip();                  String msg =  new  String(buffer.array(), 0, buffer.limit());                  System.out.println( "收到服务端消息:"  + msg);                   break ;              }          }           // 关闭输入流           socketChannel.shutdownInput();           // 关闭SocketChannel连接           socketChannel.close();      }  } 

先运行NioServer,再运行NioClient,运行结果:

===========NIO服务端启动============  ===========NIO服务端超时等待============  ===========NIO服务端超时等待============  成功连接客户端  收到客户端消息:你好,靓仔!  向客户端发送消息:你好,坤坤! 

==============NIO客户端启动================  发送消息:你好,靓仔!  收到服务端消息:你好,坤坤! 

我们在这个案例里实现了一个比较简单的Java NIO 客户端服务端通信,里面有两个小的点需要注意,注册到选择器上的通道都必须要为非阻塞模型,同时通过缓冲区传输数据时,必须要调用flip()方法切换为读取模式。

代码流程示意图

Java-NIO中有三个核心概念:**Buffer(缓冲区)、Channel(通道)、Selector(选择器)**。

Java NIO

每个客户端连连接本质上对应着一个Channel通道,每个通道都有自己的Buffer缓冲区来进行读写,这些Channel被Selector选择器管理调度 Selector负责轮询所有已注册的Channel,监听到有事件发生,才提交给服务端线程处理,服务端线程不需要做任何阻塞等待,直接在Buffer里处理Channel事件的数据即可,处理完马上结束,或返回线程池供其他客户端事件继续使用。 通过Selector,服务端的一个Thread就可以处理多个客户端的请求 Buffer(缓冲区)就是饭店用来存放食材的储藏室,当服务员点餐时,需要从储藏室中取出食材进行制作。 Channel(通道)是用于传输数据的车道,就像饭店里的上菜窗口,可以快速把点好的菜品送到客人的桌上。 Selector(选择器)就是大堂经理,负责协调服务员、厨师和客人的配合和沟通,以保证整个就餐过程的效率和顺畅。

Java AIO

Java-AIO也被成为NIO2,它是在NIO的基础上,引入了新的异步通道的概念,并提供了异步文件通道和异步套接字的实现。

异步通道的实现体系

它们的主要区别就在于这个异步通道,见名知意:使用异步通道去进行IO操作时,所有操作都为异步非阻塞的,当调用read()/write()/accept()/connect()方法时,本质上都会交由操作系统去完成,比如要接收一个客户端的数据时,操作系统会先将通道中可读的数据先传入read()回调方法指定的缓冲区中,然后再主动通知Java程序去处理。

我们还是先来看个Demo:

AioServer

/**    * @Author 三分恶    * @Date 2023/5/1    * @Description AIO服务端    */   public   class  AioServer {         public   static   void  main(String[] args)  throws  Exception {           // 创建异步通道组,处理IO事件           AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());           //创建异步服务器Socket通道,并绑定端口           AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind( new  InetSocketAddress(8888));          System.out.println( "=============AIO服务端启动=========" );             // 异步等待接收客户端连接           server.accept( null ,  new  CompletionHandler<AsynchronousSocketChannel, Object>() {               // 创建ByteBuffer                final  ByteBuffer buffer = ByteBuffer.allocate(1024);                @Override               public   void  completed(AsynchronousSocketChannel channel, Object attachment) {                  System.out.println( "客户端连接成功" );                   try  {                      buffer.clear();                       // 异步读取客户端发送的消息                       channel.read(buffer,  null ,  new  CompletionHandler<Integer, Object>() {                          @Override                           public   void  completed(Integer len, Object attachment) {                              buffer.flip();                              String message =  new  String(buffer.array(), 0, len);                              System.out.println( "收到客户端消息:"  + message);                                 // 异步发送消息给客户端                               channel.write(ByteBuffer.wrap(( "你好,阿坤!" ).getBytes()),  null ,  new  CompletionHandler<Integer, Object>() {                                  @Override                                   public   void  completed(Integer result, Object attachment) {                                       // 关闭输出流                                        try  {                                          channel.shutdownOutput();                                      }  catch  (IOException e) {                                          e.printStackTrace();                                      }                                  }                                    @Override                                   public   void  failed(Throwable exc, Object attachment) {                                      exc.printStackTrace();                                       try  {                                          channel.close();                                      }  catch  (IOException e) {                                          e.printStackTrace();                                      }                                  }                              });                          }                            @Override                           public   void  failed(Throwable exc, Object attachment) {                              exc.printStackTrace();                               try  {                                  channel.close();                              }  catch  (IOException e) {                                  e.printStackTrace();                              }                          }                      });                  }  catch  (Exception e) {                      e.printStackTrace();                  }                   // 继续异步等待接收客户端连接                   server.accept( null ,  this );              }                @Override               public   void  failed(Throwable exc, Object attachment) {                  exc.printStackTrace();                   // 继续异步等待接收客户端连接                   server.accept( null ,  this );              }          });           // 等待所有连接都处理完毕           group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);      }    } 

AioClient

/**    * @Author 三分恶    * @Date 2023/5/1    * @Description AIO客户端    */   public   class  AioClient {         public   static   void  main(String[] args)  throws  Exception {           // 创建异步Socket通道           AsynchronousSocketChannel client = AsynchronousSocketChannel.open();           // 异步连接服务器           client.connect( new  InetSocketAddress( "127.0.0.1" , 8888),  null ,  new  CompletionHandler<Void, Object>() {               // 创建ByteBuffer                final  ByteBuffer buffer = ByteBuffer.wrap(( "你好,靓仔!" ).getBytes());                @Override               public   void  completed(Void result, Object attachment) {                   // 异步发送消息给服务器                   client.write(buffer,  null ,  new  CompletionHandler<Integer, Object>() {                       // 创建ByteBuffer                        final  ByteBuffer readBuffer = ByteBuffer.allocate(1024);                        @Override                       public   void  completed(Integer result, Object attachment) {                          readBuffer.clear();                           // 异步读取服务器发送的消息                           client.read(readBuffer,  null ,  new  CompletionHandler<Integer, Object>() {                              @Override                               public   void  completed(Integer result, Object attachment) {                                  readBuffer.flip();                                  String msg =  new  String(readBuffer.array(), 0, result);                                  System.out.println( "收到服务端消息:"  + msg);                              }                                @Override                               public   void  failed(Throwable exc, Object attachment) {                                  exc.printStackTrace();                                   try  {                                      client.close();                                  }  catch  (IOException e) {                                      e.printStackTrace();                                  }                              }                          });                      }                        @Override                       public   void  failed(Throwable exc, Object attachment) {                          exc.printStackTrace();                           try  {                              client.close();                          }  catch  (IOException e) {                              e.printStackTrace();                          }                      }                  });              }                @Override               public   void  failed(Throwable exc, Object attachment) {                  exc.printStackTrace();                   try  {                      client.close();                  }  catch  (IOException e) {                      e.printStackTrace();                  }              }          });           // 等待连接处理完毕           Thread.sleep(1000);           // 关闭输入流和Socket通道           client.shutdownInput();          client.close();      }  } 

看下运行结果

=============AIO服务端启动=========  客户端连接成功  收到客户端消息:你好,靓仔! 

收到服务端消息:你好,阿坤!

可以看到,所有的操作都是异步进行,通过completed接收异步回调,通过failed接收错误回调。

而且我们发现,相较于之前的NIO而言,AIO其中少了Selector选择器这个核心组件,选择器在NIO中充当了协调者的角色。

但在Java-AIO中,类似的角色直接由操作系统担当,而且不是采用轮询的方式监听IO事件,而是采用一种类似于[订阅-通知]的模式。

Java AIO简图

在AIO中,所有创建的通道都会直接在OS上注册监听,当出现IO请求时,会先由操作系统接收、准备、拷贝好数据,然后再通知监听对应通道的程序处理数据。

Java-AIO这种异步非阻塞式IO也是由操作系统进行支持的,在Windows系统中提供了一种异步IO技术:IOCP(I/O Completion Port,所以Windows下的Java-AIO则是依赖于这种机制实现。不过在Linux系统中由于没有这种异步IO技术,所以Java-AIO在Linux环境中使用的还是epoll这种多路复用技术进行模拟实现的。

因为Linux的异步IO技术实际上不太成熟,所以Java-AIO的实际应用并不是太多,比如大名鼎鼎的网络通信框架Netty就没有采用Java-AIO,而是使用Java-NIO,在代码层面,自行实现异步。

小结

那么这期我们就快速过了一下Java的三种IO机制,它们的特点,我们直接看下图:

Java三种IO模型

我们也发现,虽然Java-NIO、Java-AIO,在性能上比Java-BIO要强很多,但是可以看到,写法上一个比一个难搞,不过好在基本也没人直接用Java-NIO、Java-AIO,如果要进行网络通信,一般都会采用Netty,它对原生的Java-NIO进行了封装优化,接下来,我们会继续走近Netty,敬请期待。

参考:

[1].《Netty权威指南》

[2].https://juejin.cn/post/7130952602350534693#heading-14

[3].https://HdhCmsTestjianshu测试数据/p/670033e5b916

原文地址:https://mp.weixin.qq测试数据/s/EMVS4dnHAgOfKg_OCQWEIw

查看更多关于一次搞懂Java三种IO模型的详细内容...

  阅读:11次