// Reconstructed from a mangled "git format-patch" email (commit 87518af5, ccnetcore,
// 2026-01-10: "feat: add unified stream forwarding & statistics for multiple API types").
// The patch adds members to two files; both are reproduced below, re-formatted.
//
// NOTE(review): the original patch text had all newlines collapsed and every
// angle-bracketed generic argument stripped (e.g. `ConcurrentQueue<string>` appeared
// as `ConcurrentQueue`). Generic arguments are restored below where they can be
// inferred from usage; call sites where the argument cannot be inferred are flagged
// inline and must be restored from the original commit.

// =====================================================================================
// File: Yi.Framework.AiHub.Application/Services/Chat/AiChatService.cs
// New members of `public class AiChatService : ApplicationService`.
// (The patch also adds `using System.Text.Json;` to this file's import block.)
// =====================================================================================

/// <summary>
/// Unified send endpoint supporting four API types (Completions, Anthropic Messages,
/// OpenAI Responses, Gemini GenerateContent), forwarded verbatim as SSE.
/// </summary>
/// <param name="apiType">API type discriminator.</param>
/// <param name="input">Raw request body as a <see cref="JsonElement"/>, forwarded untouched.</param>
/// <param name="modelId">Model id (Gemini-style callers must pass it via the URL).</param>
/// <param name="sessionId">Optional chat session id.</param>
/// <param name="cancellationToken">Signaled when the client disconnects.</param>
[HttpPost("ai-chat/unified/send")]
public async Task PostUnifiedSendAsync(
    [FromQuery] ModelApiTypeEnum apiType,
    [FromBody] JsonElement input,
    [FromQuery] string modelId,
    [FromQuery] Guid? sessionId,
    CancellationToken cancellationToken)
{
    // Fall back to the model id embedded in the request body when the URL omits it.
    if (string.IsNullOrEmpty(modelId))
    {
        modelId = ExtractModelIdFromRequest(apiType, input);
    }

    // Every model except the free one requires an authenticated, non-blacklisted VIP user.
    if (modelId != FreeModelId)
    {
        if (CurrentUser.IsAuthenticated)
        {
            await _aiBlacklistManager.VerifiyAiBlacklist(CurrentUser.GetId());
            if (!CurrentUser.IsAiVip())
            {
                throw new UserFriendlyException("该模型需要VIP用户才能使用,请购买VIP后重新登录重试");
            }
        }
        else
        {
            throw new UserFriendlyException("未登录用户,只能使用未加速的DeepSeek-R1,请登录后重试");
        }
    }

    // Premium-package models additionally require a positive remaining token balance.
    if (CurrentUser.IsAuthenticated)
    {
        var isPremium = await _modelManager.IsPremiumModelAsync(modelId);
        if (isPremium)
        {
            var availableTokens = await _premiumPackageManager.GetAvailableTokensAsync(CurrentUser.GetId());
            if (availableTokens <= 0)
            {
                throw new UserFriendlyException("尊享token包用量不足,请先购买尊享token包");
            }
        }
    }

    // Delegate to the unified streaming pipeline.
    // FIX: the original passed CancellationToken.None here, so a client disconnect
    // never cancelled the upstream model stream; flow the request token instead.
    await _aiGateWayManager.UnifiedStreamForStatisticsAsync(
        _httpContextAccessor.HttpContext!,
        apiType,
        input,
        modelId,
        CurrentUser.Id,
        sessionId,
        null,
        cancellationToken);
}

/// <summary>
/// Extracts the model id from the raw request body ("model" property).
/// </summary>
/// <param name="apiType">Kept for interface stability; currently unused — all four
/// supported body formats carry a top-level "model" property.</param>
/// <exception cref="UserFriendlyException">Thrown when no usable model id is present.</exception>
private string ExtractModelIdFromRequest(ModelApiTypeEnum apiType, JsonElement input)
{
    try
    {
        if (input.TryGetProperty("model", out var modelProperty))
        {
            var model = modelProperty.GetString();
            // FIX: the original returned string.Empty when "model" was present but
            // JSON null, silently continuing with an empty model id; treat that as
            // missing so the caller gets the intended error below.
            if (!string.IsNullOrEmpty(model))
            {
                return model;
            }
        }
    }
    catch
    {
        // Best-effort parse (e.g. body is not a JSON object): fall through to the error.
    }

    throw new UserFriendlyException("无法从请求中获取模型ID,请在URL参数中指定modelId");
}

// =====================================================================================
// File: Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs
// New members of `public class AiGateWayManager : DomainService`.
// =====================================================================================

/// <summary>
/// Unified streaming pipeline: forwards the upstream SSE stream verbatim for any of the
/// four API types, then records messages, usage statistics, and premium-package charges.
/// </summary>
/// <param name="httpContext">The current HTTP context; its response becomes the SSE stream.</param>
/// <param name="apiType">Which of the four wire formats the body uses.</param>
/// <param name="requestBody">Raw request body, forwarded to the provider unmodified.</param>
/// <param name="modelId">Requested model id (a "yi-" prefix is stripped before dispatch).</param>
/// <param name="userId">Charged user; null for anonymous calls.</param>
/// <param name="sessionId">Optional session to attach the message records to.</param>
/// <param name="tokenId">Optional API-token id for attribution.</param>
/// <param name="cancellationToken">Aborts both the upstream read and the SSE writes.</param>
public async Task UnifiedStreamForStatisticsAsync(
    HttpContext httpContext,
    ModelApiTypeEnum apiType,
    JsonElement requestBody,
    string modelId,
    Guid? userId = null,
    Guid? sessionId = null,
    Guid? tokenId = null,
    CancellationToken cancellationToken = default)
{
    var response = httpContext.Response;
    // Declare an SSE stream in the response headers.
    response.ContentType = "text/event-stream;charset=utf-8;";
    response.Headers.TryAdd("Cache-Control", "no-cache");
    response.Headers.TryAdd("Connection", "keep-alive");

    // Keep the caller-facing id for statistics; strip the "yi-" prefix for dispatch.
    var sourceModelId = modelId;
    if (!string.IsNullOrEmpty(modelId) &&
        modelId.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
    {
        modelId = modelId[3..];
    }

    var modelDescribe = await GetModelAsync(apiType, sourceModelId);

    // Shared producer/consumer queue: producers enqueue ready-made SSE frames, the
    // consumer below paces them onto the wire every 75 ms for smooth client output.
    var messageQueue = new ConcurrentQueue<string>();
    var outputInterval = TimeSpan.FromMilliseconds(75);
    var isComplete = false;
    // NOTE(review): `isComplete` is a captured local read from the consumer task
    // without a memory barrier; this works in practice with the awaits involved, but
    // Volatile.Read/Write (or a Channel<string>) would be strictly correct — confirm
    // before hardening.

    // Consumer: drain the queue until the producer marks completion AND it is empty.
    var outputTask = Task.Run(async () =>
    {
        while (!(isComplete && messageQueue.IsEmpty))
        {
            if (messageQueue.TryDequeue(out var message))
            {
                await response.WriteAsync(message, Encoding.UTF8, cancellationToken).ConfigureAwait(false);
                await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
            }

            if (!isComplete)
            {
                // Normal pacing while the stream is still producing.
                await Task.Delay(outputInterval, cancellationToken).ConfigureAwait(false);
            }
            else
            {
                // Stream finished: drain the backlog quickly.
                await Task.Delay(10, cancellationToken).ConfigureAwait(false);
            }
        }
    }, cancellationToken);

    ThorUsageResponse? tokenUsage = null;

    // Producer: dispatch to the format-specific handler, which fills the queue and
    // returns the accumulated token usage.
    switch (apiType)
    {
        case ModelApiTypeEnum.Completions:
            tokenUsage = await ProcessCompletionsStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
            break;
        case ModelApiTypeEnum.Messages:
            tokenUsage = await ProcessAnthropicStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
            break;
        case ModelApiTypeEnum.Responses:
            tokenUsage = await ProcessOpenAiResponsesStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
            break;
        case ModelApiTypeEnum.GenerateContent:
            tokenUsage = await ProcessGeminiStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
            break;
        default:
            throw new UserFriendlyException($"不支持的API类型: {apiType}");
    }

    // Mark completion and wait for the consumer to finish draining.
    isComplete = true;
    await outputTask;

    // Unified accounting: the content itself is deliberately not stored ("不予存储"),
    // only the usage numbers are recorded against the user and system messages.
    await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
        new MessageInputDto
        {
            Content = "不予存储",
            ModelId = sourceModelId,
            TokenUsage = tokenUsage,
        }, tokenId);

    await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
        new MessageInputDto
        {
            Content = "不予存储",
            ModelId = sourceModelId,
            TokenUsage = tokenUsage
        }, tokenId);

    await _usageStatisticsManager.SetUsageAsync(userId, sourceModelId, tokenUsage, tokenId);

    // Deduct premium token-package balance for premium models.
    // NOTE(review): `PremiumPackageManager` (PascalCase) is presumably an injected
    // property on this manager, unlike the `_premiumPackageManager` field used in the
    // application service — confirm naming against the rest of the class.
    if (userId.HasValue && tokenUsage is not null && modelDescribe.IsPremium)
    {
        var totalTokens = tokenUsage.TotalTokens ?? 0;
        if (totalTokens > 0)
        {
            await PremiumPackageManager.TryConsumeTokensAsync(userId.Value, totalTokens);
        }
    }
}

#region Unified streaming — per-API-type implementations

/// <summary>
/// Streams an OpenAI Completions-format response, enqueueing "data: {json}\n\n" frames
/// and returning the final token usage (with the model's multiplier applied upstream).
/// </summary>
private async Task<ThorUsageResponse> ProcessCompletionsStreamAsync(
    ConcurrentQueue<string> messageQueue,
    JsonElement requestBody,
    AiModelDescribe modelDescribe,
    CancellationToken cancellationToken)
{
    // NOTE(review): generic argument stripped in the patch; restore the original
    // request type (likely ThorChatCompletionsRequest) on Deserialize<T>.
    var request = requestBody.Deserialize(ThorJsonSerializer.DefaultOptions)!;
    _specialCompatible.Compatible(request);

    // Strip the "yi-" alias prefix before hitting the provider.
    if (!string.IsNullOrEmpty(request.Model) &&
        request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
    {
        request.Model = request.Model[3..];
    }

    // NOTE(review): generic argument stripped in the patch; restore the original
    // service interface on GetRequiredKeyedService<T>.
    var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
    var completeChatResponse = chatService.CompleteChatStreamAsync(modelDescribe, request, cancellationToken);
    var tokenUsage = new ThorUsageResponse();

    try
    {
        await foreach (var data in completeChatResponse)
        {
            data.SupplementalMultiplier(modelDescribe.Multiplier);
            // Providers emit usage on (typically) the final chunk only.
            if (data.Usage is not null && (data.Usage.CompletionTokens > 0 || data.Usage.OutputTokens > 0))
            {
                tokenUsage = data.Usage;
            }

            var message = JsonSerializer.Serialize(data, ThorJsonSerializer.DefaultOptions);
            messageQueue.Enqueue($"data: {message}\n\n");
        }
    }
    catch (Exception e)
    {
        // Surface the failure to the client as a normal delta frame rather than
        // breaking the SSE stream.
        _logger.LogError(e, "Ai对话异常");
        var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
        var model = new ThorChatCompletionsResponse()
        {
            Choices = new List<ThorChatChoiceResponse>()
            {
                new ThorChatChoiceResponse()
                {
                    Delta = new ThorChatMessage()
                    {
                        Content = errorContent
                    }
                }
            }
        };
        var errorMessage = JsonConvert.SerializeObject(model, new JsonSerializerSettings
        {
            ContractResolver = new CamelCasePropertyNamesContractResolver()
        });
        messageQueue.Enqueue($"data: {errorMessage}\n\n");
    }

    messageQueue.Enqueue("data: [DONE]\n\n");
    return tokenUsage;
}

/// <summary>
/// Streams an Anthropic Messages-format response. Usage arrives split across the
/// "message_start" and "message_delta" events and is merged here; cache-creation and
/// cache-read tokens are folded into the input total.
/// </summary>
private async Task<ThorUsageResponse> ProcessAnthropicStreamAsync(
    ConcurrentQueue<string> messageQueue,
    JsonElement requestBody,
    AiModelDescribe modelDescribe,
    CancellationToken cancellationToken)
{
    // NOTE(review): generic argument stripped in the patch; restore the original
    // Anthropic request type on Deserialize<T>.
    var request = requestBody.Deserialize(ThorJsonSerializer.DefaultOptions)!;
    _specialCompatible.AnthropicCompatible(request);

    // Strip the "yi-" alias prefix before hitting the provider.
    if (!string.IsNullOrEmpty(request.Model) &&
        request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
    {
        request.Model = request.Model[3..];
    }

    // NOTE(review): generic argument stripped in the patch; restore the original
    // service interface on GetRequiredKeyedService<T>.
    var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
    var completeChatResponse = chatService.StreamChatCompletionsAsync(modelDescribe, request, cancellationToken);
    var tokenUsage = new ThorUsageResponse();

    try
    {
        await foreach (var responseResult in completeChatResponse)
        {
            // Some providers put part of the usage on message_start...
            if (responseResult.Item1.Contains("message_start"))
            {
                var currentTokenUsage = responseResult.Item2?.Message?.Usage;
                if (currentTokenUsage != null)
                {
                    if ((currentTokenUsage.InputTokens ?? 0) != 0)
                    {
                        tokenUsage.InputTokens = (currentTokenUsage.InputTokens ?? 0) +
                                                 (currentTokenUsage.CacheCreationInputTokens ?? 0) +
                                                 (currentTokenUsage.CacheReadInputTokens ?? 0);
                    }
                    if ((currentTokenUsage.OutputTokens ?? 0) != 0)
                    {
                        tokenUsage.OutputTokens = currentTokenUsage.OutputTokens;
                    }
                }
            }

            // ...and the rest on message_delta.
            if (responseResult.Item1.Contains("message_delta"))
            {
                var currentTokenUsage = responseResult.Item2?.Usage;
                if (currentTokenUsage != null)
                {
                    if ((currentTokenUsage.InputTokens ?? 0) != 0)
                    {
                        tokenUsage.InputTokens = (currentTokenUsage.InputTokens ?? 0) +
                                                 (currentTokenUsage.CacheCreationInputTokens ?? 0) +
                                                 (currentTokenUsage.CacheReadInputTokens ?? 0);
                    }
                    if ((currentTokenUsage.OutputTokens ?? 0) != 0)
                    {
                        tokenUsage.OutputTokens = currentTokenUsage.OutputTokens;
                    }
                }
            }

            // Re-emit as an SSE frame: "event-line\ndata: {json}\n\n".
            var data = JsonSerializer.Serialize(responseResult.Item2, ThorJsonSerializer.DefaultOptions);
            messageQueue.Enqueue($"{responseResult.Item1.Trim()}\ndata: {data}\n\n");
        }
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Ai对话异常");
        var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
        throw new UserFriendlyException(errorContent);
    }

    tokenUsage.TotalTokens = (tokenUsage.InputTokens ?? 0) + (tokenUsage.OutputTokens ?? 0);
    tokenUsage.SetSupplementalMultiplier(modelDescribe.Multiplier);

    return tokenUsage;
}

/// <summary>
/// Streams an OpenAI Responses-format response; usage is read from the
/// "response.completed" event's response.usage node, with the multiplier applied here.
/// </summary>
private async Task<ThorUsageResponse> ProcessOpenAiResponsesStreamAsync(
    ConcurrentQueue<string> messageQueue,
    JsonElement requestBody,
    AiModelDescribe modelDescribe,
    CancellationToken cancellationToken)
{
    // NOTE(review): generic argument stripped in the patch; restore the original
    // Responses request type on Deserialize<T>.
    var request = requestBody.Deserialize(ThorJsonSerializer.DefaultOptions)!;

    // Strip the "yi-" alias prefix before hitting the provider.
    if (!string.IsNullOrEmpty(request.Model) &&
        request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
    {
        request.Model = request.Model[3..];
    }

    // NOTE(review): generic argument stripped in the patch; restore the original
    // service interface on GetRequiredKeyedService<T>.
    var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
    var completeChatResponse = chatService.ResponsesStreamAsync(modelDescribe, request, cancellationToken);
    ThorUsageResponse? tokenUsage = null;

    try
    {
        await foreach (var responseResult in completeChatResponse)
        {
            if (responseResult.Item1.Contains("response.completed"))
            {
                var obj = responseResult.Item2!.Value;
                int inputTokens = obj.GetPath("response", "usage", "input_tokens").GetInt();
                int outputTokens = obj.GetPath("response", "usage", "output_tokens").GetInt();
                // Billing multiplier is applied directly to the raw counts here.
                inputTokens = Convert.ToInt32(inputTokens * modelDescribe.Multiplier);
                outputTokens = Convert.ToInt32(outputTokens * modelDescribe.Multiplier);
                tokenUsage = new ThorUsageResponse
                {
                    PromptTokens = inputTokens,
                    InputTokens = inputTokens,
                    OutputTokens = outputTokens,
                    CompletionTokens = outputTokens,
                    TotalTokens = inputTokens + outputTokens,
                };
            }

            // Re-emit as an SSE frame: "event-line\ndata: {json}\n\n".
            var data = JsonSerializer.Serialize(responseResult.Item2, ThorJsonSerializer.DefaultOptions);
            messageQueue.Enqueue($"{responseResult.Item1.Trim()}\ndata: {data}\n\n");
        }
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Ai响应异常");
        var errorContent = $"响应Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
        throw new UserFriendlyException(errorContent);
    }

    return tokenUsage ?? new ThorUsageResponse();
}

/// <summary>
/// Streams a Gemini GenerateContent-format response; usage is taken from the chunk
/// whose first candidate reports finishReason == "STOP". Image models are billed a
/// flat amount equal to the multiplier instead of token counts.
/// </summary>
private async Task<ThorUsageResponse> ProcessGeminiStreamAsync(
    ConcurrentQueue<string> messageQueue,
    JsonElement requestBody,
    AiModelDescribe modelDescribe,
    CancellationToken cancellationToken)
{
    // NOTE(review): generic argument stripped in the patch; restore the original
    // service interface on GetRequiredKeyedService<T>.
    var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
    var completeChatResponse = chatService.GenerateContentStreamAsync(modelDescribe, requestBody, cancellationToken);
    ThorUsageResponse? tokenUsage = null;

    try
    {
        await foreach (var responseResult in completeChatResponse)
        {
            if (responseResult!.Value.GetPath("candidates", 0, "finishReason").GetString() == "STOP")
            {
                tokenUsage = GeminiGenerateContentAcquirer.GetUsage(responseResult!.Value);
                // Image models are charged separately: a flat cost of `Multiplier`.
                if (modelDescribe.ModelType == ModelTypeEnum.Image)
                {
                    tokenUsage = new ThorUsageResponse
                    {
                        InputTokens = (int)modelDescribe.Multiplier,
                        OutputTokens = (int)modelDescribe.Multiplier,
                        TotalTokens = (int)modelDescribe.Multiplier
                    };
                }
                else
                {
                    tokenUsage.SetSupplementalMultiplier(modelDescribe.Multiplier);
                }
            }

            messageQueue.Enqueue($"data: {JsonSerializer.Serialize(responseResult)}\n\n");
        }
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Ai生成异常");
        var errorContent = $"生成Ai异常,异常信息:\n当前Ai模型:{modelDescribe.ModelId}\n异常信息:{e.Message}\n异常堆栈:{e}";
        throw new UserFriendlyException(errorContent);
    }

    return tokenUsage ?? new ThorUsageResponse();
}

#endregion