diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain.Shared/Dtos/Gemini/GeminiGenerateContentAcquirer.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain.Shared/Dtos/Gemini/GeminiGenerateContentAcquirer.cs index d0dfbf81..65012772 100644 --- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain.Shared/Dtos/Gemini/GeminiGenerateContentAcquirer.cs +++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain.Shared/Dtos/Gemini/GeminiGenerateContentAcquirer.cs @@ -6,6 +6,83 @@ namespace Yi.Framework.AiHub.Domain.Shared.Dtos.Gemini; public static class GeminiGenerateContentAcquirer { + /// + /// 从请求体中提取用户最后一条消息内容 + /// 路径: contents[last].parts[last].text + /// + public static string GetLastUserContent(JsonElement request) + { + var contents = request.GetPath("contents"); + if (!contents.HasValue || contents.Value.ValueKind != JsonValueKind.Array) + { + return string.Empty; + } + + var contentsArray = contents.Value.EnumerateArray().ToList(); + if (contentsArray.Count == 0) + { + return string.Empty; + } + + var lastContent = contentsArray[^1]; + var parts = lastContent.GetPath("parts"); + if (!parts.HasValue || parts.Value.ValueKind != JsonValueKind.Array) + { + return string.Empty; + } + + var partsArray = parts.Value.EnumerateArray().ToList(); + if (partsArray.Count == 0) + { + return string.Empty; + } + + // 获取最后一个 part 的 text + var lastPart = partsArray[^1]; + return lastPart.GetPath("text").GetString() ?? string.Empty; + } + + /// + /// 从响应中提取文本内容(非 thought 类型) + /// 路径: candidates[0].content.parts[].text (where thought != true) + /// + public static string GetTextContent(JsonElement response) + { + var candidates = response.GetPath("candidates"); + if (!candidates.HasValue || candidates.Value.ValueKind != JsonValueKind.Array) + { + return string.Empty; + } + + var candidatesArray = candidates.Value.EnumerateArray().ToList(); + if (candidatesArray.Count == 0) + { + return string.Empty; + } + + var parts = candidatesArray[0].GetPath("content", "parts"); + if (!parts.HasValue || parts.Value.ValueKind != JsonValueKind.Array) + { + return string.Empty; + } + + // 遍历所有 parts,只取非 thought 的 text + foreach (var part in parts.Value.EnumerateArray()) + { + var isThought = part.GetPath("thought").GetBool(); + if (!isThought) + { + var text = part.GetPath("text").GetString(); + if (!string.IsNullOrEmpty(text)) + { + return text; + } + } + } + + return string.Empty; + } + public static ThorUsageResponse? GetUsage(JsonElement response) { var usage = response.GetPath("usageMetadata"); diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs index 8c51722f..a0d4f355 100644 --- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs +++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs @@ -1177,27 +1177,27 @@ public class AiGateWayManager : DomainService } } }, cancellationToken); - - ThorUsageResponse? tokenUsage = null; + + StreamProcessResult? processResult = null; switch (apiType) { case ModelApiTypeEnum.Completions: - tokenUsage = await ProcessCompletionsStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken); + processResult = await ProcessCompletionsStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken); break; case ModelApiTypeEnum.Messages: - tokenUsage = await ProcessAnthropicStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken); + processResult = await ProcessAnthropicStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken); break; case ModelApiTypeEnum.Responses: - tokenUsage = await ProcessOpenAiResponsesStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken); + processResult = await ProcessOpenAiResponsesStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken); break; case ModelApiTypeEnum.GenerateContent: - tokenUsage = await ProcessGeminiStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken); + processResult = await ProcessGeminiStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken); break; default: throw new UserFriendlyException($"不支持的API类型: {apiType}"); } - + // 标记完成并等待消费任务结束 isComplete = true; await outputTask; @@ -1206,25 +1206,25 @@ public class AiGateWayManager : DomainService await _aiMessageManager.CreateUserMessageAsync(userId, sessionId, new MessageInputDto { - Content = "不予存储", + Content = sessionId is null ? "不予存储" : processResult?.UserContent ?? string.Empty, ModelId = sourceModelId, - TokenUsage = tokenUsage, + TokenUsage = processResult?.TokenUsage, }, tokenId); await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId, new MessageInputDto { - Content = "不予存储", + Content = sessionId is null ? "不予存储" : processResult?.SystemContent ?? string.Empty, ModelId = sourceModelId, - TokenUsage = tokenUsage + TokenUsage = processResult?.TokenUsage }, tokenId); - await _usageStatisticsManager.SetUsageAsync(userId, sourceModelId, tokenUsage, tokenId); + await _usageStatisticsManager.SetUsageAsync(userId, sourceModelId, processResult?.TokenUsage, tokenId); // 扣减尊享token包用量 - if (userId.HasValue && tokenUsage is not null && modelDescribe.IsPremium) + if (userId.HasValue && processResult?.TokenUsage is not null && modelDescribe.IsPremium) { - var totalTokens = tokenUsage.TotalTokens ?? 0; + var totalTokens = processResult?.TokenUsage.TotalTokens ?? 0; if (totalTokens > 0) { await PremiumPackageManager.TryConsumeTokensAsync(userId.Value, totalTokens); @@ -1234,10 +1234,20 @@ public class AiGateWayManager : DomainService #region 统一流式处理 - 各API类型的具体实现 + /// + /// 流式处理结果,包含用户输入、系统输出和 token 使用情况 + /// + private class StreamProcessResult + { + public string UserContent { get; set; } = string.Empty; + public string SystemContent { get; set; } = string.Empty; + public ThorUsageResponse TokenUsage { get; set; } = new(); + } + /// /// 处理 OpenAI Completions 格式流式响应 /// - private async Task ProcessCompletionsStreamAsync( + private async Task ProcessCompletionsStreamAsync( ConcurrentQueue messageQueue, JsonElement requestBody, AiModelDescribe modelDescribe, @@ -1246,6 +1256,9 @@ public class AiGateWayManager : DomainService var request = requestBody.Deserialize(ThorJsonSerializer.DefaultOptions)!; _specialCompatible.Compatible(request); + // 提取用户最后一条消息 + var userContent = request.Messages?.LastOrDefault()?.MessagesStore ?? string.Empty; + // 处理 yi- 前缀 if (!string.IsNullOrEmpty(request.Model) && request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase)) @@ -1256,6 +1269,7 @@ public class AiGateWayManager : DomainService var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName); var completeChatResponse = chatService.CompleteChatStreamAsync(modelDescribe, request, cancellationToken); var tokenUsage = new ThorUsageResponse(); + var systemContentBuilder = new StringBuilder(); try { @@ -1267,6 +1281,13 @@ public class AiGateWayManager : DomainService tokenUsage = data.Usage; } + // 累加系统输出内容 (choices[].delta.content) + var deltaContent = data.Choices?.FirstOrDefault()?.Delta?.Content; + if (!string.IsNullOrEmpty(deltaContent)) + { + systemContentBuilder.Append(deltaContent); + } + var message = JsonSerializer.Serialize(data, ThorJsonSerializer.DefaultOptions); messageQueue.Enqueue($"data: {message}\n\n"); } @@ -1275,6 +1296,7 @@ public class AiGateWayManager : DomainService { _logger.LogError(e, "Ai对话异常"); var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}"; + systemContentBuilder.Append(errorContent); var model = new ThorChatCompletionsResponse() { Choices = new List() @@ -1296,13 +1318,18 @@ public class AiGateWayManager : DomainService } messageQueue.Enqueue("data: [DONE]\n\n"); - return tokenUsage; + return new StreamProcessResult + { + UserContent = userContent, + SystemContent = systemContentBuilder.ToString(), + TokenUsage = tokenUsage + }; } /// /// 处理 Anthropic Messages 格式流式响应 /// - private async Task ProcessAnthropicStreamAsync( + private async Task ProcessAnthropicStreamAsync( ConcurrentQueue messageQueue, JsonElement requestBody, AiModelDescribe modelDescribe, @@ -1311,6 +1338,16 @@ public class AiGateWayManager : DomainService var request = requestBody.Deserialize(ThorJsonSerializer.DefaultOptions)!; _specialCompatible.AnthropicCompatible(request); + // 提取用户最后一条消息 + var lastMessage = request.Messages?.LastOrDefault(); + var userContent = lastMessage?.Content ?? string.Empty; + if (string.IsNullOrEmpty(userContent) && lastMessage?.Contents != null && lastMessage.Contents.Any()) + { + // 如果是 Contents 数组,提取第一个 text 类型的内容 + var textContent = lastMessage.Contents.FirstOrDefault(c => c.Type == "text"); + userContent = textContent?.Text ?? System.Text.Json.JsonSerializer.Serialize(lastMessage.Contents); + } + // 处理 yi- 前缀 if (!string.IsNullOrEmpty(request.Model) && request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase)) @@ -1321,6 +1358,7 @@ public class AiGateWayManager : DomainService var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName); var completeChatResponse = chatService.StreamChatCompletionsAsync(modelDescribe, request, cancellationToken); var tokenUsage = new ThorUsageResponse(); + var systemContentBuilder = new StringBuilder(); try { @@ -1364,6 +1402,13 @@ public class AiGateWayManager : DomainService } } + // 累加系统输出内容 (delta.text) + var deltaText = responseResult.Item2?.Delta?.Text; + if (!string.IsNullOrEmpty(deltaText)) + { + systemContentBuilder.Append(deltaText); + } + // 序列化为SSE格式字符串 var data = JsonSerializer.Serialize(responseResult.Item2, ThorJsonSerializer.DefaultOptions); messageQueue.Enqueue($"{responseResult.Item1.Trim()}\ndata: {data}\n\n"); @@ -1373,19 +1418,25 @@ public class AiGateWayManager : DomainService { _logger.LogError(e, "Ai对话异常"); var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}"; + systemContentBuilder.Append(errorContent); throw new UserFriendlyException(errorContent); } tokenUsage.TotalTokens = (tokenUsage.InputTokens ?? 0) + (tokenUsage.OutputTokens ?? 0); tokenUsage.SetSupplementalMultiplier(modelDescribe.Multiplier); - return tokenUsage; + return new StreamProcessResult + { + UserContent = userContent, + SystemContent = systemContentBuilder.ToString(), + TokenUsage = tokenUsage + }; } /// /// 处理 OpenAI Responses 格式流式响应 /// - private async Task ProcessOpenAiResponsesStreamAsync( + private async Task ProcessOpenAiResponsesStreamAsync( ConcurrentQueue messageQueue, JsonElement requestBody, AiModelDescribe modelDescribe, @@ -1393,6 +1444,27 @@ public class AiGateWayManager : DomainService { var request = requestBody.Deserialize(ThorJsonSerializer.DefaultOptions)!; + // 提取用户输入内容 (input 字段可能是字符串或数组) + var userContent = string.Empty; + if (request.Input.ValueKind == JsonValueKind.String) + { + userContent = request.Input.GetString() ?? string.Empty; + } + else if (request.Input.ValueKind == JsonValueKind.Array) + { + // 获取最后一个 user 角色的消息 + var inputArray = request.Input.EnumerateArray().ToList(); + var lastUserMessage = inputArray.LastOrDefault(x => + x.TryGetProperty("role", out var role) && role.GetString() == "user"); + if (lastUserMessage.ValueKind != JsonValueKind.Undefined) + { + if (lastUserMessage.TryGetProperty("content", out var content)) + { + userContent = content.GetString() ?? string.Empty; + } + } + } + // 处理 yi- 前缀 if (!string.IsNullOrEmpty(request.Model) && request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase)) @@ -1403,11 +1475,22 @@ public class AiGateWayManager : DomainService var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName); var completeChatResponse = chatService.ResponsesStreamAsync(modelDescribe, request, cancellationToken); ThorUsageResponse? tokenUsage = null; + var systemContentBuilder = new StringBuilder(); try { await foreach (var responseResult in completeChatResponse) { + // 提取输出文本内容 (response.output_text.delta 事件) + if (responseResult.Item1.Contains("response.output_text.delta")) + { + var delta = responseResult.Item2?.GetPath("delta").GetString(); + if (!string.IsNullOrEmpty(delta)) + { + systemContentBuilder.Append(delta); + } + } + if (responseResult.Item1.Contains("response.completed")) { var obj = responseResult.Item2!.Value; @@ -1434,29 +1517,46 @@ public class AiGateWayManager : DomainService { _logger.LogError(e, "Ai响应异常"); var errorContent = $"响应Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}"; + systemContentBuilder.Append(errorContent); throw new UserFriendlyException(errorContent); } - return tokenUsage ?? new ThorUsageResponse(); + return new StreamProcessResult + { + UserContent = userContent, + SystemContent = systemContentBuilder.ToString(), + TokenUsage = tokenUsage ?? new ThorUsageResponse() + }; } /// /// 处理 Gemini GenerateContent 格式流式响应 /// - private async Task ProcessGeminiStreamAsync( + private async Task ProcessGeminiStreamAsync( ConcurrentQueue messageQueue, JsonElement requestBody, AiModelDescribe modelDescribe, CancellationToken cancellationToken) { + // 提取用户最后一条消息 (contents[last].parts[last].text) + var userContent = GeminiGenerateContentAcquirer.GetLastUserContent(requestBody); + var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName); var completeChatResponse = chatService.GenerateContentStreamAsync(modelDescribe, requestBody, cancellationToken); ThorUsageResponse? tokenUsage = null; + var systemContentBuilder = new StringBuilder(); try { await foreach (var responseResult in completeChatResponse) { + // 累加系统输出内容 (candidates[0].content.parts[].text,排除 thought) + var textContent = GeminiGenerateContentAcquirer.GetTextContent(responseResult!.Value); + if (!string.IsNullOrEmpty(textContent)) + { + systemContentBuilder.Append(textContent); + } + if (responseResult!.Value.GetPath("candidates", 0, "finishReason").GetString() == "STOP") { tokenUsage = GeminiGenerateContentAcquirer.GetUsage(responseResult!.Value); @@ -1483,10 +1583,16 @@ public class AiGateWayManager : DomainService { _logger.LogError(e, "Ai生成异常"); var errorContent = $"生成Ai异常,异常信息:\n当前Ai模型:{modelDescribe.ModelId}\n异常信息:{e.Message}\n异常堆栈:{e}"; + systemContentBuilder.Append(errorContent); throw new UserFriendlyException(errorContent); } - return tokenUsage ?? new ThorUsageResponse(); + return new StreamProcessResult + { + UserContent = userContent, + SystemContent = systemContentBuilder.ToString(), + TokenUsage = tokenUsage ?? new ThorUsageResponse() + }; } #endregion