Netty入门

  6 分钟   10948 字    |    

Netty入门

简介

Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

  • 高性能,低延迟

“网络编程只要你使用了 Netty 框架,你的程序性能基本就不会差。”

  • 弥补Java NIO缺陷

Netty 在 NIO 基础上进行了更高层次的封装,屏蔽了 NIO 的复杂性; Netty 更加可靠稳定,修复和完善了 JDK NIO 较多已知问题; Netty 具有较好的可扩展性

  • 更低的资源消耗

Netty 通过复用对象,避免频繁创建和销毁带来的开销;提供了更多面向用户态的零拷贝技术,避免了数据在堆内存和堆外内存之间的拷贝

集成

服务端

  1. 导入依赖
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>
  1. 编写netty处理器
@Slf4j
public class SocketHandler extends ChannelInboundHandlerAdapter {
    public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 读取到客户端发来的消息
     *
     * @param ctx ChannelHandlerContext
     * @param msg msg
     * @throws Exception e
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //发送消息给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息", CharsetUtil.UTF_8));
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("新的客户端链接:" + ctx.channel().id().asShortText());
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        clients.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.channel().close();
        clients.remove(ctx.channel());
    }
}
当byteBuf传递到下一个handler时,最后一个handler记得释放:ReferenceCountUtil.release(msg);
  1. 编写netty初始化器
@Component
public class SocketInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 添加上自己的处理器
        pipeline.addLast(new SocketHandler());
    }
}
  1. 编写netty服务
@Slf4j
@Component
public class SocketServer {
    @Resource
    private SocketInitializer socketInitializer;

    @Getter
    private ServerBootstrap serverBootstrap;

    /**
     * netty服务监听端口
     */
    @Value("${netty.port:8088}")
    private int port;
    /**
     * 主线程组数量
     */
    @Value("${netty.bossThread:1}")
    private int bossThread;

    /**
     * 启动netty服务器
     */
    public void start() {
        this.init();
        this.serverBootstrap.bind(this.port);
        log.info("Netty started on port: {} (TCP) with boss thread {}", this.port, this.bossThread);
    }

    /**
     * 初始化netty配置
     */
    private void init() {
        // 创建两个线程组,bossGroup为接收请求的线程组,一般1-2个就行
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(this.bossThread);
        // 实际工作的线程组
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        this.serverBootstrap = new ServerBootstrap();
        this.serverBootstrap.group(bossGroup, workerGroup) // 两个线程组加入进来
                .channel(NioServerSocketChannel.class)  // 配置为nio类型
                .childHandler(this.socketInitializer); // 加入自己的初始化器
    }
}
  1. 启动netty
@Component
public class NettyStartListener implements ApplicationRunner {
    @Resource
    private SocketServer socketServer;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        this.socketServer.start();
    }
}

客户端

  1. 编写客户端处理器
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,netty", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
}
  1. 启动客户端
void testNetty() throws IOException {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建bootstrap对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            //设置线程组
            bootstrap.group(eventExecutors)
                    //设置客户端的通道实现类型
                    .channel(NioSocketChannel.class)
                    //使用匿名内部类初始化通道
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加客户端通道的处理器
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客户端就绪");
            //连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088).sync();
            //对通道关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            //关闭线程组
            eventExecutors.shutdownGracefully();
        }
    }

解码器

TCP协议是个“流”性质协议,它的底层根据二进制缓冲区的实际情况进行包的划分,会把上层(Netty层)的ByteBuf包,进行重新的划分和重组,组成一帧一帧的二进制数据。一个上层Netty中的 ByteBuf包,可能会被TCP底层拆分成多个二进制数据帧进行发送,也有可能底层将多个小的ByteBuf包,封装成一个大的底层数据帧发送出去。

如何从底层的二进制数据帧中,界定出来上层数据包的边界,也即是上层包的起点和末尾就是解码器的作用之一。

Netty中提供了几个重要的可以直接使用的帧解码器

详见参考文献[ 5 ]

LineBasedFrameDecoder

基于行分隔符的帧解码器,即会按照行分隔符对数据进行拆包粘包,解码出ByteBuf

DelimiterBasedFrameDecoder

基于分隔符的帧解码器,即会按照指定分隔符对数据进行拆包粘包,解码出ByteBuf

FixedLengthFrameDecoder

基于固定长度帧解码器,即会按照指定的长度对Frame中的数据进行拆包粘包

LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder是一个基于数据体长度的解码器,解码出ByteBuf

public class DataDecoder extends LengthFieldBasedFrameDecoder {
    /**
     * @param maxFrameLength      解码时,处理每个帧数据的最大长度
     * @param lengthFieldOffset   该帧数据中,存放该帧数据的长度的数据的起始位置,数量为前面的字节总数
     * @param lengthFieldLength   记录该帧数据长度的字段本身的长度
     * @param lengthAdjustment    修改帧数据长度字段中定义的值,可以为负数
     * @param initialBytesToStrip 解析的时候需要跳过的字节数
     * @param failFast            为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常
     */
    public DataDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
                       int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
                lengthAdjustment, initialBytesToStrip, failFast);
    }

    /**
     * 重写计算数据长度的方法
     * @param buf
     * @param offset
     * @param length
     * @param order
     * @return
     */
    @Override
    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order){
        byte aByte = buf.getByte(offset) ;
        int len = Byte.toUnsignedInt(aByte);
        return len * 12;
    }
}
有的数据体长度是自定义的,无法通过常用的字节转换进行换算,此处对getUnadjustedFrameLength进行重写,通过自定义算法换算数据体长度

参考

[ 1 ] https://www.jianshu.com/p/b60180a0a0e6

[ 2 ] https://developer.aliyun.com/article/769587

[ 3 ] https://learn.lianglianglee.com/

[ 4 ] https://zhuanlan.zhihu.com/p/95621344

[ 5 ] https://fundodoo.com/zh-CN/2020/04/24/18.html#lengthfieldbasedframedecoder

~  ~  The   End  ~  ~


 赏 
感谢您的支持,我会继续努力哒!
支付宝收款码
tips
文章二维码 分类标签:技术springbootnetty
文章标题:Netty入门
文章链接:http://120.46.217.131:82/archives/57/
最后编辑:2023 年 3 月 23 日 12:46 By Yang
许可协议: 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)
(*) 7 + 3 =
快来做第一个评论的人吧~