热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

.NETCore中使用gRPC

1.什么是gRPC1.基本介绍gRPC一开始由google开发,是一款语言中立、平台中立、开源的远程过程调用(RPC)系统,所以叫g(google)RPC。支持主流开发语言(C,C

1.什么是gRPC

1.基本介绍

gRPC 一开始由 google 开发,是一款语言中立、平台中立、开源的远程过程调用(RPC)系统,所以叫g(google)RPC。支持主流开发语言(C, C++, Python, PHP, Ruby, NodeJS, C#, Objective-C、Golang


2.proto文件

用于定义协议接口和数据格式,不同的语言,相同的文件,可以理解为一项约定,序列化支持 PB(Protocol buffer)和 JSON,PB 是一种语言无关的高性能序列化框架,基于 HTTP/2 + PB, 保障了 RPC 调用的高性能。

说这么多感觉还是很模糊,上面只是介绍了gRPC是什么,在我看来其实它大致的作用跟WebServicesWCF差不多,在某个维度上可以说都是作为远程调用,只不过所处的时代和本身的特性,以及生态的发展下,导致它成为目前比较火热的原因之一,具体的内容后面再讨论,先用起来,再深入了解,接下来我们使用.Net Core 先搭建一个简单的Demo,来亲自上手实践一下。

其实背景就是最近在做一个项目,需要做一个公司内部的Nuget包,大概的业务就是Nuget包请求微服务数据,开始想直接使用http的方式,基于整体项目结构后面定了使用gRPC,既学即用,刚好也可以在实际项目应用中,查漏补缺。


3.上手实践

1.使用vs首先创建一个NetCore gRPC项目,得到一个项目结构如下,框架默认包含一个已经预先定义协议文件服务接口,如果使用其他的方式也很简单直接引用相关的包,然后添加以下服务就可以了

2.我们自己创建一个自己的接口,定义一个协议文件mytestdemo.proto,然后定义一些方法,主要包含如下几类,其他的一些用法可以在网上搜到,或者去看文档,只是简单列一下


1.有参数有返回值

2.无参数有返回值 ,无参使用google.protobuf.Empty

3.集合作为返回值,必须使用repeated 标记


如果你真的不熟悉protobuf的定义方式和写法,这个无伤大雅,可以使用工具生成

syntax = "proto3";
//引入集合包
import "google/protobuf/empty.proto";
//命名空间
option csharp_namespace = "GrpcDemo";
//包名
package MyTest;
//接口定义
service MyTestDemo {
rpc MultipleParam(MultipleRequestPara) returns (MultipleRespone);
rpc NoParam(google.protobuf.Empty) returns (SingeRespone);
rpc CollectionParam(google.protobuf.Empty) returns (CollectionResponePara);
}
//多参数请求参数
message MultipleRequestPara {
int32 Id = 1;
string Name = 2;//参数个数
bool IsExists =3;
}
message SingeRespone {
bool Success =1;
TestEntity a1 = 2;
message TestEntity{
int32 Id =1;
}
}
//多参数返回
message MultipleRespone {
bool Success =1;
}
//返回集合参数
message CollectionResponePara {
repeated CollectionChildrenRespone1 param1 =1;
repeated CollectionChildrenRespone2 param2 =2;
repeated int32 param3 =3;
}
//集合属性1
message CollectionChildrenRespone1 {
int32 Id =1;
}
//集合属性2
message CollectionChildrenRespone2 {
string Name =1;
}

3.右键类,选择添加,选择连接的服务,添加gRPC,或者直接修改项目文件,将新建的proto添加到类中


3.1 重新生成,然后创建服务代码MyTestService,如下代码

3.2 在启动类中映射gRPC app.MapGrpcService(); 否则会报service is unimplemented.


///


/// 继承自MyTestDemo.MyTestDemoBase
///

public class MyTestService : MyTestDemo.MyTestDemoBase
{
public override async Task MultipleParam(MultipleRequestPara request, ServerCallContext context)
{
return await Task.FromResult(new MultipleRespone
{
Success = true,
});
}
public override async Task NoParam(Empty request, ServerCallContext context)
{
TestEntity t = new TestEntity();
t.Id = 1;
return await Task.FromResult(new SingeRespone { Success = true, entity = t }); ;
}
public override async Task CollectionParam(Empty request, ServerCallContext context)
{
CollectionResponePara collectiOnResponePara= new CollectionResponePara();
CollectionChildrenRespone1 a = new CollectionChildrenRespone1 { Id = 1 };
CollectionChildrenRespone2 b = new CollectionChildrenRespone2 { Name = "jeck" };
collectionResponePara.Param1.Add(a);
collectionResponePara.Param2.Add(b);
return await Task.FromResult(collectionResponePara);
}
}

4.创建客户端,将proto文件拷贝过去调用,添加服务为客户端模式,然后添加如下代码

using (var channel = GrpcChannel.ForAddress("https://localhost:7245"))
{
var client = new MyTestDemo.MyTestDemoClient(channel);
//多参数调用
var reply = client.MultipleParam(new MultipleRequestPara { Id = 123, Name = "sa", IsExists = true });
//无参调用
var singeRespOne= client.NoParam(new Google.Protobuf.WellKnownTypes.Empty());
//调用集合
var collectiOnResponePara= client.CollectionParam(new Google.Protobuf.WellKnownTypes.Empty());
}

2.gRPC流

gRPC中支持4种流,分别是:


1.简单 RPC(Unary RPC)它的特点是传入一个请求对象,返回一个请求对象



2.服务端流式 RPC (Server streaming RPC)客户端传入一个请求对象,服务端可以返回多个结果对象,形象的表示就是客户端传入一个股票的id,服务端就将股票的信息远远不断地返回



3.客户端流式 RPC (Client streaming RPC) 客户端源源不断的传入多个请求对象,服务端返回一个结果对象,形象的表示例如上位机采集实时将采集数据,源源不断的传入服务器



4.双向流式 RPC (Bi-directional streaming RPC) 结合服务端和客户端流,传入多请求返回多个结果,相当于建立长连接,可以进行相互的操作


下面我们就主要介绍几类主要的流的使用以及步骤


1.服务端流、客户端流、双向流

服务端流主要的特征就是服务端会源源不断的响应数据到客户端

1.首先还是创建protobuf文件,声明一个服务端流的rpc接口ExcuteServerStream 和一个客户端流接口ExcuteClientStream

syntax = "proto3";
option csharp_namespace = "GrpcDemo";
package streamtest;
service StreamTest {
//服务端流定义
rpc ExcuteServerStream(StreamForClientRequest) returns (stream StreamForClientRespones);
//客户端流定义
rpc ExcuteServerStream(StreamForClientRequest) returns (stream StreamForClientRespones);
//双向流
rpc ExcuteMutualStream(stream StreamForClientRequest) returns ( stream StreamForClientRespones);
}
//调用流的请求对象
message StreamForClientRequest{
int32 Id=1;
}
//调用端流的返回对象
message StreamForClientRespones{
repeated int32 Number=1;//集合
}

2.重新生成服务引用,然后创建对应的实现接口StreamTestService并重写生成的服务,然后在启动程序映射服务接口

//服务端流接口
public override async Task ExcuteServerStream(StreamForClientRequest req,IServerStreamWriter resStream,ServerCallContext context)
{
//list集合作为模拟数据源
var list = new List { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
foreach (var item in list)
{
Console.WriteLine($"********{item}*******");
var ele = new StreamForClientRespones();
ele.Number.Add(item);
//写入流中
await resStream.WriteAsync(ele);
//模拟源源不断的数据响应
await Task.Delay(1000);
}
}
//客户端流接口
public override async Task ExcuteClientStream( IAsyncStreamReader requestStream, ServerCallContext context)
{
StreamForClientRespones intArrayModel = new StreamForClientRespones();
//获取请求流中的数据
while (await requestStream.MoveNext())
{
intArrayModel.Number.Add(requestStream.Current.Id + 1);
Console.WriteLine($"ExcuteClientStream Number {requestStream.Current.Id} 获取到并处理.");
Thread.Sleep(100);
}
return intArrayModel;
}
//双向流
public override async Task ExcuteMutualStream(IAsyncStreamReader reqStream,IServerStreamWriter resStream,ServerCallContext context)
{
int i = 0;
//从流中获取请求
while (await reqStream.MoveNext())
{
i++;
var ele = new StreamForClientRespones();
ele.Number.Add(i);
//写入响应流
await resStream.WriteAsync(ele);
await Task.Delay(500);
}
}

3.创建客户端调用,把服务端的protobuf文件拷贝到客户端,然后生成,启动调用

//调用服务端流
using (var channel = GrpcChannel.ForAddress("https://localhost:7245"))
{
var client = new StreamTest.StreamTestClient(channel);
//调用服务端流
var reply = client.ExcuteServerStream(new StreamForClientRequest { Id =1});
//利用线程取消
//CancellationTokenSource cts = new CancellationTokenSource();
//指定在2s后进行取消操作
//cts.CancelAfter(TimeSpan.FromSeconds(2.5));
//var reply = client.ExcuteServerStream(new StreamForClientRequest { Id = 1 }, cancellationToken: cts.Token);
await foreach (var resp in reply.ResponseStream.ReadAllAsync())
{
Console.WriteLine(resp.Number[0]);
}
}
//调用客户端流
using (var channel = GrpcChannel.ForAddress("https://localhost:7245"))
{
var client = new StreamTest.StreamTestClient(channel);
//调用客户端流接口
var reply = client.ExcuteClientStream();
//模拟源源不断的数据发送
for (int i = 0; i <10; i++)
{
await reply.RequestStream.WriteAsync(new StreamForClientRequest() { Id = new Random().Next(0, 20) });
await Task.Delay(100);
}
Console.WriteLine("*************发送完毕*******************");
await reply.RequestStream.CompleteAsync();
//接受结果
foreach (var item in reply.ResponseAsync.Result.Number)
{
Console.WriteLine($"This is {item} Result");
}
}
//双向流
using (var channel = GrpcChannel.ForAddress("https://localhost:7245"))
{
var client = new StreamTest.StreamTestClient(channel);
//调用双向流接口
var reply = client.ExcuteMutualStream();
//获取流放入线程
var bathCatRespTask = Task.Run(async () =>
{
await foreach (var resp in reply.ResponseStream.ReadAllAsync())
{
Console.WriteLine(resp.Number[0]);
}
});
//写入流
for (int i = 0; i <10; i++)
{
await reply.RequestStream.WriteAsync(new StreamForClientRequest() { Id = new Random().Next(0, 20) });
await Task.Delay(100);
}
//发送完毕
await reply.RequestStream.CompleteAsync();
//开始接收响应
await bathCatRespTask;
}

2.NetCore Web项目作为客户端

1.首先还是先引入proto文件,然后生成客户端

2.在web项目中的控制器中,我们就不能直接简陋的使用 using的方式来连接gRPC服务端了,可以利用内置的依赖注入的模式来完成

3.下载Grpc.Net.ClientFactory包,然后在`Program将客户端添加到依赖注入容器

builder.Services.AddGrpcClient(option => {
option.Address = new Uri("https://localhost:7245");
});

4.然后在控制器中直接注入,就可以使用

public class gRPCTestController : ControllerBase
{
private readonly MyTestDemoClient _client;
public gRPCTestController(MyTestDemoClient client)
{
_client = client;
}
[HttpGet(Name = "Excute")]
public async Task Get()
{
var a = await _client.NoParamAsync(new Google.Protobuf.WellKnownTypes.Empty());
var str = a.Success.ToString();
return str;
}
}

5.调用出现如下问题 ,使用dotnet dev-certs https --trust


3.gRPC AOP拦截

有时候我们想在gRPC服务执行前后做一些操作,这时候可以使用其Aop拦截,如果你要问拦截器可以做什么,我不太想解释,继续往下看,拦截器方法定义在Interceptor类中,服务端和客户端拦截是一样的原理,下面列举一些拦截器:















































名称特点
BlockingUnaryCall拦截阻塞调用
AsyncUnaryCall拦截异步调用
AsyncServerStreamingCall拦截异步服务端流调用
AsyncClientStreamingCall拦截异步客户端流调用
AsyncDuplexStreamingCall拦截异步双向流调用
UnaryServerHandler用于拦截和传入普通调用的服务器端处理程序
ClientStreamingSerHandler用于拦截客户端流调用的服务器端处理程序
ServerStreamingSerHandler用于拦截服务端流调用的服务器端处理程序
DuplexStreamingSerHandler用于拦截双向流调用的服务器端处理程序

1.声明一个UnaryServerHandlerInterceptor类型的自定义拦截器,用于拦截和传入普通调用的服务器端处理程序,然后继承自Grpc.Core.Interceptors.Interceptor类, 重写已经定义的方法UnaryServerHandler

public class UnaryServerHandlerInterceptor : Interceptor
{
public override async Task UnaryServerHandler(
TRequest request,
ServerCallContext context,
UnaryServerMethod continuation)
{
Console.WriteLine("执行调用前");
var result = await continuation(request, context);
Console.WriteLine("执行调用后");
// 或向 客户端附加 一些信息
// 也可以 用try catch 做异常日志
// 可以从 context中取出 调用方ip,做ip限制
// 可以 监控continuation 的 执行时间
return result;
}
}

2.然后在注入容器时加入选项

builder.Services.AddGrpc(option => {
option.EnableDetailedErrors = true;
//加入服务端拦截器选项
option.Interceptors.Add();
});


推荐阅读
author-avatar
轩然
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有