Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

生活中,最使人疲惫的往往不是道路的遥远,而是心中的郁闷;最使人痛苦的往往不是生活的不幸,而是希望的破灭;最使人颓废的往往不是前途的坎坷,而是自信的丧失;最使人绝望的往往不是挫折的打击,而是心灵的死亡。所以我们要有自己的梦想,让梦想的星光指引着我们走出落漠,走出惆怅,带着我们走进自己的理想。

导读:本篇文章讲解 Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

场景

Netty的Socket编程详解-搭建服务端与客户端并进行数据传输:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108615023

在此基础上要实现多个客户端之间通信,实现类似群聊或者聊天室的功能。

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi
关注公众号
霸道的程序猿
获取编程相关电子书、教程推送与免费下载。

实现

在上面实现的服务端与客户端通信的基础上,在src下新建com.badao.Char包,包下新建ChatServer类作为聊天室的服务端。

package com.badao.Chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ChatServer {
    public static void main(String[] args) throws  Exception
    {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChatServerInitializer());
            //绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(70).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            //关闭事件组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

在上面中绑定70端口并添加了一个服务端的初始化器ChatServerInitializer

所以新建类ChatServerInitializer

package com.badao.Chat;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ChatServerHandler());
    }
}

使其继承ChannelInitializer,并重写InitChannel方法,在方法中使用Netty自带的处理器进行编码的处理并最后添加一个自定义的处理器ChatServerHandler

新建处理器类ChatServerHandler

package com.badao.Chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.forEach(ch->{
            if(channel!=ch)
            {
                ch.writeAndFlush(channel.remoteAddress()+"发送的消息:"+msg+"\n");
            }
            else
            {
                ch.writeAndFlush("[自己]:"+msg+"\n");
            }
        });
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服务器]:"+channel.remoteAddress()+"加入\n");
        channelGroup.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服务器]:"+channel.remoteAddress()+"离开\n");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+"上线了\n");
        System.out.println("当前在线人数:"+channelGroup.size());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+"下线了\n");
        System.out.println("当前在线人数:"+channelGroup.size());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

使处理器继承SimpleChannelinboundHandler并重写channelRead0方法。

在最上面声明了一个通道组的通过 DefaultChannelGroup(GlobalEventExecutor.INSTANCE)

获取其单例,只要是建立连接的客户端都会自动添加进此通道组中。

然后只要是客户端与服务端发送消息后就会执行该方法。

在此方法中直接遍历通道组,判断通道组里面的每一个客户端是不是当前发消息的客户端。

如果是就显示自己发送消息,如果不是则获取远程地址并显示发送消息。

然后就是实现客户端的上线功能以及在线人数统计的功能。

在上面的处理器中重写channelActive方法,此方法会在通道激活即建立连接后调用

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+"上线了\n");
        System.out.println("当前在线人数:"+channelGroup.size());
    }

同理重写channelInactive方法,此方法会在断掉连接后调用

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+"下线了\n");
        System.out.println("当前在线人数:"+channelGroup.size());
    }

然后就是实现向所有的客户端广播新建客户端加入聊天室的功能

重写handlerAdded方法,此方法会在将通道添加到通道组中调用,所以在此方法中获取加入到通道组的远程地址

并使用channelGroup的writeAndFlush方法就能实现向所有建立连接的客户端发送消息,新的客户端刚上线时不用向自己

发送上线消息,所以在广播完上线消息后再讲此channel添加到channelGroup中。

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服务器]:"+channel.remoteAddress()+"加入\n");
        channelGroup.add(channel);
    }

同理实现下线提醒需要重写handlerRemoved方法

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服务器]:"+channel.remoteAddress()+"离开\n");
    }

但是此方法中不用手动从channelGroup中手动去掉channel,因为Netty会自动将其移除掉。

服务端搭建完成之后再搭建客户端,新建ChatClient类并编写main方法,在main方法中

package com.badao.Chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ChatClient {
    public static void main(String[] args) throws  Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {

            Bootstrap bootstrap = new Bootstrap();

            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChatClientInitializer());
            //绑定端口
            Channel channel = bootstrap.connect("localhost", 70).channel();
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            for(;;)
            {
               channel.writeAndFlush(br.readLine()+"\r\n");
            }
        } finally {
            //关闭事件组
            eventLoopGroup.shutdownGracefully();

        }
    }
}

在客户端中读取输入的内容并在一个无限循环中将输入的内容发送至服务端。

在Client中建立对服务端的连接同理也要设置一个初始化器ChatClientInitializer

新建初始化器的类ChatClientInitializer

package com.badao.Chat;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class ChatClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ChatClientHandler());
    }
}

使用Netty自带的处理器对编码进行处理并添加一个自定义的处理器ChatClientHandler

新建类ChatClientHandler

package com.badao.Chat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

在重写的channelRead0方法中只需要将收到的消息进行输出即可。

现在运行服务端的main方法

为了能运行多个客户端在IDEA中客户端编辑

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

然后将下面的勾选上

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

然后首先运行一个客户端

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

那么在服务端中就会输出上线的客户端以及在线人数

再次运行客户端的main方法,此时服务端会输出两个客户端上线

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

同时在第二个客户端上线时第一个客户端会收到加入的提示

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

此时停掉第二个客户端即将第二个客户端下线

服务端会提示下线并更新在线人数

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

同时在第一个客户端会收到服务端的推送

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

再运行第二个客户端,并在控制台输入消息,回车发送

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

此时第一个客户端就会收到第二个客户端发送的消息。

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

然后第一个客户端再输入一个消息并回车

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

那么第二个客户端也能收到消息

 

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

示例代码下载:

https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/12850228

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/136539.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!