1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.*; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.function.Function;
@Slf4j public class TcpServer {
private Selector selector; private BindPortThread bindThread; private SocketChannelProcessorThread messageProcessorThread; private ServerSocketChannel serverSocketChannel;
public TcpServer(int port, Function<String, String> dealPackage, int bufferSize, int workerSize) throws IOException { this.selector = Selector.open(); this.serverSocketChannel = ServerSocketChannel.open(); this.serverSocketChannel.socket().bind(new InetSocketAddress(port)); this.serverSocketChannel.configureBlocking(false); this.messageProcessorThread = new SocketChannelProcessorThread(dealPackage, bufferSize, workerSize, selector); this.messageProcessorThread.start(); this.bindThread = new BindPortThread(selector, serverSocketChannel); this.bindThread.start(); }
public void close() throws IOException { this.bindThread.close(); this.messageProcessorThread.close(); this.serverSocketChannel.close(); this.selector.close(); }
private class BindPortThread extends Thread {
private Selector selector; private ServerSocketChannel serverSocketChannel; private volatile boolean flag = true;
public BindPortThread(Selector selector, ServerSocketChannel serverSocketChannel) { super("TcpServer-BindPort-Thread"); this.selector = selector; this.serverSocketChannel = serverSocketChannel; }
@Override public void run() { while (flag) { try { if(serverSocketChannel.isOpen()) { final SocketChannel accept = serverSocketChannel.accept(); if(accept != null) { accept.configureBlocking(false); accept.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE).attach(new ConcurrentLinkedDeque<>()); } } } catch (ClosedChannelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
public void close() throws IOException { this.serverSocketChannel.close(); this.flag = false; } } }
|