0%

基于 NIO 的 Java TCP 服务器及客户端

文章字数:120,阅读全文大约需要1分钟

  • SocketChannelProcessorThread.java
  • 使用 Selector 监听 SocketChannel 的通用类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
* @author colin.cheng
* @date 2021-10-30
* @since 1.0.0
*/
/**
 * Selector loop shared by the TCP client and server.
 *
 * <p>The thread polls the given {@link Selector}. For every readable channel it reads up to
 * {@code bufferSize} bytes, decodes exactly the bytes received as UTF-8 and hands the text to
 * {@code dealPackage} on a worker pool; a non-null result is appended to the key's attachment,
 * which is expected to be a {@code Deque<String>} of outgoing messages. For every writable
 * channel it drains that deque to the socket.
 *
 * <p>No message framing is performed: one {@code read} call equals one "package", so a message
 * larger than {@code bufferSize} (or a multi-byte character split across two reads) is delivered
 * in pieces.
 */
public class SocketChannelProcessorThread extends Thread {

    private final int bufferSize;
    private final Selector selector;
    private final ExecutorService workerPool;
    private final Function<String, String> dealPackage;
    private final Charset defaultCharset = StandardCharsets.UTF_8;
    /** Unwritten remainder of a message per key; touched only by the selector thread. */
    private final java.util.Map<SelectionKey, ByteBuffer> partialWrites = new java.util.HashMap<>();
    private volatile boolean flag = true;

    /**
     * @param dealPackage callback invoked (on a worker thread) with each received chunk of text;
     *                    a non-null return value is queued as the reply on the same channel
     * @param bufferSize  maximum number of bytes read per read event; must be positive
     * @param workerSize  number of worker threads running {@code dealPackage}
     * @param selector    selector whose registered channels this thread services
     * @throws IllegalArgumentException if {@code bufferSize} is not positive
     */
    public SocketChannelProcessorThread(Function<String, String> dealPackage, int bufferSize, int workerSize, Selector selector) {
        super("TcpServer-MessageProcessor-Thread");
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
        }
        this.selector = selector;
        this.dealPackage = dealPackage;
        this.bufferSize = bufferSize;
        this.workerPool = Executors.newFixedThreadPool(workerSize, new ThreadFactory() {
            final AtomicInteger index = new AtomicInteger();

            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "TcpServer-MessageProcessor-Worker" + index.getAndIncrement());
            }
        });
    }

    /** Runs the select loop until {@link #close()} is called or the selector is closed. */
    @Override
    public void run() {
        while (flag) {
            try {
                if (this.selector.select(100) == 0) {
                    continue;
                }
                Iterator<SelectionKey> keyIterator = this.selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    final SelectionKey selectionKey = keyIterator.next();
                    keyIterator.remove();
                    try {
                        dealSelectedKey(selectionKey);
                    } catch (IOException e) {
                        // The connection is broken; drop it so it is not selected again.
                        e.printStackTrace();
                        closeKey(selectionKey);
                    } catch (java.nio.channels.CancelledKeyException e) {
                        // The key was cancelled concurrently (channel closed elsewhere).
                        closeKey(selectionKey);
                    }
                }
            } catch (java.nio.channels.ClosedSelectorException e) {
                // close() raced with the loop; nothing left to service.
                return;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Stops the loop, closes the selector (which also wakes a pending select) and shuts the
     * worker pool down. Channels registered with the selector are not closed here.
     *
     * @throws IOException if closing the selector fails
     */
    public void close() throws IOException {
        this.flag = false;
        try {
            this.selector.close();
        } finally {
            this.workerPool.shutdown();
        }
    }

    /** Dispatches one selected key to the read and/or write handler. */
    private void dealSelectedKey(SelectionKey key) throws IOException {
        if (!key.isValid() || key.isConnectable()) {
            // Connect events are not handled by this class.
            return;
        }
        if (key.isReadable()) {
            handleRead(key);
        }
        if (key.isValid() && key.isWritable()) {
            handleWrite(key);
        }
    }

    /** Reads one chunk from the channel and submits it to the worker pool. */
    private void handleRead(SelectionKey key) throws IOException {
        final SocketChannel channel = (SocketChannel) key.channel();
        final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        final int len = channel.read(buffer);
        if (len < 0) {
            // Peer closed the connection; without this the key stays readable forever.
            closeKey(key);
            return;
        }
        if (len == 0) {
            return;
        }
        // Decode only the bytes actually received, not the whole backing array.
        final String str = new String(buffer.array(), 0, len, defaultCharset);
        try {
            workerPool.execute(() -> {
                final String res = dealPackage.apply(str);
                if (res != null) {
                    final Deque<String> outbox = outboxOf(key);
                    if (outbox != null) {
                        outbox.addLast(res);
                    }
                }
            });
        } catch (RejectedExecutionException ignored) {
            // The pool was shut down by close(); the data is dropped during shutdown.
        }
    }

    /** Writes queued messages, keeping any unwritten remainder for the next writable event. */
    private void handleWrite(SelectionKey key) throws IOException {
        final Deque<String> outbox = outboxOf(key);
        if (outbox == null) {
            return;
        }
        final SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer out = partialWrites.remove(key);
        while (true) {
            if (out == null) {
                final String str = outbox.pollFirst();
                if (str == null) {
                    return;
                }
                System.out.println("send >> " + str);
                out = ByteBuffer.wrap(str.getBytes(defaultCharset));
            }
            channel.write(out);
            if (out.hasRemaining()) {
                // Socket send buffer is full; resume with the rest on the next pass.
                partialWrites.put(key, out);
                return;
            }
            out = null;
        }
    }

    /** Returns the key's outgoing-message deque, or {@code null} if none is attached. */
    @SuppressWarnings("unchecked")
    private Deque<String> outboxOf(SelectionKey key) {
        return (Deque<String>) key.attachment();
    }

    /** Cancels the key and closes its channel, discarding any pending partial write. */
    private void closeKey(SelectionKey key) {
        partialWrites.remove(key);
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • TcpClient.java
  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Function;

/**
* @author colin.cheng
* @date 2021-10-30
* @since 1.0.0
*/
/**
 * TCP client that connects to a server and services the connection on a
 * {@link SocketChannelProcessorThread}.
 *
 * <p>Outgoing messages are queued on a deque attached to the channel's selection key; the
 * processor thread writes them when the channel is writable. Incoming text is passed to the
 * {@code dealPackage} callback supplied at construction.
 */
public class TcpClient {

    public SocketChannel socketChannel;
    private final Selector selector;
    private final SelectionKey selectionKey;
    private final SocketChannelProcessorThread socketChannelProcessorThread;

    /**
     * Connects (blocking) to {@code ip:port}, switches the channel to non-blocking mode and
     * starts the processor thread.
     *
     * @param ip          host name or address of the server
     * @param port        server port
     * @param dealPackage callback for received text; a non-null return value is sent back
     * @param bufferSize  read buffer size in bytes, passed to the processor thread
     * @param workerSize  number of callback worker threads, passed to the processor thread
     * @throws IOException if opening, connecting or registering the channel fails; every
     *                     resource opened so far is closed before the exception propagates
     */
    public TcpClient(String ip, int port, Function<String, String> dealPackage, int bufferSize, int workerSize) throws IOException {
        SocketChannel channel = null;
        Selector openedSelector = null;
        final SelectionKey key;
        final SocketChannelProcessorThread processor;
        try {
            channel = SocketChannel.open();
            channel.connect(new InetSocketAddress(ip, port));
            channel.configureBlocking(false);
            openedSelector = Selector.open();
            // Attach the outbox atomically with registration so the key never exists without it.
            key = channel.register(openedSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE,
                    new ConcurrentLinkedDeque<String>());
            processor = new SocketChannelProcessorThread(dealPackage, bufferSize, workerSize, openedSelector);
        } catch (IOException | RuntimeException e) {
            closeAfterFailure(openedSelector, e);
            closeAfterFailure(channel, e);
            throw e;
        }
        this.socketChannel = channel;
        this.selector = openedSelector;
        this.selectionKey = key;
        this.socketChannelProcessorThread = processor;
        this.socketChannelProcessorThread.start();
    }

    /**
     * Queues a message for sending; the processor thread performs the actual write.
     *
     * @param mess text to send
     * @throws IOException declared for interface compatibility; queuing itself does no I/O
     */
    public void write(String mess) throws IOException {
        @SuppressWarnings("unchecked")
        Deque<String> list = (Deque<String>) this.selectionKey.attachment();
        list.add(mess);
    }

    /**
     * Stops the processor thread and closes the channel and selector. All three are attempted
     * even if an earlier step throws.
     *
     * @throws IOException if closing any of the resources fails
     */
    public void close() throws IOException {
        try {
            this.socketChannelProcessorThread.close();
        } finally {
            try {
                this.socketChannel.close();
            } finally {
                this.selector.close();
            }
        }
    }

    /** Closes a partially-initialised resource, recording any secondary failure on {@code cause}. */
    private static void closeAfterFailure(java.io.Closeable resource, Exception cause) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }
}
  • TcpServer.java
  • 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Function;

/**
* @author colin.cheng
* @date 2021-10-30
* @since 1.0.0
*/
@Slf4j
public class TcpServer {

private Selector selector;
private BindPortThread bindThread;
private SocketChannelProcessorThread messageProcessorThread;
private ServerSocketChannel serverSocketChannel;

public TcpServer(int port, Function<String, String> dealPackage, int bufferSize, int workerSize) throws IOException {
this.selector = Selector.open();
this.serverSocketChannel = ServerSocketChannel.open();
this.serverSocketChannel.socket().bind(new InetSocketAddress(port));
this.serverSocketChannel.configureBlocking(false);
this.messageProcessorThread = new SocketChannelProcessorThread(dealPackage, bufferSize, workerSize, selector);
this.messageProcessorThread.start();
this.bindThread = new BindPortThread(selector, serverSocketChannel);
this.bindThread.start();
}

public void close() throws IOException {
this.bindThread.close();
this.messageProcessorThread.close();
this.serverSocketChannel.close();
this.selector.close();
}

private class BindPortThread extends Thread {

private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean flag = true;

public BindPortThread(Selector selector, ServerSocketChannel serverSocketChannel) {
super("TcpServer-BindPort-Thread");
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}

@Override
public void run() {
while (flag) {
try {
if(serverSocketChannel.isOpen()) {
final SocketChannel accept = serverSocketChannel.accept();
if(accept != null) {
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE).attach(new ConcurrentLinkedDeque<>());
}
}
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public void close() throws IOException {
this.serverSocketChannel.close();
this.flag = false;
}
}
}
  • 使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Usage demo: starts a server on port 8111 that prints each received message and replies to
 * it, then connects a client to 127.0.0.1:8111 that prints what it receives, and queues two
 * messages on the client. Any exception is printed to standard error.
 */
public static void main(String[] args) {
    try {
        TcpServer demoServer = new TcpServer(
                8111,
                request -> {
                    System.out.println("server收到 [" + request + "]");
                    return "来自server的回复=》" + request;
                },
                1024,
                2);

        TcpClient demoClient = new TcpClient(
                "127.0.0.1",
                8111,
                reply -> {
                    System.out.println("client收到 [" + reply + "]");
                    // The client never answers the server's reply.
                    return null;
                },
                1024,
                2);

        for (String text : new String[] {"test1", "test2"}) {
            demoClient.write(text);
        }
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}