热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty实现RPC的思路

RPCRPC(RemoteProcedureCall)远程过程调用,是一个计算机通信协议,可以实现远程调用远程接口就想调用本地接口一样的高效。
RPC

RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议,可以实现远程调用远程接口就想调用本地接口一样的高效。

Netty实现RPC的思路
image.png

分布式组件中:外部RESTful内部RPC。

RPC调用流程

Netty实现RPC的思路
image.png
  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

通过客户端代理,实现通过netty通信,实现接口的远程调用。

code

RPC客户端

package com.pl.netty.rpc.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 

* * @Description: TODO *

* @ClassName NettyClient * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyClient { //创建一个线程池 private static ExecutorService executor= Executors.newFixedThreadPool(5); private static NettyClientHandler client; //编写方法,使用代理模式,获取一个代理对象 public Object getBean(final Class> serviceClass, final String providerName) { //通过代理对目标对象的方法进行增强 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class>[]{serviceClass}, (proxy, method, args) -> { System.out.println("代理被调用"); if (client == null) initClient(); //方法参数 client.setPara(providerName + args[0]); return executor.submit(client).get(); }); } //初始化客户端 private static void initClient() { client = new NettyClientHandler(); //创建EventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(client); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } }

NettyClientHandler

package com.pl.netty.rpc.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

/**
 * 

* * @Description: TODO *

* @ClassName NettyClientHandler * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context; //上下文 private String result; //返回的结果 private String para; //客户端调用方法时,传入的参数 //与服务端创建连接后调用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("通道连接成功"); cOntext= ctx; //因为我们在其他方法会使用到 ctx } @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { result = msg.toString(); notify(); //唤醒等待的线程 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } //被代理对象的调用,真正发送数据给服务器,发送完后就阻塞,等待被唤醒(channelRead) @Override public synchronized Object call() throws Exception { System.out.println("线程被调用-----"); context.writeAndFlush(para); //进行wait wait(); //等待 channelRead 获取到服务器的结果后,进行唤醒。 return result; //服务方返回的结果 } public void setPara(String para){ this.para = para; } }

ClientBootStrap

package com.pl.netty.rpc.client;

import com.pl.netty.rpc.server.HelloService;

/**
 * 

* * @Description: TODO *

* @ClassName ClientBootStrap * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class ClientBootStrap { //这里定义协议头 public static final String providerName = "HelloService#hello#"; public static void main(String[] args) throws InterruptedException { //创建一个消费者 NettyClient customer = new NettyClient(); //创建代理对象 HelloService service = (HelloService) customer.getBean(HelloService.class, providerName); //通过代理对象调用服务提供者的方法 String res = service.hello("你好 Dubbo"); System.out.println("调用的结果,res = " + res); Thread.sleep(2000); } }

RPC服务端

NettyServer

package com.pl.netty.rpc.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 

* * @Description: TODO *

* @ClassName NettyServer * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyServer { public static void startServer(String hostName, int port) { startServer0(hostName, port); } private static void startServer0(String hostname, int port) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); //业务处理器 } }); ChannelFuture channelFuture = serverBootstrap.bind(hostname,port).sync(); System.out.println("服务提供方开始运行"); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { startServer("127.0.0.1",7000); } }

NettyServerHandler

package com.pl.netty.rpc.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 

* * @Description: TODO *

* @ClassName NettyServerHandler * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取客户端发送的消息,并调用服务 System.out.println("msg=" + msg); //客户端在调用服务器的api 时,我们需要定义一个协议 //比如要求,每次发消息时,都必须以某个字符串开头 "HelloService#hello#你好" if (msg.toString().startsWith("HelloService#hello#")) { String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }

HelloService

package com.pl.netty.rpc.server;

/**
 * 

* * @Description: TODO *

* @ClassName HelloService * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public interface HelloService { String hello(String message); }

HelloServiceImpl

package com.pl.netty.rpc.server;

/**
 * 

* * @Description: TODO *

* @ClassName HelloServiceImpl * @Author pl * @Date 2021/3/5 * @Version V1.0.0 */ public class HelloServiceImpl implements HelloService { @Override public String hello(String message) { System.out.println("收到客户端消息=" + message); //根据 message 返回不同的结果 if(message != null) { return "你好客户端,我已经收到你的消息【" + message + "】"; } else { return "你好客户端,我已经收到你的消息。"; } } }

输出

Netty实现RPC的思路
image.png

推荐阅读
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
  • 吃透Netty源码系列四之NioEventLoop
    吃透Netty源码系列四之NioEventLoop新启动的线程的作用执行NioEventLoop的run方法执行任务一(通道注册register0)doRegisterpipeli ... [详细]
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
  • ElasticSearch成功安装完毕。 测试数据添加出现{  error:{    root_cause ... [详细]
  • 一面自我介绍对象相等的判断,equals方法实现。可以简单描述挫折,并说明自己如何克服,最终有哪些收获。职业规划表明自己决心,首先自己不准备继续求学了,必须招工作了。希望去哪 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 阿里首席架构师科普RPC框架
    RPC概念及分类RPC全称为RemoteProcedureCall,翻译过来为“远程过程调用”。目前,主流的平台中都支持各种远程调用技术,以满足分布式系统架构中不同的系统之间的远程 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • ejava,刘聪dejava
    本文目录一览:1、什么是Java?2、java ... [详细]
  • 基于SpringBoot打造在线教育系统(6)– 二级分类模块UI篇
    这一节来做二级分类,为了快速开发,一级分类只做新增,暂时不考虑修改和删除,如果一定要删,就去数据库删吧。我们接下来,需要通过一级分类,获取所有的二级分类。开始 ... [详细]
  • java布尔字段用is前缀_POJO类中布尔类型的变量都不要加is前缀详解
    前言对应阿里巴巴开发手册第一章的命名风格的第八条。【强制】POJO类中布尔类型的变量都不要加is前缀,否则部分框架解析会引起序列化错误。反例:定义为基本 ... [详细]
  • java多线程获取线程返回结果
    我们在使用java多线程编写相关业务代码时,往往有这样一种情况,某个线程依赖于其他线程执行结果。也就是说,我们需要在一个线程中获取另一个线程的信息。可以分为两种情况,一种是轮询,一 ... [详细]
  • SOA架构理解理解SOA架构,了解ESB概念,明白SOA与微服务的区别和联系,了解SOA与热门技术的结合与应用。1、面向服务的架构SOASOA(ServiceOrien ... [详细]
  • 用wGenerator给编程提速
    1.需求设定问题1:以上的需求设定,如果是你来开发,会需要多久呢?请在心中记下您的答案。2.需要制作的内容与步骤用java来做的话实现上述功能,大致需要7个步骤:1.编写DTO(或 ... [详细]
author-avatar
手机用户2502927973
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有