0%

Netty简单使用

文章字数:659,阅读全文大约需要2分钟

一个基于 Socket 的 Netty 客户端与服务端相互通讯的简单示例

服务端

服务类,用于创建 Netty 服务并绑定端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package io.greatcolin.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Netty server that binds a port and installs {@link NettyServerHandler} on every
 * accepted connection.
 *
 * @author colin.cheng
 * @version V1.0
 * @date Created In 13:50 2019/8/14
 */
public class NettyServer {

    /**
     * Starts the server on the given port and blocks the calling thread until the
     * server channel is closed. Both event loop groups are shut down before this
     * method returns, whether startup succeeded or not.
     *
     * <p>Failures (for example a bind error) are printed to standard error rather
     * than rethrown. If the calling thread is interrupted while waiting, its
     * interrupt flag is restored so callers can still observe the interruption.
     *
     * @param port the port to bind to
     */
    public void bind(int port) {
        // Two event loop groups: the first is handed to the bootstrap as the
        // parent (acceptor) group, the second as the child (per-connection) group.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // Helper that assembles and starts the NIO server.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    // Server side uses NioServerSocketChannel as the channel type.
                    .channel(NioServerSocketChannel.class)
                    // SO_BACKLOG option applied to the server channel.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Initializer applied to each accepted child channel.
                    .childHandler(new ChildChannelHandler());
            // Bind and wait until the bind operation has completed.
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println(Thread.currentThread().getName()+",启动成功,等待请求中");
            // Block until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads on exit.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    /**
     * Configures the pipeline of each accepted connection. Declared static because it
     * does not use any state of the enclosing {@code NettyServer} instance.
     */
    private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            // The pipeline is the ordered chain of handlers applied to channel events.
            arg0.pipeline().addLast(new NettyServerHandler());
        }
    }
}

处理请求的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package io.greatcolin.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Server-side inbound handler: prints each received message and answers with a
 * fixed acknowledgement.
 *
 * @author colin.cheng
 * @version V1.0
 * @date Created In 14:31 2019/8/14
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Invoked when data is read from the client. Decodes the readable bytes as UTF-8,
     * prints them, and queues the reply {@code "收到"} (sent on the next flush).
     *
     * @param ctx the handler context used to write the reply
     * @param msg the inbound message; cast to {@link ByteBuf}
     * @throws Exception if decoding or writing fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            // Copy all readable bytes out of the buffer and decode them.
            byte[] reg = new byte[buf.readableBytes()];
            buf.readBytes(reg);
            String mess = new String(reg, "UTF-8");
            System.out.println("mess = " + mess);
            // Encode the reply explicitly as UTF-8: the client decodes with UTF-8, so
            // relying on the platform default charset could garble the text.
            String respMess = "收到";
            ByteBuf respByteBuf = Unpooled.copiedBuffer(respMess.getBytes("UTF-8"));
            ctx.write(respByteBuf);
        } finally {
            // This handler consumes the message and does not pass it on, so it must
            // release the reference-counted buffer itself to avoid a leak.
            buf.release();
        }
    }

    /**
     * Invoked when the current read operation has completed; flushes the replies
     * queued by {@link #channelRead}.
     *
     * @param ctx the handler context to flush
     * @throws Exception if flushing fails
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Invoked when an exception reaches this handler. Reports the cause and closes the
     * connection.
     *
     * @param ctx the handler context to close
     * @param cause the exception that was raised
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the failure before closing so errors are not lost silently.
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

建立连接请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package io.greatcolin.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Netty client that opens a connection to a server and installs
 * {@link NettyClientHanlder} on the channel.
 *
 * @author colin.cheng
 * @version V1.0
 * @date Created In 15:19 2019/8/14
 */
public class NettyClient {

    /**
     * Connects to the target server and blocks the calling thread until the
     * connection is closed. The event loop group is shut down before returning.
     *
     * <p>Failures (for example a refused connection) are printed to standard error
     * rather than rethrown. If the calling thread is interrupted while waiting, its
     * interrupt flag is restored.
     *
     * @param host the server host name or address
     * @param port the server port
     */
    public void connect(String host, int port) {
        // Event loop group that drives this client's channel.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChildHandler());
            // Wait until the connect attempt has completed.
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            System.out.println(Thread.currentThread().getName()+",发起连接请求");
            // Block until the client channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads.
            group.shutdownGracefully();
        }
    }

    /**
     * Configures the pipeline of the client channel. Declared static because it does
     * not use any state of the enclosing {@code NettyClient} instance.
     */
    private static class ChildHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new NettyClientHanlder());
        }
    }
}

具体处理事件的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package io.greatcolin.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Client-side inbound handler: sends a greeting once the connection is active,
 * prints the server's reply, and then closes the connection.
 *
 * @author colin.cheng
 * @version V1.0
 * @date Created In 16:32 2019/8/14
 */
public class NettyClientHanlder extends ChannelInboundHandlerAdapter {

    /**
     * Invoked once the channel is active. Sends a UTF-8 encoded greeting that
     * contains the name of the current thread.
     *
     * @param ctx the handler context used to send the greeting
     * @throws Exception if encoding or writing fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String sendMess = "我是客户端:"+Thread.currentThread().getName();
        byte[] sendMesByte = sendMess.getBytes("UTF-8");
        ByteBuf sendByteBuf = Unpooled.buffer(sendMesByte.length);
        sendByteBuf.writeBytes(sendMesByte);
        ctx.writeAndFlush(sendByteBuf);
    }

    /**
     * Invoked when data is read from the server. Decodes the readable bytes as UTF-8,
     * prints them, and closes the connection.
     *
     * @param ctx the handler context to close after the reply has been printed
     * @param msg the inbound message; cast to {@link ByteBuf}
     * @throws Exception if decoding fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String mess = new String(req, "UTF-8");
            System.out.println(Thread.currentThread().getName()+"接收到返回的消息:"+mess);
            ctx.close();
        } finally {
            // This handler consumes the message and does not pass it on, so it must
            // release the reference-counted buffer itself to avoid a leak.
            buf.release();
        }
    }

    /**
     * Invoked when an exception reaches this handler. Reports the cause and closes the
     * connection.
     *
     * @param ctx the handler context to close
     * @param cause the exception that was raised
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the failure before closing so errors are not lost silently.
        cause.printStackTrace();
        ctx.close();
    }
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package io.greatcolin;

import io.greatcolin.client.NettyClient;
import io.greatcolin.server.NettyServer;

import java.util.concurrent.TimeUnit;

/**
 * Demo entry point: starts a {@link NettyServer} on port 7777 in a background thread
 * and then connects to it three times with a {@link NettyClient}.
 */
public class App
{
    /**
     * Runs the demo.
     *
     * @param args command-line arguments (unused)
     */
    public static void main( String[] args )
    {
        // bind() blocks until the server channel closes, so run it on its own thread.
        new Thread(new Runnable() {
            @Override
            public void run() {
                NettyServer server = new NettyServer();
                server.bind(7777);
            }
        }).start();

        NettyClient client = new NettyClient();
        for (int i = 0; i < 3; i++) {
            // Pause before each attempt; the first pause also gives the server
            // thread time to finish binding.
            App.sleep();
            client.connect("127.0.0.1", 7777);
            System.out.println("------");
        }
    }

    /**
     * Sleeps for three seconds. If the thread is interrupted while sleeping, the
     * interrupt flag is restored and the method returns early.
     */
    public static void sleep() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
Thread-0,启动成功,等待请求中
main,发起连接请求
mess = 我是客户端:nioEventLoopGroup-4-1
nioEventLoopGroup-4-1接收到返回的消息:收到
------
main,发起连接请求
mess = 我是客户端:nioEventLoopGroup-5-1
nioEventLoopGroup-5-1接收到返回的消息:收到
------
main,发起连接请求
mess = 我是客户端:nioEventLoopGroup-6-1
nioEventLoopGroup-6-1接收到返回的消息:收到
------