Netty基础—2.网络编程基础二
技术分享
11个月前 (03-12)
0
999+
大纲
1.网络编程简介
2.BIO网络编程
3.AIO网络编程
4.NIO网络编程之Buffer
5.NIO网络编程之实战
6.NIO网络编程之Reactor模式
1.网络编程简介
既然是通信,那么肯定会有两个对端。在网络编程里提供服务的一方叫服务端,连接服务端使用服务的另一方叫客户端。
如果类的名字有Server或ServerSocket,则表示这个类是给服务端用的。如果类的名字只有Socket的,则表示这个类是负责具体的网络读写的。
对于服务端来说ServerSocket只是个场所,具体和客户端沟通的是Socket。所以在网络编程里,ServerSocket并不负责具体的网络读写,ServerSocket只负责接收客户端连接后分配一个Socket处理具体网络读写。这一点对所有模式(BIO、AIO、NIO)的网络编程都适用。
在网络编程里,开发者关注的其实就三个事情:
一.建立网络连接
客户端连接服务端,服务端等待和接收连接。
二.读网络数据
三.写网络数据
所有模式的通信编程(BIO、AIO、NIO)都是围绕上面这三件事情进行的。
2.BIO网络编程
(1)BIO即阻塞IO模型
(2)BIO编程介绍
(3)BIO模型的问题与改进
(4)BIO编程的标准模式
(5)BIO编程的例子
(1)BIO即阻塞IO模型
(2)BIO编程介绍
服务端提供IP和端口,客户端通过连接操作向服务端发起连接请求。如果通过三次握手成功建立TCP连接,双方就可以通过套接字进行通信。
在传统的同步阻塞模型(BIO)开发中:服务端的ServerSocket负责绑定IP地址 + 启动监听端口,客户端的Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。
BIO模型的服务端,通常由一个Acceptor线程负责监听客户端的连接。Acceptor线程接收到每个客户端连接请求后都会创建一个新的线程进行处理,请求处理完毕再通过输出流返回响应给客户端,然后再销毁线程。这就是典型的:一个请求一个响应模型。
(3)BIO模型的问题与改进
BIO模型的最大问题是缺乏弹性伸缩能力。当客户端并发访问量增加后,服务端线程个数和客户端并发访问数呈1:1关系。线程是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降。随着访问量的继续增大,系统最终可能会挂掉。
为了改进这种一个连接一个线程的模型,可以使用线程池来管理这些线程,从而实现1个或多个线程处理N个客户端的模型。但是这种改进后的模型的底层还是使用同步阻塞IO,通常这种通过线程池进行改进的模型被称为伪异步IO模型。
如果线程池使用的是CachedThreadPool线程池,那么除了能自动管理线程外,客户端 : 线程数看起来也像是1 : 1。如果线程池使用的是FixedThreadPool线程池,那么就能有效控制线程数量,实现客户端 : 线程数 = N : M的伪异步IO模型。
但是正因为限制了线程数量,如果发生读取数据慢(比如数据量大、网络传输慢等)且存在大量并发的情况,那么对其他连接请求的处理就只能一直等待,这就是最大的弊端。
(4)BIO编程的标准模式
说明一:客户端指定服务端的IP地址和端口,对服务端发起连接请求。
说明二:服务端需要有一个ServerSocket,来处理客户端发起的连接请求。服务端收到连接请求后,ServerSocket的accept()方法会创建一个Socket。然后将这个Socket打包成一个Runable型的任务并放到一个线程池里执行。
说明三:在这个Runable任务里便会根据这个Socket进行具体的读写和业务处理。
以上就是BIO编程的标准模式。BIO受限于并发下的线程数量,一般用于客户端较多。尽管早期的Tomcat也使用了线程池的BIO,但毕竟性能不高。
(5)BIO编程的例子
//类说明: 客户端 //允许客户端在控制台输入数据然后送往服务器, 然后读取服务端输出的数据 //这里的示例是一个长连接, 如果要实现短连接可以在每次客户端发起完请求后将socket关闭即可 public class BioClient { //服务器端口号 public static int DEFAULT_PORT = 12345; public static String DEFAULT_SERVER_IP = "127.0.0.1"; public static void main(String[] args) throws InterruptedException, IOException { //通过构造函数创建Socket, 并且连接指定地址和端口的服务端 Socket socket = new Socket(DEFAULT_SERVER_IP, DEFAULT_PORT); System.out.println("请输入请求消息: "); //启动读取服务端输出数据的线程 new ReadMsg(socket).start(); //允许客户端在控制台输入数据, 然后送往服务器 PrintWriter pw = null; while(true) { pw = new PrintWriter(socket.getOutputStream()); pw.println(new Scanner(System.in).next()); pw.flush();//把数据通过socket刷到网络上去 } } //读取服务端输出数据的线程: 进行连接的发起和读写 private static class ReadMsg extends Thread { Socket socket; public ReadMsg(Socket socket) { this.socket = socket; } @Override public void run() { //负责处理socket读写的输入流, 使用try-resources语法 try (BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) { String line = null; //通过输入流读取服务端传输的数据 //如果已经读到输入流尾部, 返回null, 退出循环; 如果得到非空值, 就将结果进行业务处理 while ((line = br.readLine()) != null) { System.out.printf("客户端业务处理: %sn", line); } } catch (SocketException e) { System.out.printf("%sn", "服务器断开了你的连接"); } catch (Exception e) { e.printStackTrace(); } finally { //使用try-resources语法, 可以不用下面这段判断对资源进行关闭 clear(); } } //必要的资源清理工作 private void clear() { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } //类说明: BIO的服务端主程序 //接收客户的连接, 通过ServerSocket来进行处理socket //把客户发送的连接打包成任务交给线程池去执行处理 public class BioServer { //服务器端口号 public static int DEFAULT_PORT = 12345; public static String DEFAULT_SERVER_IP = "127.0.0.1"; //服务器端必须: 用来接待用户请求, 其中的accept()方法就会产生一个socket private static ServerSocket server; //线程池: 处理每个客户端的请求 private static ExecutorService executorService = Executors.newFixedThreadPool(5); public static void main(String[] args) throws IOException { start(); } //启动服务器端 private static void start() throws IOException { try { //通过构造函数创建ServerSocket //如果端口合法且空闲, 服务端就监听成功 server = new ServerSocket(DEFAULT_PORT); System.out.println("服务器已启动, 端口号: " + DEFAULT_PORT); while(true) { //通过accept()方法产生一个socket和新的客户端进行沟通, 这是一个长连接 Socket socket= server.accept(); System.out.println("有新的客户端连接----" ); //当有新的客户端接入时, 将socket打包成一个任务, 投入到线程池 executorService.execute(new BioServerHandler(socket)); } } finally { if (server != null) { server.close(); } } } } //类说明: 线程池中用来处理客户端具体socket的输入输出流的线程 public class BioServerHandler implements Runnable { private Socket socket; public BioServerHandler(Socket socket) { this.socket = socket; } public void run() { //负责socket读写的输出、输入流, 使用try-resources语法 try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) { String message; String result; //通过输入流读取客户端传输的数据 //如果已经读到输入流尾部, 返回null, 退出循环; 如果得到非空值, 就将结果进行业务处理 while((message = in.readLine()) != null) { System.out.println("服务端接收到信息: " + message); result = "返回客户端信息: 你好, " + message + ", 现在时间是: " + new Date(System.currentTimeMillis()).toString() ; //将业务结果通过输出流返回给客户端 out.println(result); } } catch (Exception e) { e.printStackTrace(); } finally { //使用try-resources语法, 可以不用下面这段判断对资源进行关闭 if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
3.AIO网络编程
(1)AIO即异步IO模型
(2)AIO编程介绍
(3)客户端AsynchronousSocketChannel
(4)服务端AsynchronousServerSocketChannel
(5)AIO、BIO和NIO对比
(1)AIO即异步IO模型
(2)AIO编程介绍
AIO核心类有两个:
AsynchronousSocketChannel(客户端)
AsynchronousServerSocketChannel(服务端)
AIO为TCP通信提供了异步Channel。AsynchronousServerSocketChannel创建成功后,类似于ServerSocket,也是调用accept()方法来接受来自客户端的连接。
由于异步IO实际的IO操作是交给操作系统来做的,用户进程只负责通知操作系统进行IO和接受操作系统IO完成的通知。所以调用异步的ServerSocketChannel的accept()方法,当前线程不会阻塞。但程序不知道accept()方法何时能接收到客户端请求且操作系统完成网络IO。为解决这个问题,AIO中ServerSocketChannel的accept()方法是这样的。
public abstract <A> void accept( A attachment, CompletionHandler<AsynchronousSocketChannel, ? super A> handler ); public interface CompletionHandler<V,A> { //当IO完成时触发该方法 //该方法的第一个参数代表IO操作返回的对象,第二个参数代表发起IO操作时传入的附加参数 void completed(V result, A attachment); //当IO失败时触发该方法,第一个参数代表IO操作失败引发的异常或错误 void failed(Throwable exc, A attachment); }
AIO中ServerSocketChannel的accept()方法会接受来自客户端请求,连接成功或失败都会触发CompletionHandler对象的相应方法。
(3)客户端AsynchronousSocketChannel
AsynchronousSocketChannel的的用法与Socket类似,有三个方法:
一.connect()连接指定端口 + IP地址的服务器
二.read()处理读
三.write()处理写
下面是AIO的客户端的代码示例:
//类说明: AIO的客户端主程序 public class AioClient { //IO通信处理器 private static AioClientHandler clientHandle; public static void start() { if (clientHandle != null) { return; } clientHandle = new AioClientHandler(DEFAULT_SERVER_IP, DEFAULT_PORT); //负责网络通讯的线程 new Thread(clientHandle,"Client").start(); } //向服务器发送消息 public static boolean sendMsg(String msg) throws Exception { if (msg.equals("q")) { return false; } clientHandle.sendMessage(msg); return true; } public static void main(String[] args) throws Exception { AioClient.start(); System.out.println("请输入发送给服务端的请求消息: "); Scanner scanner = new Scanner(System.in); while(AioClient.sendMsg(scanner.nextLine())); } } //1.clientChannel.connect向服务端发起连接,是异步的 //连接成功/失败系统会调用completed/failed方法进行通知 //类说明: IO通信处理器, 1.负责连接服务器, 2.提供对外暴露的可以向服务端发送数据的API //AioClientHandler也会被打包成一个任务放到线程池里面去执行, 所以要实现Runnable //由于客户端发起的连接是基于AIO的, 需要提示出连接成功或者失败的返回说明, 因此也要实现CompletionHandler public class AioClientHandler implements CompletionHandler<Void, AioClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel;//客户端进行异步操作的socket private String host; private int port; //防止线程退出不好演示效果, 因为任务是一个异步IO, 方法执行完后会迅速返回, //为了不让客户端程序过早退出, 所以这里用CountDownLatch阻塞一下 private CountDownLatch latch; public AioClientHandler(String host, int port) { this.host = host; this.port = port; try { //创建一个实际异步的客户端通道, 下面会基于这个通道发起连接 //连接成功会执行completed方法, 连接失败会执行failed方法 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //创建CountDownLatch, 因为是异步调用, 下面的connect不会阻塞, 那么整个run方法会迅速结束, 那么负责网络通讯的线程也会迅速结束 //所以使用这个CountDownLatch就是为了进行阻塞 latch = new CountDownLatch(1); //发起异步连接操作, 回调参数就是这个实例本身, 如果连接成功会回调这个实例的completed方法 clientChannel.connect(new InetSocketAddress(host, port), null,this); try { latch.await(); clientChannel.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } //连接成功, 这个方法会被系统调用 @Override public void completed(Void result, AioClientHandler attachment) { System.out.println("已经连接到服务端"); } //连接失败, 这个方法会被系统调用 @Override public void failed(Throwable exc, AioClientHandler attachment) { System.err.println("连接到服务端失败"); exc.printStackTrace(); latch.countDown(); try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //提供对外暴露的可以向服务端发送数据的API public void sendMessage(String msg) { //为了把msg变成可以在网络传输的格式: 把字符串变成字节数组, 再把字节数组放入缓冲区, 然后对缓冲区进行读 byte[] bytes = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //进行异步写, 向服务端发送消息, 同样的这个方法会迅速返回 //而且也需要提供一个接口让系统在一次网络写操作完成后通知应用程序 //所以传入一个实现了CompletionHandler的AioClientWriteHandler //第1个writeBuffer, 表示客户端要发送给服务器的数据 //第2个writeBuffer, 考虑到网络写有可能无法一次性将数据写完,需要进行多次网络写 //所以将writeBuffer作为附件传递给AioClientWriteHandler clientChannel.write(writeBuffer, writeBuffer, new AioClientWriteHandler(clientChannel, latch)); } } //2.clientChannel.write向服务端写数据,也是异步的 //类说明: 网络写的处理器, 负责异步写完之后的处理 //在CompletionHandler<Integer, ByteBuffer>中 //Integer: 表示本次网络写操作完成实际写入的字节数 //ByteBuffer: 表示写操作的附件, 存储了写操作需要写入的数据 public class AioClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //有可能无法一次性将数据写完, 需要检查缓冲区中是否还有数据需要继续进行网络写 if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //写操作已经完成, 为读取服务端传回的数据建立缓冲区 ByteBuffer readBuffer = ByteBuffer.allocate(1024); //进行异步读, 这个方法会迅速返回, 而且也需要提供一个接口让系统在读操作完成后通知我们的应用程序 //所以要传入一个实现了CompletionHandler的AioClientReadHandler clientChannel.read(readBuffer, readBuffer, new AioClientReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("数据发送失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } //3.clientChannel.read读取服务端的数据,也是异步的 //类说明: 网络读的处理器, 负责异步读完之后的处理 //在CompletionHandler<Integer, ByteBuffer>中 //Integer: 表示本次网络写操作完成实际写入的字节数 //ByteBuffer: 表示写操作的附件, 存储了写操作需要写入的数据 public class AioClientReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg; try { msg = new String(bytes,"UTF-8"); System.out.println("读取到服务端的数据: " + msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("数据读取失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
(4)服务端AsynchronousServerSocketChannel
下面是服务端程序的代码示例:
//类说明: 服务端主程序 public class AioServer { private static AioServerHandler serverHandle; //统计客户端个数 public volatile static long clientCount = 0; public static void start() { if (serverHandle != null) { return; } serverHandle = new AioServerHandler(DEFAULT_PORT); new Thread(serverHandle, "Server").start(); } public static void main(String[] args) { AioServer.start(); } } //1.serverChannel.accept接收客户端的连接,是异步的 //所以需要AioAcceptHandler实现CompletionHandler来让操作系统进行调用和通知 //类说明: 响应网络操作的处理器, 接收客户端的连接 public class AioServerHandler implements Runnable { //使用latch进行阻塞, 防止任务跑完run方法后就退出了 public CountDownLatch latch; //建立ServerSocket, 进行异步通信的通道 public AsynchronousServerSocketChannel serverChannel; public AioServerHandler(int port) { try { //创建服务端通道 serverChannel = AsynchronousServerSocketChannel.open(); //绑定端口 serverChannel.bind(new InetSocketAddress(port)); System.out.println("服务端已经启动, 端口是: " + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); //用于接收客户端的连接, 异步操作, 需要实现了CompletionHandler接口的处理器处理和客户端的连接操作 serverChannel.accept(this, new AioAcceptHandler()); try { //使用latch进行阻塞, 防止任务跑完run方法后就退出了 latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } } //2.AioAcceptHandler的channel就是真正和客户端进行通信用的socket,区别于serverChannel仅作为中介来分配 //不过要注意的是:每完成一次serverChannel.accept连接之后,需要重新再一次注册监听,让别的客户端也可以连接 //类说明: 服务端用来处理接收客户端连接的处理器 public class AioAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AioServerHandler> { //这里的channel就是真正和客户端进行通信用的socket, 区别于serverChannel仅作为中介来分配 @Override public void completed(AsynchronousSocketChannel channel, AioServerHandler serverHandler) { AioServer.clientCount++; System.out.println("连接的客户端数: " + AioServer.clientCount); //每完成一次serverChannel.accept连接之后, 需要重新再一次注册监听, 让别的客户端也可以连接 serverHandler.serverChannel.accept(serverHandler,this); ByteBuffer readBuffer = ByteBuffer.allocate(1024); //异步读, read方法的参数如下: //ByteBuffer dst: 接收缓冲区, 用于从异步Channel中读取数据包; //A attachment: 异步Channel携带的附件, 通知回调的时候作为入参使用; //CompletionHandler<Integer,? super A>: 系统回调的业务handler, 进行读操作; channel.read(readBuffer, readBuffer, new AioReadHandler(channel)); } @Override public void failed(Throwable exc, AioServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } } //3.channel.read读取客户端具体的数据,也是异步的 //下面代码使用一个内部类实现CompletionHandler来向客户端写数据 //类说明: 读数据的处理器 public class AioReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public AioReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } //读取到消息后的处理 @Override public void completed(Integer result, ByteBuffer attachment) { //如果条件成立, 说明客户端主动终止了TCP套接字, 这时服务端终止就可以了 if (result == -1) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } return; } //flip操作 attachment.flip(); byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { System.out.println(result); String msg = new String(message,"UTF-8"); System.out.println("服务端读取到客户端的消息是: " + msg); String responseStr = response(msg); //向客户端发送消息 doWrite(responseStr); } catch (Exception e) { e.printStackTrace(); } } //向客户端发送消息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //异步写数据 channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { if (attachment.hasRemaining()) { channel.write(attachment, attachment,this); } else { //读取客户端传回的数据 ByteBuffer readBuffer = ByteBuffer.allocate(1024); //异步读数据 channel.read(readBuffer, readBuffer, new AioReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
(5)AIO、BIO和NIO对比
说明一:客户端可以使用BIO,BIO编程最简单。但是BIO性能不行,无法应对高并发的情况,所以BIO适合客户端。
说明二:服务端通常不使用BIO,通常使用AIO或者NIO。但是这个AIO是个假的异步,因为Linux内核还不支持真正的异步IO,只有Windows下的IOCP才实现了异步IO。Linux下可以做到的异步IO效果也只是通过使用IO复用下的epoll模型,而IO复用实际上使用的还是同步IO,所以在Linux下进行网络编程常用的是NIO。
说明三:AIO从理论上讲性能最强,但编码最复杂,且Linux本身的实现并不优秀。所以一般在实际的Linux生产上,也很少使用AIO,主要是使用NIO。Netty就是基于NIO开发的。
5.NIO网络编程之Buffer
(1)Buffer的作用
Buffer的作用是方便读写通道(Channel)中的数据。首先数据是从通道(Channel)读入缓冲区,从缓冲区写入通道(Channel)的。应用程序发送数据时,会先将数据写入缓冲区,然后再通过通道发送缓冲区的数据。应用数据读取数据时,会先将数从通道中读到缓冲区,然后再读取缓冲区的数据。
缓冲区本质上是一块可以写入数据、可以读取数据的内存。这块内存被包装成NIO的Buffer对象,并提供了一组方法用来方便访问该块内存。所以Buffer的本质是一块可以写入数据、可以读取数据的内存。
(2)Buffer的重要属性
一.capacity
Buffer作为一个内存块有一个固定的大小值,叫capacity。我们只能往Buffer中写capacity个byte、long,char等类型。一旦Buffer满了,需要将其清空(读取数据或清除数据)才能继续写数据。
二.position
当往Buffer中写数据时,position表示当前的位置,position的初始值为0。当一个数据写到Buffer后,position会移动到下一个可插入数据的位置。所以position的最大值为capacity – 1。
当从Buffer中读取数据时,需要从某个特定的position位置读数据。如果将Buffer从写模式切换到读模式,那么position会被重置为0。当从Buffer的position处读到一个数据时,position会移动到下一个可读位置。
三.limit
在写模式下,Buffer的limit表示最多能往Buffer里写多少数据。在写模式下,Buffer的limit等于Buffer的capacity。
当Buffer从写模式切换到读模式时, limit表示最多能读到多少数据。因此,当Buffer切换到读模式时,limit会被设置成写模式下的position值。
(3)Buffer的分配
要想获得一个Buffer对象首先要进行分配,每一个Buffer类都有allocate()方法。可以在堆上分配,也可以在直接内存上分配。
//分配一个capacity为48字节的ByteBuffer的例子 ByteBuffer buf = ByteBuffer.allocate(48); //分配一个capacity为1024个字符的CharBuffer的例子 CharBuffer buf = CharBuffer.allocate(1024);
一般建议使用在堆上分配。如果应用偏计算,就用堆上分配。如果应用偏网络通讯频繁,就用直接内存。
wrap()方法可以把一个byte数组或byte数组的一部分包装成ByteBuffer对象。
ByteBuffer wrap(byte [] array); ByteBuffer wrap(byte [] array, int offset, int length);
Buffer分配的例子:
//类说明: Buffer的分配 public class AllocateBuffer { //输出结果如下: //----------Test allocate-------- //before allocate, 虚拟机可用的内存大小: 253386384 //buffer = java.nio.HeapByteBuffer[pos=0 lim=102400000 cap=102400000] //after allocate, 虚拟机可用的内存大小: 150986368 //directBuffer = java.nio.DirectByteBuffer[pos=0 lim=102400000 cap=102400000] //after direct allocate, 虚拟机可用的内存大小: 150986368 //----------Test wrap-------- //java.nio.HeapByteBuffer[pos=0 lim=32 cap=32] //java.nio.HeapByteBuffer[pos=10 lim=20 cap=32] public static void main(String[] args) { System.out.println("----------Test allocate--------"); System.out.println("before allocate, 虚拟机可用的内存大小: " + Runtime.getRuntime().freeMemory()); //堆上分配 ByteBuffer buffer = ByteBuffer.allocate(102400000); System.out.println("buffer = " + buffer); System.out.println("after allocate, 虚拟机可用的内存大小: " + Runtime.getRuntime().freeMemory()); //这部分用的直接内存 ByteBuffer directBuffer = ByteBuffer.allocateDirect(102400000); System.out.println("directBuffer = " + directBuffer); System.out.println("after direct allocate, 虚拟机可用的内存大小: " + Runtime.getRuntime().freeMemory()); System.out.println("----------Test wrap--------"); byte[] bytes = new byte[32]; buffer = ByteBuffer.wrap(bytes); System.out.println(buffer); buffer = ByteBuffer.wrap(bytes, 10, 10); System.out.println(buffer); } }
(4)Buffer的读写
一.向Buffer中写数据
将数据写到Buffer有两种方式:
方式一:从Channel读出数据写到Buffer
方式二:通过Buffer的put()方法将数据写到Buffer
//从Channel写到Buffer的例子 int bytesRead = inChannel.read(buf);//从channel读出数据写到buffer //通过put()方法将数据写到Buffer的例子 buf.put(127);
二.从Buffer中读取数据
从Buffer中读取数据有两种方式:
方式一:从Buffer中读取数据写入到Channel
方式二:使用get()方法从Buffer中读取数据
//从Buffer读取数据到Channel的例子 int bytesWritten = inChannel.write(buf); //使用get()方法从Buffer中读取数据的例子 byte aByte = buf.get();
三.使用Buffer读写数据常见步骤
步骤一:写入数据到Buffer
步骤二:调用flip()方法
步骤三:从Buffer中读取数据
步骤四:调用clear()方法或compact()方法
flip()方法会将Buffer从写模式切换到读模式,调用flip()方法会将position设回0,并将limit设置成之前的position值。
当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。
有两种方式能清空缓冲区:调用clear()方法或compact()方法。clear()方法会清空整个缓冲区,compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。
四.其他常用操作
操作一:rewind()方法
Buffer.rewind()将position设回0,所以可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)。
操作二:clear()与compact()方法
一旦读取完Buffer中的数据,需要让Buffer准备好再次被写入,这时候可以通过clear()方法或compact()方法来完成。
如果调用的是clear()方法,position将被设为0,limit被设为capacity的值。此时Buffer被认为是清空了,但是Buffer中的数据并未清除,只是这些标记能告诉我们可以从哪里开始往Buffer里写数据。
如果Buffer中有一些未读的数据,调用clear()方法,数据将被遗忘。意味着不再有任何标记会告诉你哪些数据被读过,哪些还没有。
如果Buffer中仍有未读的数据,且后续还需要这些数据,但是此时想要先写些数据,那么可以使用compact()方法。compact()方法会将所有未读的数据拷贝到Buffer起始处,然后将position设到最后一个未读元素正后面,limit属性依然像clear()方法一样设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。
操作三:mark()与reset()方法
通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position,之后可以通过调用Buffer.reset()方法恢复到这个position。
//类说明: Buffer方法演示 public class BufferMethod { public static void main(String[] args) { System.out.println("------Test get-------------"); ByteBuffer buffer = ByteBuffer.allocate(32); buffer .put((byte) 'a')//0 .put((byte) 'b')//1 .put((byte) 'c')//2 .put((byte) 'd')//3 .put((byte) 'e')//4 .put((byte) 'f');//5 //before flip()java.nio.HeapByteBuffer[pos=6 lim=32 cap=32] System.out.println("before flip()" + buffer); //转换为读取模式: pos置为0, lim置为转换前pos的值 buffer.flip(); //before get():java.nio.HeapByteBuffer[pos=0 lim=6 cap=32] System.out.println("before get():" + buffer); //get()会影响position的位置, 这是相对取; System.out.println((char) buffer.get()); //after get():java.nio.HeapByteBuffer[pos=1 lim=6 cap=32] System.out.println("after get():" + buffer); //get(index)不影响position的值, 这是绝对取; System.out.println((char) buffer.get(2)); //after get(index):java.nio.HeapByteBuffer[pos=1 lim=6 cap=32] System.out.println("after get(index):" + buffer); byte[] dst = new byte[10]; //position移动两位 buffer.get(dst, 0, 2); //after get(dst, 0, 2):java.nio.HeapByteBuffer[pos=3 lim=6 cap=32] System.out.println("after get(dst, 0, 2):" + buffer); System.out.println("dst:" + new String(dst));//dst:bc System.out.println("--------Test put-------"); ByteBuffer bb = ByteBuffer.allocate(32); //before put(byte):java.nio.HeapByteBuffer[pos=0 lim=32 cap=32] System.out.println("before put(byte):" + bb); //put()不带索引会改变pos, after put(byte):java.nio.HeapByteBuffer[pos=1 lim=32 cap=32] System.out.println("after put(byte):" + bb.put((byte) 'z')); //put(2,(byte) 'c')不改变position的位置 bb.put(2, (byte) 'c'); //after put(2,(byte) 'c'):java.nio.HeapByteBuffer[pos=1 lim=32 cap=32] System.out.println("after put(2,(byte) 'c'):" + bb); System.out.println(new String(bb.array())); //这里的buffer是abcdef[pos=3 lim=6 cap=32] bb.put(buffer); //after put(buffer):java.nio.HeapByteBuffer[pos=4 lim=32 cap=32] System.out.println("after put(buffer):" + bb); System.out.println(new String(bb.array())); System.out.println("--------Test reset----------"); buffer = ByteBuffer.allocate(20); System.out.println("buffer = " + buffer); buffer.clear(); buffer.position(5);//移动position到5 buffer.mark();//记录当前position的位置 buffer.position(10);//移动position到10 System.out.println("before reset:" + buffer); buffer.reset();//复位position到记录的地址 System.out.println("after reset:" + buffer); System.out.println("--------Test rewind--------"); buffer.clear(); buffer.position(10);//移动position到10 buffer.limit(15);//限定最大可写入的位置为15 System.out.println("before rewind:" + buffer); buffer.rewind();//将position设回0 System.out.println("before rewind:" + buffer); System.out.println("--------Test compact--------"); buffer.clear(); //放入4个字节,position移动到下个可写入的位置,也就是4 buffer.put("abcd".getBytes()); System.out.println("before compact:" + buffer); System.out.println(new String(buffer.array())); buffer.flip();//将position设回0,并将limit设置成之前position的值 System.out.println("after flip:" + buffer); //从Buffer中读取数据的例子,每读一次,position移动一次 System.out.println((char) buffer.get()); System.out.println((char) buffer.get()); System.out.println((char) buffer.get()); System.out.println("after three gets:" + buffer); System.out.println(new String(buffer.array())); //compact()方法将所有未读的数据拷贝到Buffer起始处 //然后将position设到最后一个未读元素正后面 buffer.compact(); System.out.println("after compact:" + buffer); System.out.println(new String(buffer.array())); } }
(5)Buffer常用方法总结
应用程序可以向Selector对象注册它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣,Selector中会维护一个已经注册的Channel的容器。
Channel可以和操作系统进行内容传递,应用程序可以通过Channel读数据,也可以通过Channel向操作系统写数据,当然写数据和读数据都要通过Buffer来实现。
所有被Selector注册的Channel都是继承SelectableChannel的子类,通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。ScoketChannel和ServerSocketChannel都是SelectableChannel类的子类。
NIO中的SelectionKey共定义了四种事件类型:OP_ACCEPT、OP_READ、OP_WRITE、OP_CONNECT,分别对应接受连接、读、写、请求连接的网络Socket操作。
ServerSocketChannel和SocketChannel可以注册自己感兴趣的操作类型,当对应操作类型的就绪条件满足时操作系统就会通知这些Channel。