热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty分布式pipeline管道传播outBound事件的示例分析

这篇文章将为大家详细讲解有关Netty分布式pipeline管道传播outBound事件的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完

这篇文章将为大家详细讲解有关Netty分布式pipeline管道传播outBound事件的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

outbound事件传输流程

在我们业务代码中, 有可能使用wirte方法往写数据:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.channel().write("test data");
}

当然, 直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法, 有关逻辑, 我们会在后面章节中详细讲解, 这里只是以wirte方法为例为了演示outbound事件的传播的流程

这里我们同样给出两种写法

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //写法1
    ctx.channel().write("test data");
    //写法2
    ctx.write("test data");
}

这两种写法有什么区别, 我们首先跟到第一种写法中去:

ctx.channel().write("test data");

这里获取ctx所绑定的channel

我们跟到AbstractChannel的write方法中:

public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

这里pipeline是DefaultChannelPipeline

跟到其write方法中:
public final ChannelFuture write(Object msg) {
    //从tail节点开始(从最后的节点往前写)
    return tail.write(msg);
}

这里调用tail节点write方法, 这里我们应该能分析到, outbound事件, 是通过tail节点开始往上传播的, 带着这点猜想, 我们继往下看

其实tail节点并没有重写write方法, 最终会调用其父类AbstractChannelHandlerContext的write方法

AbstractChannelHandlerContext的write方法:

public ChannelFuture write(Object msg) { 
    return write(msg, newPromise());
}

我们看到这里有个newPromise()这个方法, 这里是创建一个Promise对象, 有关Promise的相关知识我们会在以后的章节剖析

我们继续跟write:

public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    //代码省略
    write(msg, false, promise);
    return promise;
}

继续跟write:

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //没有调flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

这里跟我们上一小节剖析过channelRead方法有点类似, 但是事件传输的方向有所不同, 这里findContextOutbound()是获取上一个标注outbound事件的HandlerContext

跟到findContextOutbound中
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

这里的逻辑我们似曾相识, 跟我们上一小节的findContextInbound()方法有点像, 只是过程是反过来的

在这里, 会找到当前context的上一个节点, 如果标注的事件不是outbound事件, 则继续往上找, 意思就是找到上一个标注outbound事件的节点

回到write方法:
AbstractChannelHandlerContext next = findContextOutbound();

这里将找到节点赋值到next属性中

因为我们之前分析的write事件是从tail节点传播的, 所以上一个节点就有可能是用户自定的handler所属的context

然后判断是否为当前eventLoop线程, 如果是不是, 则封装成task异步执行, 如果不是, 则继续判断是否调用了flush方法, 因为我们这里没有调用, 所以会执行到next.invokeWrite(m, promise),

我们继续跟invokeWrite

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}

这里会判断当前handler的状态是否是添加状态, 这里返回的是true, 将会走到invokeWrite0(msg, promise)这一步

继续跟invokeWrite0
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        //调用当前handler的wirte()方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

这里的逻辑也似曾相识, 调用了当前节点包装的handler的write方法, 如果用户没有重写write方法, 则会交给其父类处理

我们跟到ChannelOutboundHandlerAdapter的write方法中看:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

这里调用了当前ctx的write方法, 这种写法和我们小节开始的写法是相同的, 我们回顾一下:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //写法1
    ctx.channel().write("test data");
    //写法2
    ctx.write("test data");
}

我们跟到其write方法中, 这里走到的是AbstractChannelHandlerContext类的write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //没有调flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

又是我们所熟悉逻辑, 找到当前节点的上一个标注事件为outbound事件的节点, 继续执行invokeWrite方法, 根据之前的剖析, 我们知道最终会执行到上一个handler的write方法中

走到这里已经不难理解, ctx.channel().write("test data")其实是从tail节点开始传播写事件, 而ctx.write("test data")是从自身开始传播写事件

所以, 在handler中如果重写了write方法要传递write事件, 一定采用ctx.write("test data")这种方式或者交给其父类处理处理, 而不能采用ctx.channel().write("test data")这种方式, 因为会造成每次事件传输到这里都会从tail节点重新传输, 导致不可预知的错误

如果用代码中没有重写handler的write方法, 则事件会一直往上传输, 当传输完所有的outbound节点之后, 最后会走到head节点的wirte方法中

我们跟到HeadContext的write方法中
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

我们看到write事件最终会流向这里, 通过unsafe对象进行最终的写操作

有关inbound事件和outbound事件的传输, 可通过下图进行说明:

Netty分布式pipeline管道传播outBound事件的示例分析

关于“Netty分布式pipeline管道传播outBound事件的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。


推荐阅读
  • 使用Flutternewintegration_test进行示例集成测试?回答首先在dev下的p ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 涉及的知识点-ViewGroup的测量与布局-View的测量与布局-滑动冲突的处理-VelocityTracker滑动速率跟踪-Scroller实现弹性滑动-屏幕宽高的获取等实现步 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了10分钟了解Android的事件分发相关的知识,希望对你有一定的参考价值。什么是事件分发?大家 ... [详细]
  • php redis 如何使用pipeline,redis中pipeline详解
    一、pipeline出现的背景:redis执行一条命令有四个过程:发送命令、命令排队、命令执行、返回结果;这个过程称为Roundtript ... [详细]
  • Pipeline支持两种语法:Declarative(在Pipeline2.5中引入)和ScriptedPipeline语法:pipeline{*insertDeclarative ... [详细]
  • 一.什么是openGLOpenGL被定义为“图形硬件的一种软件接口”。从本质上说,它是一个3D图形和模型库,具有高度的可移植性,具有非常快的速度。二.管线管线这个术语描述了open ... [详细]
  • 本文介绍了绕过WAF的XSS检测机制的方法,包括确定payload结构、测试和混淆。同时提出了一种构建XSS payload的方法,该payload与安全机制使用的正则表达式不匹配。通过清理用户输入、转义输出、使用文档对象模型(DOM)接收器和源、实施适当的跨域资源共享(CORS)策略和其他安全策略,可以有效阻止XSS漏洞。但是,WAF或自定义过滤器仍然被广泛使用来增加安全性。本文的方法可以绕过这种安全机制,构建与正则表达式不匹配的XSS payload。 ... [详细]
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
  • 本文详细介绍了Android中的坐标系以及与View相关的方法。首先介绍了Android坐标系和视图坐标系的概念,并通过图示进行了解释。接着提到了View的大小可以超过手机屏幕,并且只有在手机屏幕内才能看到。最后,作者表示将在后续文章中继续探讨与View相关的内容。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  •  项目地址https:github.comffmydreamWiCar界面做的很难看,美工方面实在不在行。重点是按钮触摸事件的处理,这里搬了RepeatListener项目代码,例 ... [详细]
  • 本文由编程笔记#小编为大家整理,主要介绍了css回到顶部按钮相关的知识,希望对你有一定的参考价值。 ... [详细]
  • BugDescriptionWhencategoricalvariablehas ... [详细]
  • Java多线程_Future设计模式
       Future模式的核心:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑。  Future模式有点类似于商品订单。在网上购物时,提交订单后,在收货的 ... [详细]
author-avatar
hero-laiquwuz_82914c
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有