文章字数:659,阅读全文大约需要2分钟
一个基于 Socket 的 Netty 客户端与服务端相互通讯的示例
服务端
服务类,用于创建Netty服务绑定端口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package io.greatcolin.server;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public void bind(int port){ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) //设置通道的处理器 .option(ChannelOption.SO_BACKLOG,1024) //子通道 .childHandler(new ChildChannelHandler()); ChannelFuture future = serverBootstrap.bind(port).sync(); System.out.println(Thread.currentThread().getName()+",启动成功,等待请求中"); future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new NettyServerHandler()); } } }
|
处理请求的具体实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package io.greatcolin.server;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Server-side inbound handler: prints each received message and answers with
 * a fixed acknowledgement.
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** Charset used for both decoding requests and encoding replies. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Decodes the inbound buffer as UTF-8 text, prints it, and queues the
     * reply. The reply is only written here; it is flushed in
     * {@link #channelReadComplete(ChannelHandlerContext)}.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            String mess = buf.toString(UTF_8);
            System.out.println("mess = " + mess);
            String respMess = "收到";
            // Encode explicitly as UTF-8: the client decodes the reply as UTF-8,
            // so the platform default charset must not be used here.
            ByteBuf respByteBuf = Unpooled.copiedBuffer(respMess, UTF_8);
            ctx.write(respByteBuf);
        } finally {
            // This handler consumes the message instead of passing it on,
            // so it is responsible for releasing the buffer.
            buf.release();
        }
    }

    /** Flushes the replies queued by {@link #channelRead} once the read batch is done. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the cause rather than dropping it silently.
        cause.printStackTrace();
        ctx.close();
    }
}
|
客户端
建立连接请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package io.greatcolin.client;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Minimal Netty TCP client that opens one connection and installs a
 * {@link NettyClientHanlder} on it.
 */
public class NettyClient {

    /**
     * Connects to {@code host:port} and blocks the calling thread until the
     * connection is closed. The event loop group is asked to shut down before
     * this method returns, whether it ends normally or with an error.
     *
     * @param host remote host name or address
     * @param port remote TCP port
     */
    public void connect(String host, int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChildHandler());
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            System.out.println(Thread.currentThread().getName() + ",发起连接请求");
            // Park here until the connection is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Builds the pipeline of the client connection. Declared static because it
     * uses no state from the enclosing {@link NettyClient} instance.
     */
    private static class ChildHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new NettyClientHanlder());
        }
    }
}
|
具体处理事件的类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package io.greatcolin.client;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side inbound handler: sends a greeting once the connection is active,
 * prints the server's reply, then closes the connection.
 */
public class NettyClientHanlder extends ChannelInboundHandlerAdapter {

    /** Charset used for both encoding the greeting and decoding the reply. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Sends a UTF-8 greeting that includes the current thread's name.
     *
     * @param ctx context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String sendMess = "我是客户端:" + Thread.currentThread().getName();
        ByteBuf sendByteBuf = Unpooled.copiedBuffer(sendMess, UTF_8);
        ctx.writeAndFlush(sendByteBuf);
    }

    /**
     * Decodes the reply as UTF-8 text, prints it, and closes the connection.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            String mess = buf.toString(UTF_8);
            System.out.println(Thread.currentThread().getName() + "接收到返回的消息:" + mess);
            ctx.close();
        } finally {
            // This handler consumes the message instead of passing it on,
            // so it is responsible for releasing the buffer.
            buf.release();
        }
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the cause rather than dropping it silently.
        cause.printStackTrace();
        ctx.close();
    }
}
|
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package io.greatcolin;
import io.greatcolin.client.NettyClient; import io.greatcolin.server.NettyServer;
import java.util.concurrent.TimeUnit;
/**
 * Demo entry point: starts the server on a background thread, then runs three
 * client connections against it one after another.
 */
public class App {

    /**
     * Starts a {@link NettyServer} on port 7777 in its own thread and then
     * connects a {@link NettyClient} to it three times, pausing before each
     * attempt.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // bind() blocks until the server channel closes, so it needs its own thread.
        new Thread(new Runnable() {
            @Override
            public void run() {
                NettyServer server = new NettyServer();
                server.bind(7777);
            }
        }).start();

        NettyClient client = new NettyClient();
        for (int i = 0; i < 3; i++) {
            // The pause before the first connect also gives the server time to start.
            App.sleep();
            client.connect("127.0.0.1", 7777);
            System.out.println("------");
        }
    }

    /**
     * Pauses the current thread for three seconds. If the thread is
     * interrupted while sleeping, the interrupt flag is restored and the
     * method returns early.
     */
    public static void sleep() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
|
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13
| Thread-0,启动成功,等待请求中 main,发起连接请求 mess = 我是客户端:nioEventLoopGroup-4-1 nioEventLoopGroup-4-1接收到返回的消息:收到 ------ main,发起连接请求 mess = 我是客户端:nioEventLoopGroup-5-1 nioEventLoopGroup-5-1接收到返回的消息:收到 ------ main,发起连接请求 mess = 我是客户端:nioEventLoopGroup-6-1 nioEventLoopGroup-6-1接收到返回的消息:收到 ------
|