NIO介绍
引用自NIO介绍
简介
NIO时Java 1.4引入的新特性。是对原来的Standard IO的扩展。
Standard IO时对字节流的读写,在进行IO之前,首先创建一个流对象,流对象进行读写操作都是按字节,一个字节一个字节的读或写。而NIO把IO抽象成块,类似磁盘的读写,每次IO操作的单位都是一个块,块被读入内存之后就是一个byte[],NIO一次可以读或写多个字节。
组件
Selector
多路复用选择器,基于“事件驱动”,其核心就是通过Selector来轮询注册在其上的Channel,当发现某个或多个Channel处于就绪状态后,从阻塞状态返回就绪的Channel的SelectionKey集合,进行I/O操作。
1 2
   | Selector selector = Selector.open(); new Thread(new ReactorTask()).start();
   | 
 
1 2 3 4 5 6 7 8 9
   |  ServerSocketChannel ssc = ServerSocketChannel.open();
  ssc.configureBlocking(false);
  ServerSocket ss = ssc.socket(); ss.bind(new InetSocketAddress(InetAddress.getByName("ip"), port));
  ssc.register(selector, SelectionKey,OP_ACCEPT);
 
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
   | while (true) {          selector.select();     Set<SelectionKey> selectionKeys = selector.selectedKeys();     Iterator<SelectionKey> it = selectionKeys.iterator();     while (it.hasNext()) {         SelectionKey key = it.next();         if (key.isAcceptable()) {                          ServerSocketChannel ssc = (ServerSocketChannel) key.channel();             SocketChannel sc = ssc.accept();             sc.configureBlocking(false);                          sc.register(selector, SelectionKey.OP_READ);         }         if (key.isReadable()) {                          SocketChannel sc = (SocketChannel) key.channel();             ByteBuffer readBuffer = ByteBuffer.allocate(1024);             int readBytes = sc.read(readBuffer);             if (readBytes > 0) {                 readBuffer.flip();                 byte[] bytes = new byte[readBuffer.remaining()];                 readBuffer.get(bytes);                 System.out.println(new String(bytes, "UTF-8"));             }         }     } }
  | 
 
Channel
Channel是NIO对IO抽象的一个新概念,NIO在进行IO时需要创建一个Channel对象,是双向的,不像Standard IO分为输入流和输出流。
Buffer
Buffer和Channel都是一起使用的,每次都是从一个Channel中读出一个Buffer或者把一个Buffer写入到一个Channel中。
1 2 3 4 5 6 7 8 9 10
   |  SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) {     readBuffer.flip();     byte[] bytes = new byte[readBuffer.remaining()];     readBuffer.get(bytes);     System.out.println(new String(bytes, "UTF-8")); }
 
  | 
 
Buffer有3个重要的属性
position 正整数,指向Buffer中下一个要读取或写入的字节位置
 
limit 正整数,指向Buffer中的某个位置,在IO时只读写下标小于limit的字节内容
 
capacity 正整数,Buffer所能容纳的最大字节数
 
0 <= position <= limit <= capacity
初始状态:

从Channel中读入5个字到ByteBuffer:

flip(),准备写入或输出:
1 2 3 4 5 6
   | public final Buffer flip() {     limit = position;     position = 0;     mark = -1;     return this; }
  | 
 

输出内容后,position就移动到跟limit相同的位置上:

ByteBuffer如果要重复利用,需要清理,position和limit回到初始状态时的位置,然后可以接着中这个Buffer来读写数据,不需要再new新的Buffer:
1 2 3 4 5 6
   | public final Buffer clear() {     position = 0;     limit = capacity;     mark = -1;     return this; }
  | 
 

Netty框架
优点
1 2 3 4 5
   | <dependency>     <groupId>org.jboss.netty</groupId>     <artifactId>netty</artifactId>     <version>3.2.5.Final</version> </dependency>
   | 
 
示例
接受客户端请求并将内容打印出来,同时发送一个消息收到回执。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
   | public class NettyServer {
      private static int HEADER_LENGTH = 4;
      public void bind(int port) throws Exception {
          ServerBootstrap b = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),                                                                                   Executors.newCachedThreadPool()));
                   b.setPipelineFactory(new ChannelPipelineFactory() {
              public ChannelPipeline getPipeline() throws Exception {                 ChannelPipeline pipelines = Channels.pipeline();                 pipelines.addLast(MessageHandler.class.getName(), new MessageHandler());                 return pipelines;             }         });                  b.bind(new InetSocketAddress(port));     }
           static class MessageHandler extends SimpleChannelHandler {
          public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {                          ChannelBuffer buffer = (ChannelBuffer) e.getMessage();             String message = new String(buffer.readBytes(buffer.readableBytes()).array(), "UTF-8");             System.out.println("<服务端>收到内容=" + message);
                           byte[] body = "服务端已收到".getBytes();             byte[] header = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.BIG_ENDIAN).putInt(body.length).array();             Channels.write(ctx.getChannel(), ChannelBuffers.wrappedBuffer(header, body));             System.out.println("<服务端>发送回执,time=" + System.currentTimeMillis());
          }     }
      public static void main(String[] args) {         try {             new NettyServer().bind(1088);         } catch (Exception e) {             e.printStackTrace();         }     } }
  | 
 
向服务端发送一个请求,然后打印服务端响应的内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
   | public class NettyClient {
      private final ByteBuffer readHeader  = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);     private final ByteBuffer writeHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);     private SocketChannel    channel;
      public void sendMessage(byte[] body) throws Exception {                  channel = SocketChannel.open();         channel.socket().setSoTimeout(60000);         channel.connect(new InetSocketAddress(AddressUtils.getHostIp(), 1088));
                   writeWithHeader(channel, body);
                   readHeader.clear();         read(channel, readHeader);         int bodyLen = readHeader.getInt(0);         ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen).order(ByteOrder.BIG_ENDIAN);         read(channel, bodyBuf);         System.out.println("<客户端>收到响应内容:" + new String(bodyBuf.array(), "UTF-8") + ",长度:" + bodyLen);     }
      private void writeWithHeader(SocketChannel channel, byte[] body) throws IOException {         writeHeader.clear();         writeHeader.putInt(body.length);         writeHeader.flip();                  channel.write(ByteBuffer.wrap(body));     }
      private void read(SocketChannel channel, ByteBuffer buffer) throws IOException {         while (buffer.hasRemaining()) {             int r = channel.read(buffer);             if (r == -1) {                 throw new IOException("end of stream when reading header");             }         }     }
      public static void main(String[] args) {         String body = "客户发的测试请求!";         try {             new NettyClient().sendMessage(body.getBytes());         } catch (Exception e) {             e.printStackTrace();         }     } }
  |