基于 gRPC 和 .NET Core 的服务器流
作者:admin 2021-08-03早在 2019 年,我写过《用 Mapbox 绘制位置数据》一文,详细介绍了我如何通过简单的文件上传,用 Mapbox 绘制约 230 万个位置点。本文介绍我是如何通过使用 gRPC 和 .NET Core 的服务器流来快速获取所有位置历史数据的。
https://chandankkrr.medium.com/mapping-location-data-with-mapbox-9b256f64d569
gRPC 是一个现代开源的高性能 RPC 框架,可以在任何环境下运行。它可以有效地连接数据中心内和跨数据中心的服务,并对负载平衡、跟踪、健康检查和认证提供可插拔的支持。gRPC 最初是由谷歌创建的,该公司使用一个名为 Stubby 的单一通用 RPC 基础设施来连接其数据中心内和跨数据中心运行的大量微服务,使用已经超过十年。2015 年 3 月,谷歌决定建立 Stubby 的下一个版本,并将其开源,结果就是现在的 gRPC,被许多企业或组织使用。
https://grpc.io/
服务器流式(Server Streaming)RPC,客户端向服务器发送请求,并获得一个流来读取一连串的消息。客户端从返回的流中读取信息,直到没有消息为止。gRPC 保证在单个 RPC 调用中的信息是有序的。
rpc GetLocationData (GetLocationRequest) returns (stream GetLocationResponse);
gRPC 使用协议缓冲区(protocol buffers)作为接口定义语言(IDL)来定义客户端和服务器之间的契约。在下面的 proto 文件中,定义了一个 RPC 方法 GetLocations,它接收 GetLocationsRequest 消息类型并返回 GetLocationsResponse 消息类型。响应消息类型前面的 stream 关键字表示响应是流类型,而不是单个响应。
syntax = "proto3"; option csharp_namespace = "GPRCStreaming"; package location_data; service LocationData { rpc GetLocations (GetLocationsRequest) returns (stream GetLocationsResponse); } message GetLocationsRequest { int32 dataLimit = 1; } message GetLocationsResponse { int32 latitudeE7 = 1; int32 longitudeE7 = 2; }
我们可以使用 dotnet new grpc -n threemillion 命令轻松创建一个 .NET gRPC 服务。更多关于在 ASP.NET Core 中创建 gRPC 服务器和客户端的信息可在微软文档中找到。
Create a gRPC client and server in ASP.NET Core https://docs.microsoft.com/en-us/aspnet/core/tutorials/grpc/grpc-start?view=aspnetcore-5.0&tabs=visual-studio-code
在添加了 proto 文件并生成了 gRPC 服务资源文件后,接下来我添加了 LocationService 类。在下面的代码片段中,我有一个 LocationService 类,它继承了从 Location.proto 文件中生成的 LocationDataBase 类型。客户端可以通过 Startup.cs 文件中 Configure 方法中的 endpoints.MapGrpcService
using System.Threading.Tasks; using Grpc.Core; using Microsoft.Extensions.Logging; using System.IO; using System; using System.Linq; namespace GPRCStreaming { public class LocationService : LocationData.LocationDataBase { private readonly FileReader _fileReader; private readonly ILogger<LocationService> _logger; public LocationService(FileReader fileReader, ILogger<LocationService> logger) { _fileReader = fileReader; _logger = logger; } public override async Task GetLocations( GetLocationsRequest request, IServerStreamWriter<GetLocationsResponse> responseStream, ServerCallContext context) { try { _logger.LogInformation("Incoming request for GetLocationData"); var locationData = await GetLocationData(); var locationDataCount = locationData.Locations.Count; var dataLimit = request.DataLimit > locationDataCount ? locationDataCount : request.DataLimit; for (var i = 0; i <= dataLimit - 1; i++) { var item = locationData.Locations[i]; await responseStream.WriteAsync(new GetLocationsResponse { LatitudeE7 = item.LatitudeE7, LongitudeE7 = item.LongitudeE7 }); } } catch (Exception exception) { _logger.LogError(exception, "Error occurred"); throw; } } private async Task<RootLocation> GetLocationData() { var currentDirectory = Directory.GetCurrentDirectory(); var filePath = $"{currentDirectory}/Data/Location_History.json"; var locationData = await _fileReader.ReadAllLinesAsync(filePath); return locationData; } } }
现在,让我们运行该服务并发送一个请求。我将使用一个叫 grpcurl 的命令行工具,它可以让你与 gRPC 服务器交互。它基本上是针对 gRPC 服务器的 curl。
https://github.com/fullstorydev/grpcurl
通过 grpcurl 与 gRPC 端点(endpoint)交互只有在 gRPC 反射服务被启用时才可用。这允许服务可以被查询,以发现服务器上的 gRPC 服务。扩展方法 MapGrpcReflectionService 需要引入 Microsoft.AspNetCore.Builder 的命名空间:
public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { app.UseEndpoints(endpoints => { endpoints.MapGrpcService<LocationService>(); if (env.IsDevelopment()) { endpoints.MapGrpcReflectionService(); } endpoints.MapGet("/", async context => { await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); }); }); }
grpcurl -plaintext -d '{"dataLimit": "100000"}' localhost:80 location_data.LocationData/GetLocations
一旦服务器收到请求,它就会读取文件,然后在位置列表中循环,直到达到 dataLimit 计数,并将位置数据返回给客户端。
接下来,让我们创建一个 Blazor 客户端来调用 gRPC 服务。我们可以使用 IServiceCollection 接口上的 AddGrpcClient 扩展方法设置一个 gRPC 客户端:
public void ConfigureServices(IServiceCollection services) { services.AddRazorPages(); services.AddServerSideBlazor(); services.AddSingleton<WeatherForecastService>(); services.AddGrpcClient<LocationData.LocationDataClient>(client => { client.Address = new Uri("http://localhost:80"); }); }
我使用 Virtualize Blazor 组件来渲染这些位置。Virtualize 组件不是一次性渲染列表中的每个项目,只有当前可见的项目才会被渲染。
ASP.NET Core Blazor component virtualization https://docs.microsoft.com/en-us/aspnet/core/blazor/components/virtualization?view=aspnetcore-5.0
相关代码:
@page "/locationdata" @using Grpc.Core @using GPRCStreaming @using threemillion.Data @using System.Diagnostics @using Microsoft.AspNetCore.Components.Web.Virtualization @inject IJSRuntime JSRuntime; @inject System.Net.Http.IHttpClientFactory _clientFactory @inject GPRCStreaming.LocationData.LocationDataClient _locationDataClient <table class="tableAction"> <tbody> <tr> <td> <div class="data-input"> <label for="dataLimit">No of records to fetch</label> <input id="dataLimit" type="number" @bind="_dataLimit" /> <button @onclick="FetchData" class="btn-submit">Call gRPC</button> </div> </td> <td> <p class="info"> Total records: <span class="count">@_locations.Count</span> </p> <p class="info"> Time taken: <span class="time">@_stopWatch.ElapsedMilliseconds</span> milliseconds </p> </td> </tr> </tbody> </table> <div class="tableFixHead"> <table class="table"> <thead> <tr> <th>Longitude</th> <th>Latitude</th> </tr> </thead> <tbody> <Virtualize Items="@_locations" Context="locations"> <tr> <td>@locations.LongitudeE7</td> <td>@locations.LatitudeE7</td> </tr> </Virtualize> </tbody> </table> </div> @code { private int _dataLimit = 1000; private List<Location> _locations = new List<Location>(); private Stopwatch _stopWatch = new Stopwatch(); protected override async Task OnInitializedAsync() { await FetchData(); } private async Task FetchData() { ResetState(); _stopWatch.Start(); using (var call = _locationDataClient.GetLocations(new GetLocationsRequest { DataLimit = _dataLimit })) { await foreach (var response in call.ResponseStream.ReadAllAsync()) { _locations.Add(new Location { LongitudeE7 = response.LongitudeE7, LatitudeE7 = response.LatitudeE7 }); StateHasChanged(); } } _stopWatch.Stop(); } private void ResetState() { _locations.Clear(); _stopWatch.Reset(); StateHasChanged(); } }
通过在本地运行的流调用,从 gRPC 服务器接收 2,876,679 个单独的响应大约需要 8 秒钟。让我们也在 Mapbox 中加载数据:
@page "/mapbox" @using Grpc.Core @using GPRCStreaming @using System.Diagnostics @inject IJSRuntime JSRuntime; @inject System.Net.Http.IHttpClientFactory _clientFactory @inject GPRCStreaming.LocationData.LocationDataClient LocationDataClient <table class="tableAction"> <tbody> <tr> <td> <div class="data-input"> <label for="dataLimit">No of records to fetch</label> <input id="dataLimit" type="number" @bind="_dataLimit" /> <button @onclick="LoadMap" class="btn-submit">Load data</button> </div> </td> <td> <p class="info"> Total records: <span class="count">@_locations.Count</span> </p> <p class="info"> Time taken: <span class="time">@_stopWatch.ElapsedMilliseconds</span> milliseconds </p> </td> </tr> </tbody> </table> <div id='map' style="width: 100%; height: 90vh;"></div> @code { private int _dataLimit = 100; private List<object> _locations = new List<object>(); private Stopwatch _stopWatch = new Stopwatch(); protected override async Task OnAfterRenderAsync(bool firstRender) { if (!firstRender) { return; } await JSRuntime.InvokeVoidAsync("mapBoxFunctions.initMapBox"); } private async Task LoadMap() { ResetState(); _stopWatch.Start(); using (var call = LocationDataClient.GetLocations(new GetLocationsRequest { DataLimit = _dataLimit })) { await foreach (var response in call.ResponseStream.ReadAllAsync()) { var pow = Math.Pow(10, 7); var longitude = response.LongitudeE7 / pow; var latitude = response.LatitudeE7 / pow; _locations.Add(new { type = "Feature", geometry = new { type = "Point", coordinates = new double[] { longitude, latitude } } }); StateHasChanged(); } _stopWatch.Stop(); await JSRuntime.InvokeVoidAsync("mapBoxFunctions.addClusterData", _locations); } } private void ResetState() { JSRuntime.InvokeVoidAsync("mapBoxFunctions.clearClusterData"); _locations.Clear(); _stopWatch.Reset(); StateHasChanged(); } }
源代码在我的 GitHub 上 :
https://github.com/Chandankkrr/threemillion高防服务器,高防CDN,免备案CDN,专接抗不住的攻击,免备案高防服务器,智能CC盾防住大规模CC攻击,0误封,高达TB级DDoS防御攻击,防御一切大流量的攻击,高防选择我们,真正的性价比之选!!!
热门文章