目录
- ASP.NET Core 3.0 使用gRPC
- ASP.NET Core 3.0 gRPC 双向流
- ASP.NET Core 3.0 gRPC 认证授权
在前一文 《ASP.NET Core 3.0 使用gRPC》中有提到 gRPC 支持双向流调用,支持实时推送消息,这也是 gRPC的一大特点,且 gRPC 在对双向流的控制支持上也是非常强大的。
二. 什么是 gRPC 流
gRPC 有四种服务类型,分别是:简单 RPC(Unary RPC)、服务端流式 RPC (Server streaming RPC)、客户端流式 RPC (Client streaming RPC)、双向流式 RPC(Bi-directional streaming RPC)。它们主要有以下特点:
简单 RPC | 一般的rpc调用,传入一个请求对象,返回一个返回对象 |
服务端流式 RPC | 传入一个请求对象,服务端可以返回多个结果对象 |
客户端流式 RPC | 客户端传入多个请求对象,服务端返回一个结果对象 |
双向流式 RPC | 结合客户端流式RPC和服务端流式RPC,可以传入多个请求对象,返回多个结果对象 |
gRPC 通信是基于 HTTP/2 实现的,它的双向流映射到 HTTP/2 流。HTTP/2 具有流的概念,流是为了实现HTTP/2的多路复用。流是服务器和客户端在HTTP/2连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程;一个业务处理单元,在一个流内进行处理完毕,这个流生命周期完结。
特点如下:
- 一个HTTP/2连接可同时保持多个打开的流,任一端点交换帧
- 流可被客户端或服务器单独或共享创建和使用
- 流可被任一端关闭
- 在流内发送和接收数据都要按照顺序
- 流的标识符自然数表示,1~2^31-1区间,有创建流的终端分配
- 流与流之间逻辑上是并行、独立存在
四.gRPC中使用双向流调用摘自 HTTP/2笔记之流和多路复用 by 聂永
我们在前文中编写的RPC属于简单RPC,没有包含流调用,下面直接讲一下双向流,根据第二小节列举的四种服务类型,如果我们掌握了简单RPC和双向流RPC,那么服务端流式 RPC和客户端流式 RPC自然也就没有问题了。
这里我们继续使用前文的代码,要实现的目标是一次给多个猫洗澡。
① 首先在 LuCat.proto 定义两个rpc,一个 Count 用于统计猫的数量,一个 双向流 RPC BathTheCat 用于给猫洗澡
syntax = "proto3";
option csharp_namespace = "AspNetCoregRpcService";
import"google/protobuf/empty.proto";
packageLuCat;
service LuCat{
rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp);
rpc Count(google.protobuf.Empty) returns (CountCatResult);
}
message SuckingCatResult{
stringmessage=1;
}
message BathTheCatReq{
int32id=1;
}
message BathTheCatResp{
stringmessage=1;
}
message CountCatResult{
int32Count=1;
}
② 添加服务的实现
这里安利下Resharper,非常方便
privatereadonlyILogger<LuCatService> _logger;
privatestaticreadonlyList< string> Cats= newList< string>{ "英短银渐层", "英短金渐层", "美短", "蓝猫", "狸花猫", "橘猫"};
privatestaticreadonlyRandom Rand= newRandom(DateTime.Now.Millisecond);
publicLuCatService(ILogger<LuCatService> logger)
{
_logger = logger;
}
publicoverrideasyncTask BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context)
{
varbathQueue= newQueue< int>;
while( awaitrequestStream.MoveNext)
{
bathQueue.Enqueue(requestStream.Current.Id);
_logger.LogInformation( $"Cat {requestStream.Current.Id} Enqueue.");
}
while(bathQueue.TryDequeue( outvarcatId))
{
awaitresponseStream.WriteAsync( newBathTheCatResp { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!"});
awaitTask.Delay(500);
}
}
publicoverrideTask<CountCatResult> Count(Empty request, ServerCallContext context)
{
returnTask.FromResult( newCountCatResult
{
Count = Cats.Count
});
}
BathTheCat 方法会接收多个客户端发来的CatId,然后将他们加入队列中,等客户端发送完成后开始依次洗澡并返回给客户端。
③ 客户端实现
随机发送10个猫Id给服务端,然后接收10个洗澡结果。
varchannel = GrpcChannel.ForAddress( "https://localhost:5001");
varcatClient = newLuCat.LuCatClient(channel);
varcatCount = awaitcatClient.CountAsync( newEmpty);
Console.WriteLine( $"一共{catCount.Count}只猫。");
varrand = newRandom(DateTime.Now.Millisecond);
varbathCat = catClient.BathTheCat;
varbathCatRespTask = Task.Run( async=>
{
awaitforeach( varresp inbathCat.ResponseStream.ReadAllAsync)
{
Console.WriteLine(resp.Message);
}
});
for( inti = 0; i < 10; i++)
{
awaitbathCat.RequestStream.WriteAsync( newBathTheCatReq {Id = rand.Next(0, catCount.Count)});
}
awaitbathCat.RequestStream.CompleteAsync;
Console.WriteLine( "客户端已发送完10个需要洗澡的猫id");
Console.WriteLine( "接收洗澡结果:");
awaitbathCatRespTask;
Console.WriteLine( "洗澡完毕");
④ 运行
可以看到双向流调用成功,客户端发送了10个猫洗澡请求对象,服务端返回了10个猫洗澡结果对象。且是实时推送的,这就是 gRPC 的双向流调用。
这里大家可以自行改进来演变成客户端流式或者服务端流式调用。客户端发送一个猫Id列表,然后服务端返回每个猫洗澡结果,这就是服务端流式调用。客户端依次发送猫Id,然后服务端一次性返回所有猫的洗澡结果(给所有猫洗澡看做是一个业务,返回这个业务的结果),就是客户端流式调用。这里我就不再演示了。
五.流控制
gRPC 的流式调用支持对流进行主动取消的控制,进而可以衍生出流超时限制等控制。
在流式调用是,可以传一个 CancellationToken 参数,它就是我们用来对流进行取消控制的:
改造一下我们在第四小节的代码:
① 客户端
varcts = newCancellationTokenSource;
cts.CancelAfter(TimeSpan.FromSeconds(2.5));
varbathCat = catClient.BathTheCat(cancellationToken: cts.Token);
varbathCatRespTask = Task.Run( async=>
{
try
{
awaitforeach( varresp inbathCat.ResponseStream.ReadAllAsync)
{
Console.WriteLine(resp.Message);
}
}
catch(RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine( "Stream cancelled.");
}
});
② 服务端
while(!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue( outvarcatId))
{
_logger.LogInformation( $"Cat {catId} Dequeue.");
awaitresponseStream.WriteAsync( newBathTheCatResp { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!"});
awaitTask.Delay(500);
}
③ 运行
设置的是双向流式调用2.5s后取消流,从客户端调用结果看到,并没有收到全部10个猫的洗澡返回结果,流就已经被取消了,这就是 gRPC 的流控制。
六.结束
这里流式调用可以实现实时推送,服务端到客户端或者客户端到服务端短实时推送消息,但是这个和传统意义上的长连接主动推送、广播消息不一样,不管你是服务端流式、客户端流式还是双向流式,必须要由客户端进行发起,通过客户端请求来建立流通信。
七.参考资料
- GRPC的四种服务类型 by twtydgo
- HTTP/2笔记之流和多路复用 by 聂永
- 本文所用代码