Java 网络编程 —— 实现非阻塞式的服务器

创建阻塞的服务器

ServerSocketChannelSockelChannel 采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程

public class EchoServer {      	private int port = 8000;     private ServerSocketChannel serverSocketChannel = null;     private ExecutorService executorService; //线程池     private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目          public EchoServer() throws IOException {         //创建一个线程池         executorService = Executors.newFixedThreadPool(             Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);         //创建一个ServerSocketChannel对象         serverSocketChannel = ServerSocketChannel.open();         //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口         serverSocketChannel.socket().setReuseAddress(true);         //把服务器进程与一个本地端口绑定         serverSocketChannel.socket().bind(new InetSocketAddress(port));         System.out.println("服务器启动");     }          public void service() {         while (true) {             SocketChannel socketChannel = null;             try {                 socketChannel = serverSocketChannel.accept();                 //处理客户连接                 executorService.execute(new Handler(socketChannel));             } catch(IOException e) {                 e.printStackTrace();             }         }     }          public static void main(String args[])throws IOException {         new EchoServer().service();     }          //处理客户连按     class Handler implements Runnable {          private SocketChannel socketChannel; 		         public Handler(SocketChannel socketChannel) {             this.socketChannel = socketChannel;         }                  public void run() {             handle(socketChannel);         }                  public void handle(SocketChannel socketChannel) {             try {                 //获得与socketChannel关联的Socket对象                 Socket socket = socketChannel.socket();                 System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort());                                  BufferedReader br = getReader(socket);                 PrintWriter pw = getWriter(socket);                                  String msg = null;                 while ((msg = br.readLine()) != null) {                     System.out.println(msg);                     pw.println(echo(msg));                     if (msg.equals("bye")) {                         break;                     }                 }             } catch (IOException e) {                 e.printStackTrace();             } finally {                 try {                     if(socketChannel != null) {                         socketChannel.close();                     } catch (IOException e) {                         e.printStackTrace();                     }                 }             }         }     }           private PrintWriter getWriter(Socket socket) throws IOException {         OutputStream socketOut = socket.getOutputStream();         return new PrintWriter(socketOut,true);     }          private BufferedReader getReader(Socket socket) throws IOException {         InputStream socketIn = socket.getInputStream();         return new BufferedReader(new InputStreamReader(socketIn));     }          public String echo(String msg) {         return "echo:" + msg;     } } 

创建非阻塞的服务器

在非阻塞模式下,EchoServer 只需要启动一个主线程,就能同时处理三件事:

  • 接收客户的连接
  • 接收客户发送的数据
  • 向客户发回响应数据

EchoServer 委托 Selector 来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件

// 创建一个Selector对象 selector = Selector.open(); //创建一个ServerSocketChannel对象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时 //可以顺利绑定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false): //把服务器进程与一个本地端口绑定 serverSocketChannelsocket().bind(new InetSocketAddress(port)); 

EchoServer 类的 service() 方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:

public void service() throws IOException {     serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);     //第1层while循环     while(selector.select() > 0) {         //获得Selector的selected-keys集合         Set readyKeys = selector.selectedKeys();         Iterator it = readyKeys.iterator();         //第2层while循环         while (it.hasNext()) {             SelectionKey key = null;             //处理SelectionKey             try {                 //取出一个SelectionKey                 key = (SelectionKey) it.next();                 //把 SelectionKey从Selector 的selected-key 集合中删除                 it.remove();                 1f (key.isAcceptable()) { 处理接收连接就绪事件; }                 if (key.isReadable()) { 处理读就绪水件; }                 if (key.isWritable()) { 处理写就绪事件; }             } catch(IOException e) {                 e.printStackTrace();                 try {                     if(key != null) {                         //使这个SelectionKey失效                         key.cancel();                         //关闭与这个SelectionKey关联的SocketChannel                         key.channel().close();                     }                 } catch(Exception ex) {                      e.printStackTrace();                 }             }         }     } } 
  • 首先由 ServerSocketChannelSelector 注册接收连接就绪事件,如果 Selector 监控到该事件发生,就会把相应的 SelectionKey 对象加入 selected-keys 集合
  • 第一层 while 循环,不断询问 Selector 已经发生的事件,select() 方法返回当前相关事件已经发生的 SelectionKey 的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。SelectorselectedKeys() 方法返回 selected-keys 集合,它存放了相关事件已经发生的 SelectionKey 对象
  • 第二层 while 循环,从 selected-keys 集合中依次取出每个 SelectionKey 对象并从集合中删除,,然后调用 isAcceptable()isReadable()isWritable() 方法判断到底是哪种事件发生了,从而做出相应的处理

1. 处理接收连接就绪事件

if (key.isAcceptable()) {     //获得与SelectionKey关联的ServerSocketChannel     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();     //获得与客户连接的SocketChannel     SocketChannel socketChannel = (SocketChannel) ssc.accept();     //把Socketchannel设置为非阻塞模式     socketChannel.configureBlocking(false);     //创建一个用于存放用户发送来的数据的级冲区     ByteBuffer buffer = ByteBuffer.allocate(1024);     //Socketchannel向Selector注册读就绪事件和写就绪事件     socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } 

2. 处理读就绪事件

public void receive(SelectionKey key) throws IOException {     //获得与SelectionKey关联的附件     ByteBuffer buffer = (ByteBuffer) key.attachment();     //获得与SelectionKey关联的Socketchannel     SocketChannel socketChannel = (SocketChannel)key.channel();     //创建一个ByteBuffer用于存放读到的数据     ByteBuffer readBuff = ByteBuffer.allocate(32);     socketChannel.read(readBuff);     readBuff.flip();     //把buffer的极限设为容量     buffer.limit(buffer.capacity());     //把readBuff中的内容拷贝到buffer     buffer.put(readBuff); } 

3. 处理写就绪事件

public void send(SelectionKey key) throws IOException {     //获得与SelectionKey关联的ByteBuffer     ByteBuffer buffer = (ByteBuffer) key.attachment();     //获得与SelectionKey关联的SocketChannel     SocketChannel socketChannel = (SocketChannel) key.channel();     buffer.flip();     //按照GBK编码把buffer中的字节转换为字符串     String data = decode(buffer);     //如果还没有读到一行数据就返回     if(data.indexOf("rn") == -1)         return;     //截取一行数据     String outputData = data.substring(0, data.indexOf("n") + 1);     //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中     ByteBuffer outputBuffer = encode("echo:" + outputData);     //输出outputBuffer的所有字节     while(outputBuffer,hasRemaining())         socketChannel.write(outputBuffer);     //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer     ByteBuffer temp = encode(outputData);     //把buffer的位置设为temp的极限     buffer.position(temp.limit()):     //删除buffer已经处理的数据     buffer.compact();     //如果已经输出了字符串“byern”,就使SelectionKey失效,并关闭SocketChannel     if(outputData.equals("byern")) {         key.cancel();         socketChannel.close();     } } 

完整代码如下:

public class EchoServer {      	private int port = 8000;     private ServerSocketChannel serverSocketChannel = null;     private Selector selector;     private Charset charset = Charset.forName("GBK");  	public EchoServer() throws IOException {         // 创建一个Selector对象         selector = Selector.open();         //创建一个ServerSocketChannel对象         serverSocketChannel = ServerSocketChannel.open();         //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时         //可以顺利绑定到相同的端口         serverSocketChannel.socket().setReuseAddress(true);         //使ServerSocketChannel工作于非阻塞模式         serverSocketChannel.configureBlocking(false):         //把服务器进程与一个本地端口绑定         serverSocketChannelsocket().bind(new InetSocketAddress(port));     }          public void service() throws IOException {         serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);         //第1层while循环         while(selector.select() > 0) {             //获得Selector的selected-keys集合             Set readyKeys = selector.selectedKeys();             Iterator it = readyKeys.iterator();             //第2层while循环             while (it.hasNext()) {                 SelectionKey key = null;                 //处理SelectionKey                 try {                     //取出一个SelectionKey                     key = (SelectionKey) it.next();                     //把 SelectionKey从Selector 的selected-key 集合中删除                     it.remove();                     1f (key.isAcceptable()) {                          //获得与SelectionKey关联的ServerSocketChannel                         ServerSocketChannel ssc = (ServerSocketChannel) key.channel();                         //获得与客户连接的SocketChannel                         SocketChannel socketChannel = (SocketChannel) ssc.accept();                         //把Socketchannel设置为非阻塞模式                         socketChannel.configureBlocking(false);                         //创建一个用于存放用户发送来的数据的级冲区                         ByteBuffer buffer = ByteBuffer.allocate(1024);                         //Socketchannel向Selector注册读就绪事件和写就绪事件                         socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);                     }                     if (key.isReadable()) { receive(key); }                     if (key.isWritable()) { send(key); }                 } catch(IOException e) {                     e.printStackTrace();                     try {                         if(key != null) {                             //使这个SelectionKey失效                             key.cancel();                             //关闭与这个SelectionKey关联的SocketChannel                             key.channel().close();                         }                     } catch(Exception ex) {                          e.printStackTrace();                     }                 }             }         }     }          public void receive(SelectionKey key) throws IOException {         //获得与SelectionKey关联的附件         ByteBuffer buffer = (ByteBuffer) key.attachment();         //获得与SelectionKey关联的Socketchannel         SocketChannel socketChannel = (SocketChannel)key.channel();         //创建一个ByteBuffer用于存放读到的数据         ByteBuffer readBuff = ByteBuffer.allocate(32);         socketChannel.read(readBuff);         readBuff.flip();         //把buffer的极限设为容量         buffer.limit(buffer.capacity());         //把readBuff中的内容拷贝到buffer         buffer.put(readBuff);     }          public void send(SelectionKey key) throws IOException {         //获得与SelectionKey关联的ByteBuffer         ByteBuffer buffer = (ByteBuffer) key.attachment();         //获得与SelectionKey关联的SocketChannel         SocketChannel socketChannel = (SocketChannel) key.channel();         buffer.flip();         //按照GBK编码把buffer中的字节转换为字符串         String data = decode(buffer);         //如果还没有读到一行数据就返回         if(data.indexOf("rn") == -1)             return;         //截取一行数据         String outputData = data.substring(0, data.indexOf("n") + 1);         //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中         ByteBuffer outputBuffer = encode("echo:" + outputData);         //输出outputBuffer的所有字节         while(outputBuffer,hasRemaining())             socketChannel.write(outputBuffer);         //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer         ByteBuffer temp = encode(outputData);         //把buffer的位置设为temp的极限         buffer.position(temp.limit()):         //删除buffer已经处理的数据         buffer.compact();         //如果已经输出了字符串“byern”,就使SelectionKey失效,并关闭SocketChannel         if(outputData.equals("byern")) {             key.cancel();             socketChannel.close();         }     }          //解码     public String decode(ByteBuffer buffer) {         CharBuffer charBuffer = charset.decode(buffer);         return charBuffer.toStrinq();     }          //编码     public ByteBuffer encode(String str) {         return charset.encode(str);     }          public static void main(String args[])throws Exception {         EchoServer server = new EchoServer();         server.service();     } } 

阻塞模式与非阻塞模式混合使用

使用非阻塞模式时,ServerSocketChannel 以及 SocketChannel 都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer 采用一个线程同时完成这些操作

假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能

负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector 注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作

public class EchoServer {      	private int port = 8000;     private ServerSocketChannel serverSocketChannel = null;     private Selector selector = null;     private Charset charset = Charset.forName("GBK");  	public EchoServer() throws IOException {         selector = Selector.open();         serverSocketChannel = ServerSocketChannel.open();         serverSocketChannel.socket().setReuseAddress(true);         serverSocketChannelsocket().bind(new InetSocketAddress(port));     }          public void accept() {         while(true) {             try {                 SocketChannel socketChannel = serverSocketChannel.accept();                 socketChannel.configureBlocking(false);                                  ByteBuffer buffer = ByteBuffer.allocate(1024);                 synchronized(gate) {                     selector.wakeup();                     socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);                 }             } catch(IOException e) {                 e.printStackTrace();             }         }     }          private Object gate=new Object();          public void service() throws IOException {         while(true) {             synchronized(gate){}             int n = selector.select();             if(n == 0) continue;             Set readyKeys = selector.selectedKeys();             Iterator it = readyKeys.iterator();             while (it.hasNext()) {                 SelectionKey key = null;                 try {     				it.remove();                     if (key.isReadable()) {                         receive(key);                     }                     if (key.isWritable()) {                         send(key);                     }                 } catch(IOException e) {                     e.printStackTrace();                     try {                         if(key != null) {                             key.cancel();                             key.channel().close();                         }                     } catch(Exception ex) { e.printStackTrace(); }                 }             }         }     }          public void receive(SelectionKey key) throws IOException {         ...     }          public void send(SelectionKey key) throws IOException {         ...     }          public String decode(ByteBuffer buffer) {         ...     }          public ByteBuffer encode(String str) {         ...     }          public static void main(String args[])throws Exception {         final EchoServer server = new EchoServer();         Thread accept = new Thread() {             public void run() {                 server.accept();             }         };         accept.start(); 		server.service();     } } 

注意一点:主线程的 selector select() 方法和 Accept 线程的 register(...) 方法都会造成阻塞,因为他们都会操作 Selector 对象的共享资源 all-keys 集合,这有可能会导致死锁

导致死锁的具体情形是:Selector 中尚没有任何注册的事件,即 all-keys 集合为空,主线程执行 selector.select() 方法时将进入阻塞状态,只有当 Accept 线程向 Selector 注册了事件,并且该事件发生后,主线程才会从 selector.select() 方法返回。然而,由于主线程正在 selector.select() 方法中阻塞,这使得 Acccept 线程也在 register() 方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去

为了避免对共享资源的竞争,同步机制使得一个线程执行 register() 时,不允许另一个线程同时执行 select() 方法,反之亦然

发表评论

评论已关闭。

相关文章