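Implementing gRPC Real-Time Push with .NET Core and a Web Client

I have previously covered push over WebSocket and SSE. gRPC push should offer better value for the effort, although it asks a little more of the front end. Below we quickly build, step by step, a push setup with a .NET Core server and a web client.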

Backend project structure

GrpcRealtimePush/
├── Services/
│   └── ChatService.cs        # gRPC service implementation
├── Protos/
│   └── chat.proto            # Protocol Buffers definitions
├── Program.cs                # Service startup configuration
├── GrpcRealtimePush.csproj   # Project file
└── appsettings.json          # Configuration file

1. Install the required gRPC packages

<Project Sdk="Microsoft.NET.Sdk.Web">
  <PropertyGroup>
    <TargetFramework>net9.0</TargetFramework>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
  </PropertyGroup>

  <ItemGroup>
    <Protobuf Include="Protos\chat.proto" GrpcServices="Server" />
  </ItemGroup>

  <ItemGroup>
    <PackageReference Include="Grpc.AspNetCore" Version="2.64.0" />
    <PackageReference Include="Grpc.AspNetCore.Web" Version="2.64.0" />
  </ItemGroup>
</Project>

 

2. Create the proto file

syntax = "proto3";

package chat;

option csharp_namespace = "GrpcRealtimePush";

// Service definition
service ChatService {
  // Server-streaming push method
  rpc StartRealtimePush(RealtimePushRequest) returns (stream RealtimePushResponse);
}

// Request message
message RealtimePushRequest {
  string client_id = 1;    // Client ID
  int64 timestamp = 2;     // Timestamp
}

// Response message
message RealtimePushResponse {
  string data = 1;         // Pushed data
  int64 timestamp = 2;     // Timestamp
  string data_type = 3;    // Data type
}

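That is all there is to the proto file:

- **`service ChatService`**: defines the gRPC service
- **`rpc StartRealtimePush`**: a server-streaming method; `stream` in the return type means the server keeps pushing messages
- **`message`**: defines the data structures of the request and the response
- **Field numbers**: 1, 2, 3, and so on uniquely identify each field within a message and are what gets written during serialization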
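
To make the role of field numbers concrete, here is a minimal sketch that hand-encodes a `RealtimePushRequest` in the Protocol Buffers wire format. The helper names (`tag`, `encodeVarint`, `encodeRequest`) are made up for this illustration and belong to no library; in the real client the generated code does this work. It assumes field numbers below 16 (so each tag fits in one byte) and non-negative timestamps within JavaScript's safe integer range.

```javascript
// Tag byte = (field number << 3) | wire type.
// Wire type 0 = varint (int64), wire type 2 = length-delimited (string).
function tag(fieldNumber, wireType) {
  return (fieldNumber << 3) | wireType;
}

// Varint: 7 bits per byte, least significant group first,
// high bit set on every byte except the last.
function encodeVarint(n) {
  const bytes = [];
  while (n > 127) {
    bytes.push((n % 128) | 0x80);
    n = Math.floor(n / 128);
  }
  bytes.push(n);
  return bytes;
}

// Encode client_id (field 1, string) and timestamp (field 2, int64).
// As in proto3, fields holding their default value are not written.
function encodeRequest(clientId, timestamp) {
  const idBytes = Array.from(new TextEncoder().encode(clientId));
  const out = [];
  if (idBytes.length > 0) {
    out.push(tag(1, 2), ...encodeVarint(idBytes.length), ...idBytes);
  }
  if (timestamp !== 0) {
    out.push(tag(2, 0), ...encodeVarint(timestamp));
  }
  return Uint8Array.from(out);
}
```

Only the field numbers and values go on the wire, never the field names, which is why a number must not be reused for a different field once clients depend on it.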

3. Implement the method defined above

using Grpc.Core;

namespace GrpcRealtimePush.Services;

public class ChatService : GrpcRealtimePush.ChatService.ChatServiceBase
{
    private readonly ILogger<ChatService> _logger;

    public ChatService(ILogger<ChatService> logger)
    {
        _logger = logger;
    }

    public override async Task StartRealtimePush(RealtimePushRequest request,
        IServerStreamWriter<RealtimePushResponse> responseStream, ServerCallContext context)
    {
        _logger.LogInformation("🚀 Real-time push started! Client: {ClientId}", request.ClientId);

        try
        {
            // Start the continuous data push
            var counter = 1;
            var random = new Random();
            var dataTypes = new[] { "System status", "User activity", "Data update", "Notification", "Performance metric" };

            _logger.LogInformation("🔄 Starting the continuous push loop...");

            while (!context.CancellationToken.IsCancellationRequested && counter <= 100)
            {
                // Simulate different kinds of real-time data
                var dataType = dataTypes[random.Next(dataTypes.Length)];
                var value = random.Next(1, 1000);
                var timestamp = DateTime.UtcNow;

                var response = new RealtimePushResponse
                {
                    Data = $"#{counter:D4} - Value: {value} | Time: {timestamp:HH:mm:ss.fff}",
                    Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
                    DataType = dataType
                };

                await responseStream.WriteAsync(response);
                _logger.LogInformation("📡 Pushed data #{Counter}: [{DataType}] = {Value} at {Time}",
                    counter, dataType, value, timestamp.ToString("HH:mm:ss.fff"));

                counter++;

                // Wait 2 seconds before sending the next item
                await Task.Delay(2000, context.CancellationToken);
            }

            // Send the completion message
            await responseStream.WriteAsync(new RealtimePushResponse
            {
                Data = "Real-time push test completed!",
                Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
                DataType = "System message"
            });
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Real-time push session cancelled, client: {ClientId}", request.ClientId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Real-time push session failed: {Error}", ex.Message);

            // Try to send an error message to the client
            try
            {
                await responseStream.WriteAsync(new RealtimePushResponse
                {
                    Data = $"Server error: {ex.Message}",
                    Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
                    DataType = "Error message"
                });
            }
            catch (Exception sendError)
            {
                _logger.LogError(sendError, "Failed to send the error message");
            }
        }

        _logger.LogInformation("Real-time push session ended, client: {ClientId}", request.ClientId);
    }
}

4. Program.cs

using GrpcRealtimePush.Services;

var builder = WebApplication.CreateBuilder(args);

// Add gRPC services
builder.Services.AddGrpc();

// Configure a CORS policy that supports gRPC-Web
builder.Services.AddCors(options =>
{
    options.AddPolicy("AllowAll", policy =>
    {
        policy.AllowAnyOrigin()
              .AllowAnyMethod()
              .AllowAnyHeader()
              .WithExposedHeaders("Grpc-Status", "Grpc-Message", "Grpc-Encoding", "Grpc-Accept-Encoding", "Content-Type");
    });
});

var app = builder.Build();

// Configure the HTTP request pipeline

// Enable CORS
app.UseCors("AllowAll");

// Enable the gRPC-Web middleware
app.UseGrpcWeb();

// Redirect HTTP to HTTPS (the web client below connects over https)
app.UseHttpsRedirection();

// Map the gRPC service and enable gRPC-Web support for it
app.MapGrpcService<ChatService>().EnableGrpcWeb();

app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");

app.Run();

 

The code above should be familiar ground for backend developers; with it, the backend service is up.

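First, check that the backend service works. I have a Go environment, so I install the grpcurl tool directly. Note that `list` relies on gRPC server reflection, which the Program.cs above does not register; without reflection, point grpcurl at the proto file with its `-import-path` and `-proto` flags.

# Install grpcurl
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest

# Test the service
grpcurl -insecure localhost:5201 list

grpcurl -insecure -d "{\"client_id\":\"test-client\",\"timestamp\":1234567890}" localhost:5201 chat.ChatService/StartRealtimePush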

 


 

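Next comes the front-end code, written here in plain JavaScript and HTML.

The front-end structure is as follows:

client/
├── generated/                        # Generated code
│   ├── chat_pb_browser.js            # Protocol Buffers message classes
│   └── chat_grpc_web_pb_browser.js   # gRPC service client
├── grpc-web-shim.js                  # gRPC-Web compatibility layer
├── client.js                         # Main business logic
└── index.html                        # User interface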

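To prepare the front end, install protoc and the plugin. protoc turns the backend's proto file into two JS files; the plugin generates the code the gRPC-Web connection needs.

# Install the Protocol Buffers compiler
# Windows: download from https://github.com/protocolbuffers/protobuf/releases
# macOS: brew install protobuf
# Linux: apt-get install protobuf-compiler

# Verify the installation
protoc --version

# Install the gRPC-Web plugin
# Note: for --grpc-web_out, protoc looks for an executable named
# protoc-gen-grpc-web on PATH. If the command below does not provide one
# in your setup, download the binary from the grpc-web GitHub releases page.
npm install -g grpc-web

The core conversion script is as follows (PowerShell, run from the directory that contains both projects):

protoc -I=GrpcRealtimePush\Protos `
  --js_out=import_style=commonjs:client\generated `
  --grpc-web_out=import_style=commonjs,mode=grpcwebtext:client\generated `
  GrpcRealtimePush\Protos\chat.proto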

 

Running protoc generates two JS files. The listings below are the browser-compatible versions of them:

1. `chat_pb_browser.js`

// Browser-compatible version of chat_pb.js
(function () {
    'use strict';

    // Make sure the namespaces exist
    if (!window.proto) window.proto = {};
    if (!window.proto.chat) window.proto.chat = {};

    // RealtimePushRequest class
    window.proto.chat.RealtimePushRequest = function (opt_data) {
        jspb.Message.initialize(this, opt_data, 0, -1, null, null);
    };

    // Inherit from jspb.Message
    if (jspb.Message) {
        window.proto.chat.RealtimePushRequest.prototype = Object.create(jspb.Message.prototype);
        window.proto.chat.RealtimePushRequest.prototype.constructor = window.proto.chat.RealtimePushRequest;
    }

    // RealtimePushRequest accessors
    window.proto.chat.RealtimePushRequest.prototype.getClientId = function () {
        return jspb.Message.getFieldWithDefault(this, 1, "");
    };

    window.proto.chat.RealtimePushRequest.prototype.setClientId = function (value) {
        return jspb.Message.setProto3StringField(this, 1, value);
    };

    window.proto.chat.RealtimePushRequest.prototype.getTimestamp = function () {
        return jspb.Message.getFieldWithDefault(this, 2, 0);
    };

    window.proto.chat.RealtimePushRequest.prototype.setTimestamp = function (value) {
        return jspb.Message.setProto3IntField(this, 2, value);
    };

    // Serialization methods
    window.proto.chat.RealtimePushRequest.prototype.serializeBinary = function () {
        const writer = new jspb.BinaryWriter();
        window.proto.chat.RealtimePushRequest.serializeBinaryToWriter(this, writer);
        return writer.getResultBuffer();
    };

    window.proto.chat.RealtimePushRequest.serializeBinaryToWriter = function (message, writer) {
        const f = message.getClientId();
        if (f.length > 0) {
            writer.writeString(1, f);
        }
        const f2 = message.getTimestamp();
        if (f2 !== 0) {
            writer.writeInt64(2, f2);
        }
    };

    window.proto.chat.RealtimePushRequest.deserializeBinary = function (bytes) {
        const reader = new jspb.BinaryReader(bytes);
        const msg = new window.proto.chat.RealtimePushRequest();
        return window.proto.chat.RealtimePushRequest.deserializeBinaryFromReader(msg, reader);
    };

    window.proto.chat.RealtimePushRequest.deserializeBinaryFromReader = function (msg, reader) {
        while (reader.nextField()) {
            if (reader.isEndGroup()) {
                break;
            }
            const field = reader.getFieldNumber();
            switch (field) {
                case 1:
                    const value = reader.readString();
                    msg.setClientId(value);
                    break;
                case 2:
                    const value2 = reader.readInt64();
                    msg.setTimestamp(value2);
                    break;
                default:
                    reader.skipField();
                    break;
            }
        }
        return msg;
    };

    // RealtimePushResponse class
    window.proto.chat.RealtimePushResponse = function (opt_data) {
        jspb.Message.initialize(this, opt_data, 0, -1, null, null);
    };

    // Inherit from jspb.Message
    if (jspb.Message) {
        window.proto.chat.RealtimePushResponse.prototype = Object.create(jspb.Message.prototype);
        window.proto.chat.RealtimePushResponse.prototype.constructor = window.proto.chat.RealtimePushResponse;
    }

    // RealtimePushResponse accessors
    window.proto.chat.RealtimePushResponse.prototype.getData = function () {
        return jspb.Message.getFieldWithDefault(this, 1, "");
    };

    window.proto.chat.RealtimePushResponse.prototype.setData = function (value) {
        return jspb.Message.setProto3StringField(this, 1, value);
    };

    window.proto.chat.RealtimePushResponse.prototype.getTimestamp = function () {
        return jspb.Message.getFieldWithDefault(this, 2, 0);
    };

    window.proto.chat.RealtimePushResponse.prototype.setTimestamp = function (value) {
        return jspb.Message.setProto3IntField(this, 2, value);
    };

    window.proto.chat.RealtimePushResponse.prototype.getDataType = function () {
        return jspb.Message.getFieldWithDefault(this, 3, "");
    };

    window.proto.chat.RealtimePushResponse.prototype.setDataType = function (value) {
        return jspb.Message.setProto3StringField(this, 3, value);
    };

    // Serialization methods
    window.proto.chat.RealtimePushResponse.prototype.serializeBinary = function () {
        const writer = new jspb.BinaryWriter();
        window.proto.chat.RealtimePushResponse.serializeBinaryToWriter(this, writer);
        return writer.getResultBuffer();
    };

    window.proto.chat.RealtimePushResponse.serializeBinaryToWriter = function (message, writer) {
        const f = message.getData();
        if (f.length > 0) {
            writer.writeString(1, f);
        }
        const f2 = message.getTimestamp();
        if (f2 !== 0) {
            writer.writeInt64(2, f2);
        }
        const f3 = message.getDataType();
        if (f3.length > 0) {
            writer.writeString(3, f3);
        }
    };

    window.proto.chat.RealtimePushResponse.deserializeBinary = function (bytes) {
        const reader = new jspb.BinaryReader(bytes);
        const msg = new window.proto.chat.RealtimePushResponse();
        return window.proto.chat.RealtimePushResponse.deserializeBinaryFromReader(msg, reader);
    };

    window.proto.chat.RealtimePushResponse.deserializeBinaryFromReader = function (msg, reader) {
        while (reader.nextField()) {
            if (reader.isEndGroup()) {
                break;
            }
            const field = reader.getFieldNumber();
            switch (field) {
                case 1:
                    const value = reader.readString();
                    msg.setData(value);
                    break;
                case 2:
                    const value2 = reader.readInt64();
                    msg.setTimestamp(value2);
                    break;
                case 3:
                    const value3 = reader.readString();
                    msg.setDataType(value3);
                    break;
                default:
                    reader.skipField();
                    break;
            }
        }
        return msg;
    };

    console.log('chat_pb_browser.js loaded successfully');
})();

 

2. `chat_grpc_web_pb_browser.js`

// Browser-compatible version of chat_grpc_web_pb.js
(function () {
    'use strict';

    // Make sure the namespaces exist
    if (!window.proto) window.proto = {};
    if (!window.proto.chat) window.proto.chat = {};

    // ChatServiceClient class
    window.proto.chat.ChatServiceClient = function (hostname, credentials, options) {
        if (!options) options = {};
        options['format'] = options['format'] || 'text';

        // Call the gRPC-Web base class constructor
        window.grpc.web.GrpcWebClientBase.call(this, options);

        this.hostname_ = hostname;
        this.credentials_ = credentials;
        this.options_ = options;
    };

    // Inherit from the base class
    if (window.grpc && window.grpc.web && window.grpc.web.GrpcWebClientBase) {
        window.proto.chat.ChatServiceClient.prototype = Object.create(window.grpc.web.GrpcWebClientBase.prototype);
        window.proto.chat.ChatServiceClient.prototype.constructor = window.proto.chat.ChatServiceClient;
    }

    // Method descriptor
    const methodDescriptor_StartRealtimePush = new window.grpc.web.MethodDescriptor(
        '/chat.ChatService/StartRealtimePush',
        window.grpc.web.MethodType.SERVER_STREAMING,
        window.proto.chat.RealtimePushRequest,
        window.proto.chat.RealtimePushResponse,
        function (request) {
            return request.serializeBinary();
        },
        function (bytes) {
            return window.proto.chat.RealtimePushResponse.deserializeBinary(bytes);
        }
    );

    // StartRealtimePush method
    window.proto.chat.ChatServiceClient.prototype.startRealtimePush = function (request, metadata) {
        const url = this.hostname_ + '/chat.ChatService/StartRealtimePush';
        return this.serverStreaming(url, request, metadata || {}, methodDescriptor_StartRealtimePush);
    };

    console.log('chat_grpc_web_pb_browser.js loaded successfully');
})();

 

Next, create the connection-layer code. It is written by hand; copy and adapt it for reuse if you need to.

`grpc-web-shim.js`

// gRPC-Web compatibility shim
(function() {
    'use strict';

    // Create the grpc namespace
    if (typeof window.grpc === 'undefined') {
        window.grpc = {};
    }

    if (typeof window.grpc.web === 'undefined') {
        window.grpc.web = {};
    }

    // Method type enum
    window.grpc.web.MethodType = {
        UNARY: 'unary',
        SERVER_STREAMING: 'server_streaming',
        CLIENT_STREAMING: 'client_streaming',
        BIDIRECTIONAL_STREAMING: 'bidirectional_streaming'
    };

    // Method descriptor
    window.grpc.web.MethodDescriptor = function(path, methodType, requestType, responseType, requestSerializeFn, responseDeserializeFn) {
        this.path = path;
        this.methodType = methodType;
        this.requestType = requestType;
        this.responseType = responseType;
        this.requestSerializeFn = requestSerializeFn;
        this.responseDeserializeFn = responseDeserializeFn;
    };

    // Base client class
    window.grpc.web.GrpcWebClientBase = function(options) {
        this.options = options || {};
        this.format = this.options.format || 'text';
    };

    // Server-streaming method
    window.grpc.web.GrpcWebClientBase.prototype.serverStreaming = function(url, request, metadata, methodDescriptor) {
        const self = this;

        // Create a simple event emitter
        const stream = {
            listeners: {},

            on: function(event, callback) {
                if (!this.listeners[event]) {
                    this.listeners[event] = [];
                }
                this.listeners[event].push(callback);
            },

            emit: function(event, data) {
                if (this.listeners[event]) {
                    this.listeners[event].forEach(callback => callback(data));
                }
            }
        };

        try {
            // Serialize the request
            const serializedRequest = methodDescriptor.requestSerializeFn(request);

            // Build the gRPC-Web frame: 1 flag byte + 4-byte big-endian length
            const frameHeader = new Uint8Array(5);
            frameHeader[0] = 0; // Compression flag

            const messageLength = serializedRequest.length;
            frameHeader[1] = (messageLength >>> 24) & 0xFF;
            frameHeader[2] = (messageLength >>> 16) & 0xFF;
            frameHeader[3] = (messageLength >>> 8) & 0xFF;
            frameHeader[4] = messageLength & 0xFF;

            const framedMessage = new Uint8Array(5 + messageLength);
            framedMessage.set(frameHeader, 0);
            framedMessage.set(serializedRequest, 5);

            const base64Request = btoa(String.fromCharCode.apply(null, framedMessage));

            const headers = {
                'Content-Type': 'application/grpc-web-text',
                'X-Grpc-Web': '1',
                'Accept': 'application/grpc-web-text'
            };

            // Add metadata
            if (metadata) {
                Object.keys(metadata).forEach(key => {
                    if (key.toLowerCase() !== 'content-type') {
                        headers[key] = metadata[key];
                    }
                });
            }

            const fetchOptions = {
                method: 'POST',
                headers: headers,
                body: base64Request
            };

            fetch(url, fetchOptions)
                .then(response => {
                    if (!response.ok) {
                        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
                    }

                    console.log('Start reading the streaming response...');

                    // Read the gRPC-Web streaming response through a ReadableStream
                    const reader = response.body.getReader();
                    const decoder = new TextDecoder();
                    let buffer = '';
                    let messageCount = 0;

                    function readStreamChunk() {
                        return reader.read().then(({ done, value }) => {
                            if (done) {
                                console.log('📡 Stream finished, total messages processed:', messageCount);
                                if (buffer.length > 0) {
                                    console.log('📦 Processing the buffer left at the end of the stream');
                                    processStreamBuffer();
                                }
                                stream.emit('end');
                                return;
                            }

                            // Append the new data to the buffer
                            const chunk = decoder.decode(value, { stream: true });
                            buffer += chunk;
                            console.log('📦 Received stream chunk:', chunk.length, 'chars, buffer total:', buffer.length);

                            // Process complete messages in the buffer
                            processStreamBuffer();

                            // Keep reading
                            return readStreamChunk();
                        }).catch(error => {
                            console.error('❌ Stream read error:', error);
                            stream.emit('error', error);
                        });
                    }

                    function processStreamBuffer() {
                        console.log('🔍 Processing buffer, length:', buffer.length);

                        while (buffer.length > 0) {
                            try {
                                // Look for a complete base64 block
                                let messageBase64 = buffer;

                                // Check whether the buffer contains a trailer marker
                                const trailerMarkers = ['gAAAA', 'gAAA', 'gAA', 'gA'];
                                let trailerIndex = -1;

                                for (const marker of trailerMarkers) {
                                    const index = messageBase64.indexOf(marker);
                                    if (index > 0) {
                                        trailerIndex = index;
                                        break;
                                    }
                                }

                                if (trailerIndex > 0) {
                                    messageBase64 = messageBase64.substring(0, trailerIndex);
                                    console.log('📦 Found trailer at index:', trailerIndex);
                                }

                                // Clean up the base64 string
                                const cleanBase64 = messageBase64.replace(/[^A-Za-z0-9+/=]/g, '');

                                // Make sure the base64 string length is a multiple of 4
                                let paddedBase64 = cleanBase64;
                                const padding = paddedBase64.length % 4;
                                if (padding > 0) {
                                    paddedBase64 += '='.repeat(4 - padding);
                                }

                                if (paddedBase64.length === 0) {
                                    console.log('❌ base64 is empty after cleanup');
                                    buffer = '';
                                    break;
                                }

                                // Decode base64
                                const binaryString = atob(paddedBase64);
                                const responseBytes = new Uint8Array(binaryString.length);
                                for (let i = 0; i < binaryString.length; i++) {
                                    responseBytes[i] = binaryString.charCodeAt(i);
                                }

                                console.log('📦 Decoded byte length:', responseBytes.length);

                                // Check that there is enough data to read the gRPC frame header
                                if (responseBytes.length >= 5) {
                                    const compressionFlag = responseBytes[0];
                                    const frameMsgLength = (responseBytes[1] << 24) | (responseBytes[2] << 16) | (responseBytes[3] << 8) | responseBytes[4];

                                    console.log(`📡 Stream frame: compression=${compressionFlag}, length=${frameMsgLength}, total=${responseBytes.length}`);

                                    // Check that the complete message data is available
                                    if (responseBytes.length >= 5 + frameMsgLength && frameMsgLength > 0) {
                                        const messageBytes = responseBytes.slice(5, 5 + frameMsgLength);

                                        try {
                                            const response = methodDescriptor.responseDeserializeFn(messageBytes);
                                            messageCount++;
                                            console.log(`✅ Parsed message #${messageCount}, emitting data`);
                                            stream.emit('data', response);

                                            // Remove the processed data from the buffer
                                            if (trailerIndex > 0) {
                                                buffer = buffer.substring(trailerIndex);
                                                console.log('📦 Moved the buffer past the trailer, remaining length:', buffer.length);
                                            } else {
                                                buffer = '';
                                                console.log('📦 Buffer fully cleared');
                                            }
                                        } catch (deserializeError) {
                                            console.error('❌ Deserialization error:', deserializeError);
                                            buffer = '';
                                            break;
                                        }
                                    } else {
                                        console.log('❌ Frame data incomplete or length invalid');
                                        if (buffer.length < 200) {
                                            break;
                                        } else {
                                            buffer = '';
                                            break;
                                        }
                                    }
                                } else {
                                    console.log('❌ Frame too short, waiting for more data');
                                    break;
                                }
                            } catch (parseError) {
                                console.error('❌ Error while processing a stream message:', parseError);
                                buffer = '';
                                break;
                            }
                        }

                        console.log('🔍 Remaining buffer length:', buffer.length);
                    }

                    // Start reading the stream
                    return readStreamChunk();
                })
                .catch(error => {
                    console.error('Stream fetch error:', error);
                    stream.emit('error', error);
                });
        } catch (error) {
            setTimeout(() => stream.emit('error', error), 0);
        }

        return stream;
    };

    console.log('gRPC-Web shim loaded successfully');
})();
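
The shim above finds message boundaries by searching the base64 text for trailer markers such as `gAAAA`. As an alternative sketch, not the approach used in this article, the stream can be decoded to bytes first and then split purely by the 5-byte frame header (1 flag byte plus a 4-byte big-endian length). In the gRPC-Web protocol the high bit of the flag byte (0x80) marks the trailer frame. The names `decodeBase64Chunk`, `frameMessage`, `FrameParser`, and `parseTrailers` are hypothetical helpers, and the code assumes `atob` and `TextDecoder` are available as globals (browsers and recent Node.js).

```javascript
// Decode a chunk of grpc-web-text (base64). Input is consumed in
// 4-character quanta, so a chunk boundary that splits a quantum and
// '=' padding in the middle of the stream are both handled.
function decodeBase64Chunk(state, chunk) {
  state.pending += chunk;
  const usable = state.pending.length - (state.pending.length % 4);
  const text = state.pending.slice(0, usable);
  state.pending = state.pending.slice(usable);
  const bytes = [];
  for (let i = 0; i < text.length; i += 4) {
    const binary = atob(text.slice(i, i + 4));
    for (let j = 0; j < binary.length; j++) bytes.push(binary.charCodeAt(j));
  }
  return Uint8Array.from(bytes);
}

// Build one frame: 1 flag byte + 4-byte big-endian length + payload.
function frameMessage(payload) {
  const framed = new Uint8Array(5 + payload.length);
  framed[0] = 0; // 0 = uncompressed data frame
  new DataView(framed.buffer).setUint32(1, payload.length, false);
  framed.set(payload, 5);
  return framed;
}

// Accumulates decoded bytes and returns every frame that is complete.
class FrameParser {
  constructor() {
    this.buffer = new Uint8Array(0);
  }

  push(bytes) {
    const merged = new Uint8Array(this.buffer.length + bytes.length);
    merged.set(this.buffer, 0);
    merged.set(bytes, this.buffer.length);
    this.buffer = merged;

    const frames = [];
    while (this.buffer.length >= 5) {
      const view = new DataView(this.buffer.buffer, this.buffer.byteOffset, this.buffer.byteLength);
      const length = view.getUint32(1, false);
      if (this.buffer.length < 5 + length) break; // wait for more data
      frames.push({
        isTrailer: (this.buffer[0] & 0x80) !== 0,
        payload: this.buffer.slice(5, 5 + length),
      });
      this.buffer = this.buffer.slice(5 + length);
    }
    return frames;
  }
}

// A trailer payload is a block of "name: value" lines separated by CRLF.
function parseTrailers(payload) {
  const trailers = {};
  for (const line of new TextDecoder().decode(payload).split('\r\n')) {
    const idx = line.indexOf(':');
    if (idx > 0) {
      trailers[line.slice(0, idx).trim().toLowerCase()] = line.slice(idx + 1).trim();
    }
  }
  return trailers;
}
```

Inside `readStreamChunk`, each decoded text chunk would go through `decodeBase64Chunk` and then `FrameParser.push`; data frames are handed to `responseDeserializeFn`, and the trailer frame yields `grpc-status` and `grpc-message`.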

 

Below is the simple business logic for receiving the real-time data

`client.js`

// gRPC-Web Chat Client Implementation class RealtimePushClient {     constructor() {         this.client = null;         this.isConnected = false;         this.serverUrl = 'https://localhost:5201';                  // 流式传输相关属性         this.currentStream = null;         this.streamMessageCount = 0;         this.streamStartTime = null;                  this.initializeUI();     }      initializeUI() {         const streamButton = document.getElementById('streamButton');         const stopStreamButton = document.getElementById('stopStreamButton');         const clearButton = document.getElementById('clearButton');          streamButton.addEventListener('click', () => this.startStreamingChat());         stopStreamButton.addEventListener('click', () => this.stopStreaming());         clearButton.addEventListener('click', () => this.clearMessages());          // 初始化连接状态         this.updateConnectionStatus(false, '正在初始化...');          // 页面加载时尝试连接         this.connect();     }      connect() {         try {             // 初始化gRPC-Web客户端             console.log('正在初始化实时推送客户端...');                          // 检查必要的依赖是否可用             if (typeof jspb === 'undefined') {                 throw new Error('google-protobuf 库未加载');             }                          if (typeof grpc === 'undefined' || !grpc.web) {                 console.warn('grpc-web 库未完全加载,等待重试...');                 setTimeout(() => this.connect(), 1000);                 return;             }                          if (typeof proto === 'undefined' || !proto.chat || !proto.chat.ChatServiceClient) {                 throw new Error('gRPC 生成的客户端代码未加载');             }              // 创建gRPC-Web客户端             this.client = new proto.chat.ChatServiceClient(this.serverUrl, null, {                 format: 'text',                 withCredentials: false             });                          console.log('实时推送客户端创建成功');             this.updateConnectionStatus(true, '已连接');             this.addMessage('系统', '🚀 
实时推送客户端已就绪', 'system');                      } catch (error) {             console.error('连接初始化失败:', error);             this.updateConnectionStatus(false, '初始化失败');             this.addMessage('系统', '初始化失败: ' + this.getErrorMessage(error), 'error');         }     }      startStreamingChat() {         if (!this.isConnected) {             this.addMessage('系统', '未连接到服务器,无法启动实时推送', 'error');             return;         }          if (!this.client) {             this.addMessage('系统', 'gRPC客户端未初始化', 'error');             return;         }          // 检查是否已在流式传输         if (this.currentStream) {             this.addMessage('系统', '实时推送已在运行中', 'system');             return;         }          try {             // 创建实时推送请求             const pushRequest = new proto.chat.RealtimePushRequest();             pushRequest.setClientId('web-client-' + Date.now());             pushRequest.setTimestamp(Math.floor(Date.now() / 1000));              console.log('启动实时推送:', {                 clientId: pushRequest.getClientId(),                 timestamp: pushRequest.getTimestamp()             });              // 添加流式传输的元数据             const metadata = {                 'x-user-agent': 'grpc-web-realtime-client'             };              // 开始流式传输             const stream = this.client.startRealtimePush(pushRequest, metadata);                          if (!stream) {                 throw new Error('无法创建实时推送连接');             }              // 存储流引用             this.currentStream = stream;             this.streamMessageCount = 0;             this.streamStartTime = Date.now();              // 更新UI显示流式传输已激活             this.updateStreamingUI(true);              stream.on('data', (response) => {                 if (response && typeof response.getData === 'function') {                     this.streamMessageCount++;                                          // 添加带有实时数据特殊样式的消息                     this.addRealtimeMessage(                         `[${response.getDataType()}] ${response.getData()}`,  
                        this.streamMessageCount                     );                                          // 更新统计信息                     this.updateStreamStats();                 }             });              stream.on('error', (error) => {                 console.error('实时推送错误:', error);                 this.addMessage('系统', '实时推送错误: ' + this.getErrorMessage(error), 'error');                 this.stopStreaming();             });              stream.on('end', () => {                 console.log('实时推送结束');                 this.addMessage('系统', '实时推送已结束', 'system');                 this.stopStreaming();             });              this.addMessage('系统', '🚀 实时数据推送已启动', 'system');                      } catch (error) {             console.error('启动实时推送失败:', error);             this.addMessage('系统', '启动实时推送失败: ' + this.getErrorMessage(error), 'error');         }     }      // 其他方法实现...     updateConnectionStatus(connected, message = '') {         const statusDiv = document.getElementById('status');         const streamButton = document.getElementById('streamButton');                  this.isConnected = connected;                  if (connected) {             statusDiv.textContent = '状态: 已连接' + (message ? ' - ' + message : '');             statusDiv.className = 'status connected';             streamButton.disabled = false;         } else {             statusDiv.textContent = '状态: 未连接' + (message ? 
' - ' + message : '');
            statusDiv.className = 'status disconnected';
            streamButton.disabled = true;
        }
    }

    addMessage(sender, content, type) {
        const chatContainer = document.getElementById('chatContainer');
        const messageDiv = document.createElement('div');
        messageDiv.className = `message ${type}`;

        const timestamp = new Date().toLocaleTimeString();
        messageDiv.innerHTML = `
            <div><strong>${sender}</strong> <small>${timestamp}</small></div>
            <div>${content}</div>
        `;

        chatContainer.appendChild(messageDiv);
        chatContainer.scrollTop = chatContainer.scrollHeight;
    }

    addRealtimeMessage(content, count) {
        const chatContainer = document.getElementById('chatContainer');
        const messageDiv = document.createElement('div');
        messageDiv.className = 'message realtime';

        const timestamp = new Date().toLocaleTimeString();
        messageDiv.innerHTML = `
            <div class="realtime-header">
                <strong>📡 Real-time data #${count}</strong>
                <small>${timestamp}</small>
            </div>
            <div class="realtime-content">${content}</div>
        `;

        chatContainer.appendChild(messageDiv);
        chatContainer.scrollTop = chatContainer.scrollHeight;

        // Keep only the last 100 messages so the DOM does not grow without bound
        const messages = chatContainer.querySelectorAll('.message');
        if (messages.length > 100) {
            for (let i = 0; i < messages.length - 100; i++) {
                messages[i].remove();
            }
        }
    }

    getErrorMessage(error) {
        if (!error) return 'Unknown error';

        // Handle gRPC-Web specific errors (standard gRPC status codes)
        if (error.code !== undefined) {
            const grpcErrorCodes = {
                0: 'OK',
                1: 'CANCELLED - operation was cancelled',
                2: 'UNKNOWN - unknown error',
                3: 'INVALID_ARGUMENT - invalid argument',
                4: 'DEADLINE_EXCEEDED - request timed out',
                5: 'NOT_FOUND - not found',
                6: 'ALREADY_EXISTS - already exists',
                7: 'PERMISSION_DENIED - permission denied',
                8: 'RESOURCE_EXHAUSTED - resource exhausted',
                9: 'FAILED_PRECONDITION - precondition failed',
                10: 'ABORTED - operation was aborted',
                11: 'OUT_OF_RANGE - out of range',
                12: 'UNIMPLEMENTED - not implemented',
                13: 'INTERNAL - internal error',
                14: 'UNAVAILABLE - service unavailable',
                15: 'DATA_LOSS - data loss',
                16: 'UNAUTHENTICATED - not authenticated'
            };

            const codeDescription = grpcErrorCodes[error.code] || `unknown error code: ${error.code}`;
            return `gRPC error: ${codeDescription}`;
        }

        return error.message || error.toString();
    }
}

// Initialize the real-time push client once the page has loaded
document.addEventListener('DOMContentLoaded', () => {
    window.realtimePushClient = new RealtimePushClient();
});
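Both `addMessage` and `addRealtimeMessage` assign server-supplied text to `innerHTML`, so any markup inside a pushed message would be parsed as HTML. Here is a minimal sketch of one way to guard against that, assuming a helper named `escapeHtml` (a hypothetical name, not part of the original `client.js`) is applied to `content` before it is interpolated:

```javascript
// Hypothetical helper (not in the original client.js): escapes the five
// characters that are significant in HTML so the text is rendered literally.
function escapeHtml(text) {
    const replacements = {
        '&': '&amp;',
        '<': '&lt;',
        '>': '&gt;',
        '"': '&quot;',
        "'": '&#39;'
    };
    // String(...) lets numbers and other non-string values pass through safely.
    return String(text).replace(/[&<>"']/g, (ch) => replacements[ch]);
}

// Example: markup in pushed data is neutralized before it reaches innerHTML.
const unsafe = '<img src=x onerror="alert(1)">';
console.log(escapeHtml(unsafe));
// prints: &lt;img src=x onerror=&quot;alert(1)&quot;&gt;
```

In `addRealtimeMessage`, the template could then interpolate `${escapeHtml(content)}` instead of `${content}`. Setting `textContent` on a separately created element would be another option.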

 

Finally, create an HTML page:

`index.html`

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>gRPC-Web Real-Time Data Push</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
            background-color: #f5f5f5;
        }

        h1 {
            color: #333;
            text-align: center;
            margin-bottom: 30px;
        }

        .chat-container {
            border: 1px solid #ccc;
            height: 400px;
            overflow-y: auto;
            padding: 10px;
            margin-bottom: 20px;
            background-color: #fff;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }

        .message {
            margin-bottom: 10px;
            padding: 8px;
            border-radius: 5px;
            border-left: 4px solid #ddd;
        }

        .system {
            background-color: #fff3e0;
            border-left-color: #ff9800;
            text-align: center;
            font-style: italic;
        }

        .error {
            background-color: #ffebee;
            border-left-color: #f44336;
            color: #c62828;
            text-align: center;
        }

        .realtime {
            background-color: #e8f5e8;
            border-left-color: #4caf50;
            animation: fadeIn 0.3s ease-in;
        }

        .realtime-header {
            font-weight: bold;
            color: #2e7d32;
            margin-bottom: 5px;
        }

        .realtime-content {
            font-family: 'Courier New', monospace;
            font-size: 0.9em;
            color: #1b5e20;
        }

        .input-container {
            display: flex;
            gap: 10px;
            margin-top: 20px;
        }

        button {
            padding: 12px 24px;
            border: none;
            border-radius: 6px;
            cursor: pointer;
            font-size: 14px;
            font-weight: bold;
            transition: background-color 0.3s;
        }

        #streamButton {
            background-color: #4caf50;
            color: white;
        }

        #streamButton:hover:not(:disabled) {
            background-color: #388e3c;
        }

        #streamButton:disabled {
            background-color: #cccccc;
            cursor: not-allowed;
            opacity: 0.6;
        }

        #stopStreamButton {
            background-color: #f44336;
            color: white;
        }

        #stopStreamButton:hover {
            background-color: #d32f2f;
        }

        #clearButton {
            background-color: #757575;
            color: white;
        }

        #clearButton:hover {
            background-color: #616161;
        }

        .status {
            margin-bottom: 15px;
            padding: 10px;
            border-radius: 6px;
            font-weight: bold;
            text-align: center;
        }

        .connected {
            background-color: #c8e6c9;
            color: #2e7d32;
            border: 1px solid #4caf50;
        }

        .disconnected {
            background-color: #ffcdd2;
            color: #c62828;
            border: 1px solid #f44336;
        }

        .stream-stats {
            background-color: #f3e5f5;
            padding: 10px;
            margin: 10px 0;
            border-radius: 6px;
            font-size: 0.9em;
            color: #4a148c;
            border: 1px solid #9c27b0;
        }

        @keyframes fadeIn {
            from { opacity: 0; transform: translateY(-10px); }
            to { opacity: 1; transform: translateY(0); }
        }
    </style>
</head>
<body>
    <h1>🚀 gRPC-Web Real-Time Data Push System</h1>

    <div id="status" class="status disconnected">
        Status: Disconnected
    </div>

    <div id="chatContainer" class="chat-container">
        <div class="loading">Initializing client...</div>
    </div>

    <div class="input-container">
        <button id="streamButton">🚀 Start real-time push</button>
        <button id="stopStreamButton" style="display: none;">⏹️ Stop push</button>
        <button id="clearButton">🗑️ Clear messages</button>
    </div>

    <!-- Dependencies -->
    <script src="https://unpkg.com/google-protobuf@3.21.2/google-protobuf.js"></script>

    <!-- Local gRPC-Web compatibility shim -->
    <script src="./grpc-web-shim.js"></script>

    <!-- Browser-compatible generated gRPC-Web files -->
    <script src="./generated/chat_pb_browser.js"></script>
    <script src="./generated/chat_grpc_web_pb_browser.js"></script>

    <!-- Main client script -->
    <script src="./client.js"></script>
</body>
</html>

 

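Double-click index.html to open it, or serve the folder over HTTP (for example with Python's http.server), and the page will start receiving the pushed real-time data.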


 

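Compared with the other push approaches (WebSocket and SSE), gRPC brings type safety, high performance, and compressed transport, among other benefits. The trade-off is that front-end support is comparatively less friendly.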

 
