diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/Chat/AiChatService.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/Chat/AiChatService.cs
index 996cd0b5..7ce897c7 100644
--- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/Chat/AiChatService.cs
+++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/Chat/AiChatService.cs
@@ -25,6 +25,7 @@ using Yi.Framework.AiHub.Domain.Entities.Model;
using Yi.Framework.AiHub.Domain.Extensions;
using Yi.Framework.AiHub.Domain.Managers;
using Yi.Framework.AiHub.Domain.Shared.Consts;
+using System.Text.Json;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.Shared.Enums;
@@ -270,4 +271,89 @@ public class AiChatService : ApplicationService
var data = await _agentStoreRepository.GetFirstAsync(x => x.SessionId == sessionId);
return data?.Store;
}
+
+ /// <summary>
+ /// Unified send-message endpoint - supports all 4 upstream API wire formats.
+ /// The raw body is forwarded untouched to the gateway as an SSE stream.
+ /// </summary>
+ /// <param name="apiType">API type enum (Completions / Messages / Responses / GenerateContent)</param>
+ /// <param name="input">Raw request body as a JsonElement, forwarded verbatim</param>
+ /// <param name="modelId">Model id (the Gemini format must supply it via the URL)</param>
+ /// <param name="sessionId">Session id</param>
+ /// <param name="cancellationToken">Aborts the whole streamed request when the client disconnects</param>
+ [HttpPost("ai-chat/unified/send")]
+ public async Task PostUnifiedSendAsync(
+ [FromQuery] ModelApiTypeEnum apiType,
+ [FromBody] JsonElement input,
+ [FromQuery] string modelId,
+ [FromQuery] Guid? sessionId,
+ CancellationToken cancellationToken)
+ {
+ // Extract the model id from the request body when it was not supplied via the URL.
+ if (string.IsNullOrEmpty(modelId))
+ {
+ modelId = ExtractModelIdFromRequest(apiType, input);
+ }
+
+ // Every model except the free one requires blacklist + VIP validation.
+ if (modelId != FreeModelId)
+ {
+ if (CurrentUser.IsAuthenticated)
+ {
+ await _aiBlacklistManager.VerifiyAiBlacklist(CurrentUser.GetId());
+ if (!CurrentUser.IsAiVip())
+ {
+ throw new UserFriendlyException("该模型需要VIP用户才能使用,请购买VIP后重新登录重试");
+ }
+ }
+ else
+ {
+ throw new UserFriendlyException("未登录用户,只能使用未加速的DeepSeek-R1,请登录后重试");
+ }
+ }
+
+ // Premium models additionally require remaining premium-package tokens.
+ if (CurrentUser.IsAuthenticated)
+ {
+ var isPremium = await _modelManager.IsPremiumModelAsync(modelId);
+ if (isPremium)
+ {
+ var availableTokens = await _premiumPackageManager.GetAvailableTokensAsync(CurrentUser.GetId());
+ if (availableTokens <= 0)
+ {
+ throw new UserFriendlyException("尊享token包用量不足,请先购买尊享token包");
+ }
+ }
+ }
+
+ // Delegate to the unified streaming handler.
+ // BUG FIX: forward the caller's CancellationToken instead of CancellationToken.None,
+ // so a client disconnect actually cancels the upstream AI stream.
+ await _aiGateWayManager.UnifiedStreamForStatisticsAsync(
+ _httpContextAccessor.HttpContext!,
+ apiType,
+ input,
+ modelId,
+ CurrentUser.Id,
+ sessionId,
+ null,
+ cancellationToken);
+ }
+
+ /// <summary>
+ /// Extracts the model id from the raw request body's top-level "model" property.
+ /// </summary>
+ /// <param name="apiType">API type; currently unused here - NOTE(review): presumably every supported format carries "model" at the top level, confirm for Gemini (whose model id normally travels in the URL).</param>
+ /// <param name="input">Raw JSON request body.</param>
+ /// <returns>The model id, or an empty string when the "model" property holds JSON null.</returns>
+ /// <exception cref="UserFriendlyException">Thrown when no "model" property can be read.</exception>
+ private string ExtractModelIdFromRequest(ModelApiTypeEnum apiType, JsonElement input)
+ {
+ try
+ {
+ // TryGetProperty throws InvalidOperationException when "input" is not a JSON object,
+ // hence the catch-all below.
+ if (input.TryGetProperty("model", out var modelProperty))
+ {
+ return modelProperty.GetString() ?? string.Empty;
+ }
+ }
+ catch
+ {
+ // Ignore parse errors; fall through to the user-facing error below.
+ }
+
+ throw new UserFriendlyException("无法从请求中获取模型ID,请在URL参数中指定modelId");
+ }
}
\ No newline at end of file
diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs
index d6302dbb..8c51722f 100644
--- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs
+++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs
@@ -1122,6 +1122,375 @@ public class AiGateWayManager : DomainService
await _imageStoreTaskRepository.UpdateAsync(imageStoreTask);
}
+ /// <summary>
+ /// Unified streaming handler - forwards the 4 supported API formats verbatim over SSE.
+ /// A background consumer task drains a shared queue at a fixed cadence so output pacing
+ /// is uniform across providers; token-usage statistics are recorded once the stream ends.
+ /// </summary>
+ public async Task UnifiedStreamForStatisticsAsync(
+ HttpContext httpContext,
+ ModelApiTypeEnum apiType,
+ JsonElement requestBody,
+ string modelId,
+ Guid? userId = null,
+ Guid? sessionId = null,
+ Guid? tokenId = null,
+ CancellationToken cancellationToken = default)
+ {
+ var response = httpContext.Response;
+ // Declare the response as an SSE stream.
+ response.ContentType = "text/event-stream;charset=utf-8;";
+ response.Headers.TryAdd("Cache-Control", "no-cache");
+ response.Headers.TryAdd("Connection", "keep-alive");
+
+ var sourceModelId = modelId;
+ // Strip the "yi-" routing prefix before resolving the upstream model.
+ if (!string.IsNullOrEmpty(modelId) &&
+ modelId.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
+ {
+ modelId = modelId[3..];
+ }
+
+ var modelDescribe = await GetModelAsync(apiType, sourceModelId);
+
+ // Shared producer/consumer queue.
+ // NOTE(review): the generic argument looks stripped in this patch - presumably ConcurrentQueue<string>; restore before merging.
+ var messageQueue = new ConcurrentQueue();
+ var outputInterval = TimeSpan.FromMilliseconds(75);
+ var isComplete = false;
+
+ // Background consumer: drains the queue onto the HTTP response until the
+ // producer signals completion AND the queue is empty.
+ var outputTask = Task.Run(async () =>
+ {
+ while (!(isComplete && messageQueue.IsEmpty))
+ {
+ if (messageQueue.TryDequeue(out var message))
+ {
+ await response.WriteAsync(message, Encoding.UTF8, cancellationToken).ConfigureAwait(false);
+ await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ // Pace output while producing; drain quickly once the producer is done.
+ if (!isComplete)
+ {
+ await Task.Delay(outputInterval, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ await Task.Delay(10, cancellationToken).ConfigureAwait(false);
+ }
+ }
+ }, cancellationToken);
+
+ ThorUsageResponse? tokenUsage = null;
+
+ try
+ {
+ switch (apiType)
+ {
+ case ModelApiTypeEnum.Completions:
+ tokenUsage = await ProcessCompletionsStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
+ break;
+ case ModelApiTypeEnum.Messages:
+ tokenUsage = await ProcessAnthropicStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
+ break;
+ case ModelApiTypeEnum.Responses:
+ tokenUsage = await ProcessOpenAiResponsesStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
+ break;
+ case ModelApiTypeEnum.GenerateContent:
+ tokenUsage = await ProcessGeminiStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
+ break;
+ default:
+ throw new UserFriendlyException($"不支持的API类型: {apiType}");
+ }
+ }
+ finally
+ {
+ // BUG FIX: always signal completion, even when a Process* handler throws.
+ // Previously an exception skipped this assignment, so the consumer task
+ // kept polling until the request was aborted.
+ isComplete = true;
+ }
+
+ // Wait for the consumer to flush the remaining queued messages.
+ await outputTask;
+
+ // Unified statistics bookkeeping (message bodies are intentionally not stored).
+ await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
+ new MessageInputDto
+ {
+ Content = "不予存储",
+ ModelId = sourceModelId,
+ TokenUsage = tokenUsage,
+ }, tokenId);
+
+ await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
+ new MessageInputDto
+ {
+ Content = "不予存储",
+ ModelId = sourceModelId,
+ TokenUsage = tokenUsage
+ }, tokenId);
+
+ await _usageStatisticsManager.SetUsageAsync(userId, sourceModelId, tokenUsage, tokenId);
+
+ // Deduct premium token-package quota when the model is a premium one.
+ if (userId.HasValue && tokenUsage is not null && modelDescribe.IsPremium)
+ {
+ var totalTokens = tokenUsage.TotalTokens ?? 0;
+ if (totalTokens > 0)
+ {
+ await PremiumPackageManager.TryConsumeTokensAsync(userId.Value, totalTokens);
+ }
+ }
+ }
+
+ #region 统一流式处理 - 各API类型的具体实现
+
+ /// <summary>
+ /// Handles an OpenAI Completions-format streamed response: forwards each chunk as an
+ /// SSE "data:" line and captures the last chunk that reports output-token usage.
+ /// Unlike the other handlers, errors here are written into the stream rather than thrown.
+ /// NOTE(review): generic type arguments appear stripped from this patch
+ /// (on Deserialize, GetRequiredKeyedService, ConcurrentQueue, List, and the Task return type) - restore before merging.
+ /// </summary>
+ private async Task ProcessCompletionsStreamAsync(
+ ConcurrentQueue messageQueue,
+ JsonElement requestBody,
+ AiModelDescribe modelDescribe,
+ CancellationToken cancellationToken)
+ {
+ var request = requestBody.Deserialize(ThorJsonSerializer.DefaultOptions)!;
+ _specialCompatible.Compatible(request);
+
+ // Strip the "yi-" routing prefix.
+ if (!string.IsNullOrEmpty(request.Model) &&
+ request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
+ {
+ request.Model = request.Model[3..];
+ }
+
+ var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
+ var completeChatResponse = chatService.CompleteChatStreamAsync(modelDescribe, request, cancellationToken);
+ var tokenUsage = new ThorUsageResponse();
+
+ try
+ {
+ await foreach (var data in completeChatResponse)
+ {
+ data.SupplementalMultiplier(modelDescribe.Multiplier);
+ // Keep the last chunk that actually reports completion/output tokens.
+ if (data.Usage is not null && (data.Usage.CompletionTokens > 0 || data.Usage.OutputTokens > 0))
+ {
+ tokenUsage = data.Usage;
+ }
+
+ var message = JsonSerializer.Serialize(data, ThorJsonSerializer.DefaultOptions);
+ messageQueue.Enqueue($"data: {message}\n\n");
+ }
+ }
+ catch (Exception e)
+ {
+ // Surface the error to the client inside the SSE stream instead of aborting it.
+ _logger.LogError(e, "Ai对话异常");
+ var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
+ var model = new ThorChatCompletionsResponse()
+ {
+ Choices = new List()
+ {
+ new ThorChatChoiceResponse()
+ {
+ Delta = new ThorChatMessage()
+ {
+ Content = errorContent
+ }
+ }
+ }
+ };
+ // Newtonsoft is used here for camelCase output; the happy path uses System.Text.Json above.
+ var errorMessage = JsonConvert.SerializeObject(model, new JsonSerializerSettings
+ {
+ ContractResolver = new CamelCasePropertyNamesContractResolver()
+ });
+ messageQueue.Enqueue($"data: {errorMessage}\n\n");
+ }
+
+ // OpenAI-style stream terminator.
+ messageQueue.Enqueue("data: [DONE]\n\n");
+ return tokenUsage;
+ }
+
+ /// <summary>
+ /// Handles an Anthropic Messages-format streamed response: forwards each event verbatim
+ /// ("event-line\ndata: payload") and accumulates token usage from the message_start and
+ /// message_delta events, since providers split usage reporting between the two.
+ /// NOTE(review): generic type arguments appear stripped from this patch
+ /// (on Deserialize, GetRequiredKeyedService, ConcurrentQueue, and the Task return type) - restore before merging.
+ /// </summary>
+ private async Task ProcessAnthropicStreamAsync(
+ ConcurrentQueue messageQueue,
+ JsonElement requestBody,
+ AiModelDescribe modelDescribe,
+ CancellationToken cancellationToken)
+ {
+ var request = requestBody.Deserialize(ThorJsonSerializer.DefaultOptions)!;
+ _specialCompatible.AnthropicCompatible(request);
+
+ // Strip the "yi-" routing prefix.
+ if (!string.IsNullOrEmpty(request.Model) &&
+ request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
+ {
+ request.Model = request.Model[3..];
+ }
+
+ var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
+ var completeChatResponse = chatService.StreamChatCompletionsAsync(modelDescribe, request, cancellationToken);
+ var tokenUsage = new ThorUsageResponse();
+
+ try
+ {
+ await foreach (var responseResult in completeChatResponse)
+ {
+ // Some providers report part of the usage in message_start...
+ if (responseResult.Item1.Contains("message_start"))
+ {
+ var currentTokenUsage = responseResult.Item2?.Message?.Usage;
+ if (currentTokenUsage != null)
+ {
+ if ((currentTokenUsage.InputTokens ?? 0) != 0)
+ {
+ // Input cost includes cache-creation and cache-read tokens.
+ tokenUsage.InputTokens = (currentTokenUsage.InputTokens ?? 0) +
+ (currentTokenUsage.CacheCreationInputTokens ?? 0) +
+ (currentTokenUsage.CacheReadInputTokens ?? 0);
+ }
+ if ((currentTokenUsage.OutputTokens ?? 0) != 0)
+ {
+ tokenUsage.OutputTokens = currentTokenUsage.OutputTokens;
+ }
+ }
+ }
+
+ // ...and the rest in message_delta.
+ if (responseResult.Item1.Contains("message_delta"))
+ {
+ var currentTokenUsage = responseResult.Item2?.Usage;
+ if (currentTokenUsage != null)
+ {
+ if ((currentTokenUsage.InputTokens ?? 0) != 0)
+ {
+ tokenUsage.InputTokens = (currentTokenUsage.InputTokens ?? 0) +
+ (currentTokenUsage.CacheCreationInputTokens ?? 0) +
+ (currentTokenUsage.CacheReadInputTokens ?? 0);
+ }
+ if ((currentTokenUsage.OutputTokens ?? 0) != 0)
+ {
+ tokenUsage.OutputTokens = currentTokenUsage.OutputTokens;
+ }
+ }
+ }
+
+ // Serialize into an SSE-format string (event line + data line).
+ var data = JsonSerializer.Serialize(responseResult.Item2, ThorJsonSerializer.DefaultOptions);
+ messageQueue.Enqueue($"{responseResult.Item1.Trim()}\ndata: {data}\n\n");
+ }
+ }
+ catch (Exception e)
+ {
+ // Unlike the Completions handler, errors here abort the request via UserFriendlyException.
+ _logger.LogError(e, "Ai对话异常");
+ var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
+ throw new UserFriendlyException(errorContent);
+ }
+
+ // Total is derived locally; the billing multiplier is applied afterwards.
+ tokenUsage.TotalTokens = (tokenUsage.InputTokens ?? 0) + (tokenUsage.OutputTokens ?? 0);
+ tokenUsage.SetSupplementalMultiplier(modelDescribe.Multiplier);
+
+ return tokenUsage;
+ }
+
+ /// <summary>
+ /// Handles an OpenAI Responses-format streamed response: forwards each event verbatim
+ /// and reads final token usage from the "response.completed" event payload.
+ /// NOTE(review): generic type arguments appear stripped from this patch
+ /// (on Deserialize, GetRequiredKeyedService, ConcurrentQueue, and the Task return type) - restore before merging.
+ /// </summary>
+ private async Task ProcessOpenAiResponsesStreamAsync(
+ ConcurrentQueue messageQueue,
+ JsonElement requestBody,
+ AiModelDescribe modelDescribe,
+ CancellationToken cancellationToken)
+ {
+ var request = requestBody.Deserialize(ThorJsonSerializer.DefaultOptions)!;
+
+ // Strip the "yi-" routing prefix.
+ if (!string.IsNullOrEmpty(request.Model) &&
+ request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
+ {
+ request.Model = request.Model[3..];
+ }
+
+ var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
+ var completeChatResponse = chatService.ResponsesStreamAsync(modelDescribe, request, cancellationToken);
+ ThorUsageResponse? tokenUsage = null;
+
+ try
+ {
+ await foreach (var responseResult in completeChatResponse)
+ {
+ // Usage arrives only on the terminal "response.completed" event.
+ if (responseResult.Item1.Contains("response.completed"))
+ {
+ var obj = responseResult.Item2!.Value;
+ int inputTokens = obj.GetPath("response", "usage", "input_tokens").GetInt();
+ int outputTokens = obj.GetPath("response", "usage", "output_tokens").GetInt();
+ // The billing multiplier is baked directly into the reported counts here.
+ inputTokens = Convert.ToInt32(inputTokens * modelDescribe.Multiplier);
+ outputTokens = Convert.ToInt32(outputTokens * modelDescribe.Multiplier);
+ tokenUsage = new ThorUsageResponse
+ {
+ PromptTokens = inputTokens,
+ InputTokens = inputTokens,
+ OutputTokens = outputTokens,
+ CompletionTokens = outputTokens,
+ TotalTokens = inputTokens + outputTokens,
+ };
+ }
+
+ // Serialize into an SSE-format string (event line + data line).
+ var data = JsonSerializer.Serialize(responseResult.Item2, ThorJsonSerializer.DefaultOptions);
+ messageQueue.Enqueue($"{responseResult.Item1.Trim()}\ndata: {data}\n\n");
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, "Ai响应异常");
+ var errorContent = $"响应Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
+ throw new UserFriendlyException(errorContent);
+ }
+
+ // Fall back to an empty usage object when no "response.completed" event was seen.
+ return tokenUsage ?? new ThorUsageResponse();
+ }
+
+ /// <summary>
+ /// Handles a Gemini GenerateContent-format streamed response: forwards each chunk as an
+ /// SSE "data:" line (the raw request body is passed through without deserialization) and
+ /// reads usage when a candidate finishes with finishReason == "STOP".
+ /// NOTE(review): generic type arguments appear stripped from this patch
+ /// (on GetRequiredKeyedService, ConcurrentQueue, and the Task return type) - restore before merging.
+ /// NOTE(review): this Serialize call omits ThorJsonSerializer.DefaultOptions unlike the
+ /// other handlers - confirm the naming policy is intentional.
+ /// </summary>
+ private async Task ProcessGeminiStreamAsync(
+ ConcurrentQueue messageQueue,
+ JsonElement requestBody,
+ AiModelDescribe modelDescribe,
+ CancellationToken cancellationToken)
+ {
+ var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
+ var completeChatResponse = chatService.GenerateContentStreamAsync(modelDescribe, requestBody, cancellationToken);
+ ThorUsageResponse? tokenUsage = null;
+
+ try
+ {
+ await foreach (var responseResult in completeChatResponse)
+ {
+ if (responseResult!.Value.GetPath("candidates", 0, "finishReason").GetString() == "STOP")
+ {
+ tokenUsage = GeminiGenerateContentAcquirer.GetUsage(responseResult!.Value);
+ // Image models are billed at a flat rate derived from the multiplier,
+ // overriding the provider-reported usage.
+ if (modelDescribe.ModelType == ModelTypeEnum.Image)
+ {
+ tokenUsage = new ThorUsageResponse
+ {
+ InputTokens = (int)modelDescribe.Multiplier,
+ OutputTokens = (int)modelDescribe.Multiplier,
+ TotalTokens = (int)modelDescribe.Multiplier
+ };
+ }
+ else
+ {
+ tokenUsage.SetSupplementalMultiplier(modelDescribe.Multiplier);
+ }
+ }
+
+ messageQueue.Enqueue($"data: {JsonSerializer.Serialize(responseResult)}\n\n");
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, "Ai生成异常");
+ var errorContent = $"生成Ai异常,异常信息:\n当前Ai模型:{modelDescribe.ModelId}\n异常信息:{e.Message}\n异常堆栈:{e}";
+ throw new UserFriendlyException(errorContent);
+ }
+
+ // Fall back to an empty usage object when no "STOP" candidate was observed.
+ return tokenUsage ?? new ThorUsageResponse();
+ }
+
+ #endregion
+
#region 流式传输格式Http响应
private static readonly byte[] EventPrefix = "event: "u8.ToArray();