简介:
Netty 是一个基于NIO的客户、服务器端的编程框架,它提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序
分析:
package com.yxj.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup();//主eventLoopGroup,负责处理请求的的连接事件,并转发给工作eventLoopGroup
EventLoopGroup childGroup = new NioEventLoopGroup();//工作eventLoopGroup,负责处理请求中的读写事件。
try {
ServerBootstrap bootstrap = new ServerBootstrap();//服务端启动类,对应的客户端启动类:Bootstrap
bootstrap.group(parentGroup, childGroup)//设置两个eventLoopGroup,主设置在AbstractBootstrap里,工作设置在ServerBootstrap自身
.channel(NioServerSocketChannel.class)//设置服务端的channel类型为NioServerSocketChannel。对应的客户端为:NioSocketChannel
//.handler(null) //设置主eventLoopGroup的处理handler,这是个示例,实际参数设置成空会报错:handler(null)
.childHandler(new ChannelInitializer<SocketChannel>() {//设置工作eventLoopGroup的处理handler
//netty是基于主从Reactor模型上改造而来,所以实际负责读写处理的都是childGroup,所以这里设置工作eventLoopGroup的handler。
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());//ChannelInboundHandler入栈类型处理:String解码器
pipeline.addLast(new StringEncoder());//ChannelOutboundHandler出栈类型处理:String编码器
pipeline.addLast(new NettyServerHandler());//自定义ChannelInboundHandler类型处理器:服务端处理逻辑
}
});
ChannelFuture future = bootstrap.bind(8888).sync();//核心方法bind。返回一个异步操作结果ChannelFuture
System.out.println("服务器已启动。。。");
future.channel().closeFuture().sync();//监听关闭状态,会自动执行关闭方法。ChannelFuture异步特性
} finally {
//优雅关闭
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
上面是一个简单的Netty服务端启动demo示例,其中最核心服务端的启动过程则封装在了bind方法里。点进去查看,发现会进入到一个doBind方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = this.initAndRegister();//完成初始化和注册的工作。启动过程的核心部分在此处
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
} else if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);//完成本地绑定端口的操作(可以看到,先注册完再去绑定网络端口号)
return promise;
} else {
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {//(可以看到,先注册完再去绑定网络端口号)
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);//完成本地绑定端口的操作
}
}
});
return promise;
}
}
doBind方法是在ServerBootstrap的父类AbstractBootstrap里。该方法有两个重要调用,initAndRegister()和doBind0(),doBind0()最后调用java的native方法绑定端口号并开始网络监听。Netty服务启动核心流程就是看initAndRegister()方法
initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//通过反射创建一个默认构造的NioServersocketChannel实例
channel = this.channelFactory.newChannel();//这里为什么是NioServersocketChannel?可以自行点进去跟
this.init(channel);//对这个NioServersocketChannel做一些配置上的初始化,该方法是抽象方法,由ServerBootstrap自身实现
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
ChannelFuture regFuture = this.config().group().register(channel);//将这个NioServersocketChannel注册到某某地方?
//记不记得NIO的channel也要注册到Selector上才能用,才能监听!所以这里就是最最最核心的注册步骤。
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
分析源码要抓重点,initAndRegister方法由init和register组成,所以只需要看init和register方法即可
init()
void init(Channel channel) {
setChannelOptions(channel, this.newOptionsArray(), logger);//设置NioServersocketChannel的Options(不是重点)
setAttributes(channel, this.newAttributesArray());//设置NioServersocketChannel的Attributes(不是重点)
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = this.childGroup;//childGroup是服务端启动demo里ServerBootstrap.group()方法里设置进来的工作eventLoopGroup
final ChannelHandler currentChildHandler = this.childHandler;//childHandler是服务端启动demo里ServerBootstrap.childHandler()方法里设置进来的事件处理handler
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(this.childOptions);//服务端启动demo里ServerBootstrap设置的ChildOptions(上面那个示例demo没写出来)
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(this.childAttrs);//服务端启动demo里ServerBootstrap设置的ChildAttrs(上面那个示例demo没写出来)
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {//给NioServersocketChannel设置处理channelHandler。
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = ServerBootstrap.this.config.handler();//这里的channelHandler是服务端启动demo里ServerBootstrap.handler(ChannelHandler channelHandler)方法。
//(上面那个示例demo没写出来。handler()方法是给主eventLoopGroup设置事件处理handler。而childHandler()方法则是给工作eventLoopGroup设置事件处理handler)
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}
ch.eventLoop().execute(new Runnable() {//不是重点
public void run() {
pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
}
});
}
}});
}
init方法总结下来:完成NioServersocketChannel中的Options、Attributes以及handler的初始化。
register()
其实最终调用也不在SingleThreadEventLoopGroup里。还有下一层
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//在这里会调用AbstractUnsafe的register方法去完成注册
promise.channel().unsafe().register(this, promise);//AbstractUnsafe是AbstractChannel的一个内部类
return promise;
}
继续往下看AbstractUnsafe的register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (AbstractChannel.this.isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
} else if (!AbstractChannel.this.isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
} else {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {//判断当前进行register方法的线程是当前选中的eventLoop。
//上面图片中提到的MultithreadEventLoopGroup.register方法,其在主eventLoopGroup中随机选择了一个eventLoop去和NioServersocketChannel绑定。
//一个NioServersocketChannel对应一个eventLoop。一个eventLoop可以对应多个NioServersocketChannel
this.register0(promise);//核心方法
} else {
//如果当前线程不是被选中的eventLoop,那么则将注册任务封装成一个task由该eventLoop去执行
try {
eventLoop.execute(new Runnable() {
public void run() {
AbstractUnsafe.this.register0(promise);
}
});
} catch (Throwable var4) {
AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
this.closeForcibly();
AbstractChannel.this.closeFuture.setClosed();
this.safeSetFailure(promise, var4);
}
}
}
}
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
return;
}
boolean firstRegistration = this.neverRegistered;//是否第一次注册,默认为true
AbstractChannel.this.doRegister();//最终执行注册的方法
this.neverRegistered = false;//注册完后就将“重未注册标识改为false”
AbstractChannel.this.registered = true;
AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
this.safeSetSuccess(promise);
AbstractChannel.this.pipeline.fireChannelRegistered();//执行channelhandler链的ChannelRegistered()方法
if (AbstractChannel.this.isActive()) {
if (firstRegistration) {
AbstractChannel.this.pipeline.fireChannelActive();//执行channelhandler链的ChannelActive()方法。不过第一次被调用时不会走这里。第一次调用时,channelhandler链的ChannelActive()方法在其他方法里被执行
} else if (AbstractChannel.this.config().isAutoRead()) {
this.beginRead();
}
}
} catch (Throwable var3) {
this.closeForcibly();
AbstractChannel.this.closeFuture.setClosed();
this.safeSetFailure(promise, var3);
}
}
接下来看最终注册的方法,在AbstractNioChannel的doRegister()
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
//将当前的eventLoop和当前的NioServerSocketChannel绑定,返回一个标识selectionKey。完成注册
//Netty是基于nio上封装的框架,nio将channel绑定到selector后开始监听channel。而Netty则是将channel绑定到eventLoop上开始监听
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
小结
Netty服务通过链式调用将一系列必要参数设置进ServerBootstarp中,然后调用bind方法完成配置的初始化和注册,以及网络端口的绑定。
bind方法里调用了两个重要方法:initAndRegister()和doBind0()
initAndRegister():创建一个默认构造的NioServerSocketChannel,随后初始化各种配置:主eventGroup、工作EventLoopGroup、事件处理handler等。最后将这个NioServerSocketChannel与主eventLoopGroup中随机选一个eventLoop进行绑定注册,返回一个selectionKey用于标识二者之间的关系。监听的时候可以通过这个selectionKey知道事件来源是那个channel。
doBind0():则是通过调用java的native方法绑定本地网络端口完成整个服务启动过程
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/99047.html