Press "Enter" to skip to content

Netty 报错:is not a Sharable handler, so can’t be added or removed multiple times

尝试用netty写了个日志收集服务,类似于logstash的功能,因为我们才采集日志的时候可能有很多的策略要去做,logstash的功能不够支撑,所以尝试使用netty自己写。

看了一些netty的教程,大概是以下几个步骤

  • new两个 NioEventLoopGroup

一个是bossGroup,一个是workerGroup

  • new一个ServerBootstrap

搞出来的对象暂且叫b

  • b.group(bossGroup,workerGroup)

帮这两个group搞里头

  • b.channel(NioServerSocketChannel.class)

指定下我们server所使用的channel模型

  • b.childHandler

给我们的server配置handler

  • 一些其他配置

完整代码如下:

public class NettyServer {

    private int port;

    private final MessageHandler messageHandler;

    public NettyServer(int port, MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(
                                    new ProcessingHandler(messageHandler));
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            log.info("starting netty server " + Thread.currentThread().getName() + " on port " + port);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

然后我们只要启动这个server就行了:

(new Thread(() -> {
                while (true) {
                    try {
                        new NettyServer(5000, new LogstashHandler(kafkaProducer, config)).run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "logstash-server")).start();

你可能注意到上面有个 ProcessingHandler,这个是数据处理的核心代码:

public class ProcessingHandler extends ChannelInboundHandlerAdapter {

    private StringBuffer sBuffer = new StringBuffer();

    private static final String NEW_LINE = "\n";

    private static final String HOST_KEY = "host";

    private final MessageHandler messageHandler;

    ObjectMapper mapper = new ObjectMapper();

    public ProcessingHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        ByteBuf in = (ByteBuf) msg;
        final String s = in.toString(CharsetUtil.UTF_8);
        try{
            if(s.endsWith(NEW_LINE)) {
                String hostAddress = "";
                if(ctx.channel().remoteAddress() instanceof InetSocketAddress){
                    hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
                }
                if(sBuffer.length() == 0) {
                    onMessage(s, hostAddress);
                }else {
                    sBuffer.append(s);
                    onMessage(sBuffer.toString(), hostAddress);
                    sBuffer.delete(0, sBuffer.length());
                }
            }else {
                sBuffer.append(s);
            }
        }catch(Exception e){
            e.printStackTrace();
            log.error(e.getMessage());
        }finally{
            in.release();
        }

    }

    @SuppressWarnings("unchecked")
    private void onMessage(String message, String host) {
        String[] messages = message.split(NEW_LINE);
        for (String s : messages) {
            try {
                HashMap<String, String> hashMap = mapper.readValue(s, HashMap.class);
                hashMap.put(HOST_KEY, host);
                messageHandler.onMessage(hashMap);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("data:" + message + " error:" + e.getMessage());
            }
        }
    }

}

我们自己写个ProcessingHandler 来继承ChannelInboundHandlerAdapter,在他的channelRead里面就能读取到对端数据了。

起初我在写的时候,发现channelRead里面读取的数据可能是残的,就是说如果有大的包,logback会拆包发给服务端。

那我就根据行标识来认定数据是一条完整数据。

logback可能还会合并包,所以我在我自己写的onMessage里面还要再拆包。

大概逻辑没啥问题

我第一个版本写的代码是在 NettyServer的构造函数里面写的 new ProcessingHandler(messageHandler) 然后直接挂到NettyServer属性上面,然后在他的 ChannelInitializer里面直接这样写了

ch.pipeline().addLast(processingHandler)

项目启动,有客户端连上来的时候,控制台会报错:

Failed to initialize a channel. Closing: [id: 0xe648b9c0, L:/127.0.0.1:5000 – R:/127.0.0.1:45748]

io.netty.channel.ChannelPipelineException: com.logdevour.devour.netty.handler.ProcessingHandler is not a @Sharable handler, so can’t be added or removed multiple times.

然后我在网上找了些资料,说是要在我的ProcessingHandler上面加个注解@Sharable,然后我就加了这个注解,确实不报错了。

项目跑了一段时间,我发现数据错乱,A客户端的数据会跑到B客户端上面来,导致解析也各种报错。

然后我又仔细研究了一下,因为我前面提到我的客户端在传数据的时候是分段的,然后我在ProcessingHandler里面定义了一个StringBuffer来暂存数据的,如果我还用单例来传给pipeline的话,相当于每个客户端都是在共享这个对象了,难怪他们的数据会混到一起了。

这也是官方为什么要限制你必须要显示的配置@Sharable注解的原因了,你应该真的确定你的这个handler是可以share的,不然的话,必须要在ChannelInitializer的时候每次传入不同的对象实例避免他们数据干扰。具体的写法就是我最前面贴的那样,你要在ChannelInitializer里面去new他就行了。

结论

@Sharable注解不要乱用,你的handler真的是可以share的才可以加,不然的话,需要你每次在初始化会话的时候new新的handler进去。你项目的具体情况你自己需要判断(是否有数据共享),而不是向网上大多数答案那样单纯的加个注解来解决。

参考资料:

https://stackoverflow.com/questions/25714860/adding-netty-handler-after-channel-initialization-netty-4-0-17-final

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注