netty入门(十一)任务队列 TaskQueue

命运对每个人都是一样的,不一样的是各自的努力和付出不同,付出的越多,努力的越多,得到的回报也越多,在你累的时候请看一下身边比你成功却还比你更努力的人,这样,你就会更有动力。

导读:本篇文章讲解 netty入门(十一)任务队列 TaskQueue,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

1.任务队列中的Task有3种典型使用场景

  1. 用户程序自定义的普通任务;

  1. 用户自定义定时任务;

  1. 非当前Reactor线程调用Channel的各种方法;

例如,在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用write类方法向该用户推送消息,就会进入到这种场景。最终的write会提交到任务队列中后被异步消费

2.【用户程序自定义的普通任务】代码示例

2.1问题分析

首先我们 在 NettyServerHandler 的 channelRead() 方法中模拟一下耗时的业务:

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    // 读取数据实现(这里我们可以读取客户端发送的消息)
    /**
     * 1. ChannelHandlerContext ctx:上下文对象,含有管道 pipeline,通道channel,地址
     * 2. Object msg:就是客户端发送的数据 默认Object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 比如这里有一个非常耗费时间的业务 -> 异步任务 -> 提交到该channel对应的 NIOEventLoop 的 taskQueue中

        Thread.sleep(10 * 1000);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));
        System.out.println("go on........");
    }

    // 数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // writeAndFlush 是 write + flush
        // 将数据写入到缓存,并刷新
        // 一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8));
    }

    // 处理异常,一般是需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

此时客户端必然是等待 10秒 后才能收到消息:hello,客户端~ 喵2;然后再收到消息:hello,客户端~。

2.2解决方案一:用户程序自定义的普通任务

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 比如这里有一个非常耗费时间的业务 -> 异步任务 -> 提交到该channel对应的 NIOEventLoop 的 taskQueue中

        // 解决方案1 用户程序自定义的普通任务
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("发生异常 " + e.getMessage());
                    e.printStackTrace();
                }
            }
        });

        System.out.println("go on........");
    }

这是我们启动程序发现:首先收到消息:hello,客户端…;然后过 5秒 收到消息:hello,客户端~ 喵2

netty入门(十一)任务队列 TaskQueue

验证也很简单,Debug查看:ctx –> pipeline –> channel –> eventLoop –> taskQueue,看一下 taskQueue 的size就行了。

2.3解决方案二:用户自定义定时任务

该任务是提交到 scheduleTaskQueue中

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 比如这里有一个非常耗费时间的业务 -> 异步任务 -> 提交到该channel对应的 NIOEventLoop 的 taskQueue中

        // 解决方案2 用户自定义的定时任务 -> 该任务是提交到 scheduleTaskQueue 中
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵4", CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("发生异常 " + e.getMessage());
                    e.printStackTrace();
                }
            }
        }, 5, TimeUnit.SECONDS);

        System.out.println("go on........");
    }

验证也很简单,Debug查看:ctx –> pipline –> channel –> eventLoop –> scheduleTaskQueue,看一下 scheduleTaskQueue 的 size 就行了。

方案在说明

  1. Netty抽象出两组线程池,BoosGroup专门负责接收客户端连接,WorkerGroup专门负责网络读写操作。

  1. NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop都有一个selector用于监听绑定在其上的socket网络通道。

  1. NioEventLoop内部采用串行化设计,从消息的读取 -> 解码 -> 处理 -> 编码 -> 发送,始终由IO线程 NioEventLoop负责。

  • NioEventLoopGroup 下包含多个NioEventLoop

  • 每个NioEventLoop中包含有一个Selector,一个taskQueue

  • 每个NioEventLoop的Selector上可以注册监听多个NioChannel

  • 每个NioChannel只会绑定在唯一的NioEventLoop上

  • 每个NioChannel都绑定有一个自己的ChannelPipeline

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/142594.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!