feat: 流式处理统一返回用户/系统内容并完善消息存储
引入 StreamProcessResult 统一封装流式处理结果,补充各 API 类型下用户输入与系统输出内容的提取与累计,用于会话消息持久化与用量统计;同时增强 Gemini 请求与响应内容解析能力,确保流式场景下消息与 token 使用数据完整一致。
This commit is contained in:
@@ -6,6 +6,83 @@ namespace Yi.Framework.AiHub.Domain.Shared.Dtos.Gemini;
|
|||||||
|
|
||||||
public static class GeminiGenerateContentAcquirer
|
public static class GeminiGenerateContentAcquirer
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 从请求体中提取用户最后一条消息内容
|
||||||
|
/// 路径: contents[last].parts[last].text
|
||||||
|
/// </summary>
|
||||||
|
public static string GetLastUserContent(JsonElement request)
|
||||||
|
{
|
||||||
|
var contents = request.GetPath("contents");
|
||||||
|
if (!contents.HasValue || contents.Value.ValueKind != JsonValueKind.Array)
|
||||||
|
{
|
||||||
|
return string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
var contentsArray = contents.Value.EnumerateArray().ToList();
|
||||||
|
if (contentsArray.Count == 0)
|
||||||
|
{
|
||||||
|
return string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
var lastContent = contentsArray[^1];
|
||||||
|
var parts = lastContent.GetPath("parts");
|
||||||
|
if (!parts.HasValue || parts.Value.ValueKind != JsonValueKind.Array)
|
||||||
|
{
|
||||||
|
return string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
var partsArray = parts.Value.EnumerateArray().ToList();
|
||||||
|
if (partsArray.Count == 0)
|
||||||
|
{
|
||||||
|
return string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取最后一个 part 的 text
|
||||||
|
var lastPart = partsArray[^1];
|
||||||
|
return lastPart.GetPath("text").GetString() ?? string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 从响应中提取文本内容(非 thought 类型)
|
||||||
|
/// 路径: candidates[0].content.parts[].text (where thought != true)
|
||||||
|
/// </summary>
|
||||||
|
public static string GetTextContent(JsonElement response)
|
||||||
|
{
|
||||||
|
var candidates = response.GetPath("candidates");
|
||||||
|
if (!candidates.HasValue || candidates.Value.ValueKind != JsonValueKind.Array)
|
||||||
|
{
|
||||||
|
return string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
var candidatesArray = candidates.Value.EnumerateArray().ToList();
|
||||||
|
if (candidatesArray.Count == 0)
|
||||||
|
{
|
||||||
|
return string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
var parts = candidatesArray[0].GetPath("content", "parts");
|
||||||
|
if (!parts.HasValue || parts.Value.ValueKind != JsonValueKind.Array)
|
||||||
|
{
|
||||||
|
return string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 遍历所有 parts,只取非 thought 的 text
|
||||||
|
foreach (var part in parts.Value.EnumerateArray())
|
||||||
|
{
|
||||||
|
var isThought = part.GetPath("thought").GetBool();
|
||||||
|
if (!isThought)
|
||||||
|
{
|
||||||
|
var text = part.GetPath("text").GetString();
|
||||||
|
if (!string.IsNullOrEmpty(text))
|
||||||
|
{
|
||||||
|
return text;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
public static ThorUsageResponse? GetUsage(JsonElement response)
|
public static ThorUsageResponse? GetUsage(JsonElement response)
|
||||||
{
|
{
|
||||||
var usage = response.GetPath("usageMetadata");
|
var usage = response.GetPath("usageMetadata");
|
||||||
|
|||||||
@@ -1178,21 +1178,21 @@ public class AiGateWayManager : DomainService
|
|||||||
}
|
}
|
||||||
}, cancellationToken);
|
}, cancellationToken);
|
||||||
|
|
||||||
ThorUsageResponse? tokenUsage = null;
|
StreamProcessResult? processResult = null;
|
||||||
|
|
||||||
switch (apiType)
|
switch (apiType)
|
||||||
{
|
{
|
||||||
case ModelApiTypeEnum.Completions:
|
case ModelApiTypeEnum.Completions:
|
||||||
tokenUsage = await ProcessCompletionsStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
|
processResult = await ProcessCompletionsStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
|
||||||
break;
|
break;
|
||||||
case ModelApiTypeEnum.Messages:
|
case ModelApiTypeEnum.Messages:
|
||||||
tokenUsage = await ProcessAnthropicStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
|
processResult = await ProcessAnthropicStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
|
||||||
break;
|
break;
|
||||||
case ModelApiTypeEnum.Responses:
|
case ModelApiTypeEnum.Responses:
|
||||||
tokenUsage = await ProcessOpenAiResponsesStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
|
processResult = await ProcessOpenAiResponsesStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
|
||||||
break;
|
break;
|
||||||
case ModelApiTypeEnum.GenerateContent:
|
case ModelApiTypeEnum.GenerateContent:
|
||||||
tokenUsage = await ProcessGeminiStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
|
processResult = await ProcessGeminiStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new UserFriendlyException($"不支持的API类型: {apiType}");
|
throw new UserFriendlyException($"不支持的API类型: {apiType}");
|
||||||
@@ -1206,25 +1206,25 @@ public class AiGateWayManager : DomainService
|
|||||||
await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
|
await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
|
||||||
new MessageInputDto
|
new MessageInputDto
|
||||||
{
|
{
|
||||||
Content = "不予存储",
|
Content = sessionId is null ? "不予存储" : processResult?.UserContent ?? string.Empty,
|
||||||
ModelId = sourceModelId,
|
ModelId = sourceModelId,
|
||||||
TokenUsage = tokenUsage,
|
TokenUsage = processResult?.TokenUsage,
|
||||||
}, tokenId);
|
}, tokenId);
|
||||||
|
|
||||||
await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
|
await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
|
||||||
new MessageInputDto
|
new MessageInputDto
|
||||||
{
|
{
|
||||||
Content = "不予存储",
|
Content = sessionId is null ? "不予存储" : processResult?.SystemContent ?? string.Empty,
|
||||||
ModelId = sourceModelId,
|
ModelId = sourceModelId,
|
||||||
TokenUsage = tokenUsage
|
TokenUsage = processResult?.TokenUsage
|
||||||
}, tokenId);
|
}, tokenId);
|
||||||
|
|
||||||
await _usageStatisticsManager.SetUsageAsync(userId, sourceModelId, tokenUsage, tokenId);
|
await _usageStatisticsManager.SetUsageAsync(userId, sourceModelId, processResult?.TokenUsage, tokenId);
|
||||||
|
|
||||||
// 扣减尊享token包用量
|
// 扣减尊享token包用量
|
||||||
if (userId.HasValue && tokenUsage is not null && modelDescribe.IsPremium)
|
if (userId.HasValue && processResult?.TokenUsage is not null && modelDescribe.IsPremium)
|
||||||
{
|
{
|
||||||
var totalTokens = tokenUsage.TotalTokens ?? 0;
|
var totalTokens = processResult?.TokenUsage.TotalTokens ?? 0;
|
||||||
if (totalTokens > 0)
|
if (totalTokens > 0)
|
||||||
{
|
{
|
||||||
await PremiumPackageManager.TryConsumeTokensAsync(userId.Value, totalTokens);
|
await PremiumPackageManager.TryConsumeTokensAsync(userId.Value, totalTokens);
|
||||||
@@ -1234,10 +1234,20 @@ public class AiGateWayManager : DomainService
|
|||||||
|
|
||||||
#region 统一流式处理 - 各API类型的具体实现
|
#region 统一流式处理 - 各API类型的具体实现
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 流式处理结果,包含用户输入、系统输出和 token 使用情况
|
||||||
|
/// </summary>
|
||||||
|
private class StreamProcessResult
|
||||||
|
{
|
||||||
|
public string UserContent { get; set; } = string.Empty;
|
||||||
|
public string SystemContent { get; set; } = string.Empty;
|
||||||
|
public ThorUsageResponse TokenUsage { get; set; } = new();
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 处理 OpenAI Completions 格式流式响应
|
/// 处理 OpenAI Completions 格式流式响应
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private async Task<ThorUsageResponse> ProcessCompletionsStreamAsync(
|
private async Task<StreamProcessResult> ProcessCompletionsStreamAsync(
|
||||||
ConcurrentQueue<string> messageQueue,
|
ConcurrentQueue<string> messageQueue,
|
||||||
JsonElement requestBody,
|
JsonElement requestBody,
|
||||||
AiModelDescribe modelDescribe,
|
AiModelDescribe modelDescribe,
|
||||||
@@ -1246,6 +1256,9 @@ public class AiGateWayManager : DomainService
|
|||||||
var request = requestBody.Deserialize<ThorChatCompletionsRequest>(ThorJsonSerializer.DefaultOptions)!;
|
var request = requestBody.Deserialize<ThorChatCompletionsRequest>(ThorJsonSerializer.DefaultOptions)!;
|
||||||
_specialCompatible.Compatible(request);
|
_specialCompatible.Compatible(request);
|
||||||
|
|
||||||
|
// 提取用户最后一条消息
|
||||||
|
var userContent = request.Messages?.LastOrDefault()?.MessagesStore ?? string.Empty;
|
||||||
|
|
||||||
// 处理 yi- 前缀
|
// 处理 yi- 前缀
|
||||||
if (!string.IsNullOrEmpty(request.Model) &&
|
if (!string.IsNullOrEmpty(request.Model) &&
|
||||||
request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
|
request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
|
||||||
@@ -1256,6 +1269,7 @@ public class AiGateWayManager : DomainService
|
|||||||
var chatService = LazyServiceProvider.GetRequiredKeyedService<IChatCompletionService>(modelDescribe.HandlerName);
|
var chatService = LazyServiceProvider.GetRequiredKeyedService<IChatCompletionService>(modelDescribe.HandlerName);
|
||||||
var completeChatResponse = chatService.CompleteChatStreamAsync(modelDescribe, request, cancellationToken);
|
var completeChatResponse = chatService.CompleteChatStreamAsync(modelDescribe, request, cancellationToken);
|
||||||
var tokenUsage = new ThorUsageResponse();
|
var tokenUsage = new ThorUsageResponse();
|
||||||
|
var systemContentBuilder = new StringBuilder();
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -1267,6 +1281,13 @@ public class AiGateWayManager : DomainService
|
|||||||
tokenUsage = data.Usage;
|
tokenUsage = data.Usage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 累加系统输出内容 (choices[].delta.content)
|
||||||
|
var deltaContent = data.Choices?.FirstOrDefault()?.Delta?.Content;
|
||||||
|
if (!string.IsNullOrEmpty(deltaContent))
|
||||||
|
{
|
||||||
|
systemContentBuilder.Append(deltaContent);
|
||||||
|
}
|
||||||
|
|
||||||
var message = JsonSerializer.Serialize(data, ThorJsonSerializer.DefaultOptions);
|
var message = JsonSerializer.Serialize(data, ThorJsonSerializer.DefaultOptions);
|
||||||
messageQueue.Enqueue($"data: {message}\n\n");
|
messageQueue.Enqueue($"data: {message}\n\n");
|
||||||
}
|
}
|
||||||
@@ -1275,6 +1296,7 @@ public class AiGateWayManager : DomainService
|
|||||||
{
|
{
|
||||||
_logger.LogError(e, "Ai对话异常");
|
_logger.LogError(e, "Ai对话异常");
|
||||||
var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
|
var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
|
||||||
|
systemContentBuilder.Append(errorContent);
|
||||||
var model = new ThorChatCompletionsResponse()
|
var model = new ThorChatCompletionsResponse()
|
||||||
{
|
{
|
||||||
Choices = new List<ThorChatChoiceResponse>()
|
Choices = new List<ThorChatChoiceResponse>()
|
||||||
@@ -1296,13 +1318,18 @@ public class AiGateWayManager : DomainService
|
|||||||
}
|
}
|
||||||
|
|
||||||
messageQueue.Enqueue("data: [DONE]\n\n");
|
messageQueue.Enqueue("data: [DONE]\n\n");
|
||||||
return tokenUsage;
|
return new StreamProcessResult
|
||||||
|
{
|
||||||
|
UserContent = userContent,
|
||||||
|
SystemContent = systemContentBuilder.ToString(),
|
||||||
|
TokenUsage = tokenUsage
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 处理 Anthropic Messages 格式流式响应
|
/// 处理 Anthropic Messages 格式流式响应
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private async Task<ThorUsageResponse> ProcessAnthropicStreamAsync(
|
private async Task<StreamProcessResult> ProcessAnthropicStreamAsync(
|
||||||
ConcurrentQueue<string> messageQueue,
|
ConcurrentQueue<string> messageQueue,
|
||||||
JsonElement requestBody,
|
JsonElement requestBody,
|
||||||
AiModelDescribe modelDescribe,
|
AiModelDescribe modelDescribe,
|
||||||
@@ -1311,6 +1338,16 @@ public class AiGateWayManager : DomainService
|
|||||||
var request = requestBody.Deserialize<AnthropicInput>(ThorJsonSerializer.DefaultOptions)!;
|
var request = requestBody.Deserialize<AnthropicInput>(ThorJsonSerializer.DefaultOptions)!;
|
||||||
_specialCompatible.AnthropicCompatible(request);
|
_specialCompatible.AnthropicCompatible(request);
|
||||||
|
|
||||||
|
// 提取用户最后一条消息
|
||||||
|
var lastMessage = request.Messages?.LastOrDefault();
|
||||||
|
var userContent = lastMessage?.Content ?? string.Empty;
|
||||||
|
if (string.IsNullOrEmpty(userContent) && lastMessage?.Contents != null && lastMessage.Contents.Any())
|
||||||
|
{
|
||||||
|
// 如果是 Contents 数组,提取第一个 text 类型的内容
|
||||||
|
var textContent = lastMessage.Contents.FirstOrDefault(c => c.Type == "text");
|
||||||
|
userContent = textContent?.Text ?? System.Text.Json.JsonSerializer.Serialize(lastMessage.Contents);
|
||||||
|
}
|
||||||
|
|
||||||
// 处理 yi- 前缀
|
// 处理 yi- 前缀
|
||||||
if (!string.IsNullOrEmpty(request.Model) &&
|
if (!string.IsNullOrEmpty(request.Model) &&
|
||||||
request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
|
request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
|
||||||
@@ -1321,6 +1358,7 @@ public class AiGateWayManager : DomainService
|
|||||||
var chatService = LazyServiceProvider.GetRequiredKeyedService<IAnthropicChatCompletionService>(modelDescribe.HandlerName);
|
var chatService = LazyServiceProvider.GetRequiredKeyedService<IAnthropicChatCompletionService>(modelDescribe.HandlerName);
|
||||||
var completeChatResponse = chatService.StreamChatCompletionsAsync(modelDescribe, request, cancellationToken);
|
var completeChatResponse = chatService.StreamChatCompletionsAsync(modelDescribe, request, cancellationToken);
|
||||||
var tokenUsage = new ThorUsageResponse();
|
var tokenUsage = new ThorUsageResponse();
|
||||||
|
var systemContentBuilder = new StringBuilder();
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -1364,6 +1402,13 @@ public class AiGateWayManager : DomainService
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 累加系统输出内容 (delta.text)
|
||||||
|
var deltaText = responseResult.Item2?.Delta?.Text;
|
||||||
|
if (!string.IsNullOrEmpty(deltaText))
|
||||||
|
{
|
||||||
|
systemContentBuilder.Append(deltaText);
|
||||||
|
}
|
||||||
|
|
||||||
// 序列化为SSE格式字符串
|
// 序列化为SSE格式字符串
|
||||||
var data = JsonSerializer.Serialize(responseResult.Item2, ThorJsonSerializer.DefaultOptions);
|
var data = JsonSerializer.Serialize(responseResult.Item2, ThorJsonSerializer.DefaultOptions);
|
||||||
messageQueue.Enqueue($"{responseResult.Item1.Trim()}\ndata: {data}\n\n");
|
messageQueue.Enqueue($"{responseResult.Item1.Trim()}\ndata: {data}\n\n");
|
||||||
@@ -1373,19 +1418,25 @@ public class AiGateWayManager : DomainService
|
|||||||
{
|
{
|
||||||
_logger.LogError(e, "Ai对话异常");
|
_logger.LogError(e, "Ai对话异常");
|
||||||
var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
|
var errorContent = $"对话Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
|
||||||
|
systemContentBuilder.Append(errorContent);
|
||||||
throw new UserFriendlyException(errorContent);
|
throw new UserFriendlyException(errorContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
tokenUsage.TotalTokens = (tokenUsage.InputTokens ?? 0) + (tokenUsage.OutputTokens ?? 0);
|
tokenUsage.TotalTokens = (tokenUsage.InputTokens ?? 0) + (tokenUsage.OutputTokens ?? 0);
|
||||||
tokenUsage.SetSupplementalMultiplier(modelDescribe.Multiplier);
|
tokenUsage.SetSupplementalMultiplier(modelDescribe.Multiplier);
|
||||||
|
|
||||||
return tokenUsage;
|
return new StreamProcessResult
|
||||||
|
{
|
||||||
|
UserContent = userContent,
|
||||||
|
SystemContent = systemContentBuilder.ToString(),
|
||||||
|
TokenUsage = tokenUsage
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 处理 OpenAI Responses 格式流式响应
|
/// 处理 OpenAI Responses 格式流式响应
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private async Task<ThorUsageResponse> ProcessOpenAiResponsesStreamAsync(
|
private async Task<StreamProcessResult> ProcessOpenAiResponsesStreamAsync(
|
||||||
ConcurrentQueue<string> messageQueue,
|
ConcurrentQueue<string> messageQueue,
|
||||||
JsonElement requestBody,
|
JsonElement requestBody,
|
||||||
AiModelDescribe modelDescribe,
|
AiModelDescribe modelDescribe,
|
||||||
@@ -1393,6 +1444,27 @@ public class AiGateWayManager : DomainService
|
|||||||
{
|
{
|
||||||
var request = requestBody.Deserialize<OpenAiResponsesInput>(ThorJsonSerializer.DefaultOptions)!;
|
var request = requestBody.Deserialize<OpenAiResponsesInput>(ThorJsonSerializer.DefaultOptions)!;
|
||||||
|
|
||||||
|
// 提取用户输入内容 (input 字段可能是字符串或数组)
|
||||||
|
var userContent = string.Empty;
|
||||||
|
if (request.Input.ValueKind == JsonValueKind.String)
|
||||||
|
{
|
||||||
|
userContent = request.Input.GetString() ?? string.Empty;
|
||||||
|
}
|
||||||
|
else if (request.Input.ValueKind == JsonValueKind.Array)
|
||||||
|
{
|
||||||
|
// 获取最后一个 user 角色的消息
|
||||||
|
var inputArray = request.Input.EnumerateArray().ToList();
|
||||||
|
var lastUserMessage = inputArray.LastOrDefault(x =>
|
||||||
|
x.TryGetProperty("role", out var role) && role.GetString() == "user");
|
||||||
|
if (lastUserMessage.ValueKind != JsonValueKind.Undefined)
|
||||||
|
{
|
||||||
|
if (lastUserMessage.TryGetProperty("content", out var content))
|
||||||
|
{
|
||||||
|
userContent = content.GetString() ?? string.Empty;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 处理 yi- 前缀
|
// 处理 yi- 前缀
|
||||||
if (!string.IsNullOrEmpty(request.Model) &&
|
if (!string.IsNullOrEmpty(request.Model) &&
|
||||||
request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
|
request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
|
||||||
@@ -1403,11 +1475,22 @@ public class AiGateWayManager : DomainService
|
|||||||
var chatService = LazyServiceProvider.GetRequiredKeyedService<IOpenAiResponseService>(modelDescribe.HandlerName);
|
var chatService = LazyServiceProvider.GetRequiredKeyedService<IOpenAiResponseService>(modelDescribe.HandlerName);
|
||||||
var completeChatResponse = chatService.ResponsesStreamAsync(modelDescribe, request, cancellationToken);
|
var completeChatResponse = chatService.ResponsesStreamAsync(modelDescribe, request, cancellationToken);
|
||||||
ThorUsageResponse? tokenUsage = null;
|
ThorUsageResponse? tokenUsage = null;
|
||||||
|
var systemContentBuilder = new StringBuilder();
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await foreach (var responseResult in completeChatResponse)
|
await foreach (var responseResult in completeChatResponse)
|
||||||
{
|
{
|
||||||
|
// 提取输出文本内容 (response.output_text.delta 事件)
|
||||||
|
if (responseResult.Item1.Contains("response.output_text.delta"))
|
||||||
|
{
|
||||||
|
var delta = responseResult.Item2?.GetPath("delta").GetString();
|
||||||
|
if (!string.IsNullOrEmpty(delta))
|
||||||
|
{
|
||||||
|
systemContentBuilder.Append(delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (responseResult.Item1.Contains("response.completed"))
|
if (responseResult.Item1.Contains("response.completed"))
|
||||||
{
|
{
|
||||||
var obj = responseResult.Item2!.Value;
|
var obj = responseResult.Item2!.Value;
|
||||||
@@ -1434,29 +1517,46 @@ public class AiGateWayManager : DomainService
|
|||||||
{
|
{
|
||||||
_logger.LogError(e, "Ai响应异常");
|
_logger.LogError(e, "Ai响应异常");
|
||||||
var errorContent = $"响应Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
|
var errorContent = $"响应Ai异常,异常信息:\n当前Ai模型:{request.Model}\n异常信息:{e.Message}\n异常堆栈:{e}";
|
||||||
|
systemContentBuilder.Append(errorContent);
|
||||||
throw new UserFriendlyException(errorContent);
|
throw new UserFriendlyException(errorContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
return tokenUsage ?? new ThorUsageResponse();
|
return new StreamProcessResult
|
||||||
|
{
|
||||||
|
UserContent = userContent,
|
||||||
|
SystemContent = systemContentBuilder.ToString(),
|
||||||
|
TokenUsage = tokenUsage ?? new ThorUsageResponse()
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 处理 Gemini GenerateContent 格式流式响应
|
/// 处理 Gemini GenerateContent 格式流式响应
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private async Task<ThorUsageResponse> ProcessGeminiStreamAsync(
|
private async Task<StreamProcessResult> ProcessGeminiStreamAsync(
|
||||||
ConcurrentQueue<string> messageQueue,
|
ConcurrentQueue<string> messageQueue,
|
||||||
JsonElement requestBody,
|
JsonElement requestBody,
|
||||||
AiModelDescribe modelDescribe,
|
AiModelDescribe modelDescribe,
|
||||||
CancellationToken cancellationToken)
|
CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
// 提取用户最后一条消息 (contents[last].parts[last].text)
|
||||||
|
var userContent = GeminiGenerateContentAcquirer.GetLastUserContent(requestBody);
|
||||||
|
|
||||||
var chatService = LazyServiceProvider.GetRequiredKeyedService<IGeminiGenerateContentService>(modelDescribe.HandlerName);
|
var chatService = LazyServiceProvider.GetRequiredKeyedService<IGeminiGenerateContentService>(modelDescribe.HandlerName);
|
||||||
var completeChatResponse = chatService.GenerateContentStreamAsync(modelDescribe, requestBody, cancellationToken);
|
var completeChatResponse = chatService.GenerateContentStreamAsync(modelDescribe, requestBody, cancellationToken);
|
||||||
ThorUsageResponse? tokenUsage = null;
|
ThorUsageResponse? tokenUsage = null;
|
||||||
|
var systemContentBuilder = new StringBuilder();
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await foreach (var responseResult in completeChatResponse)
|
await foreach (var responseResult in completeChatResponse)
|
||||||
{
|
{
|
||||||
|
// 累加系统输出内容 (candidates[0].content.parts[].text,排除 thought)
|
||||||
|
var textContent = GeminiGenerateContentAcquirer.GetTextContent(responseResult!.Value);
|
||||||
|
if (!string.IsNullOrEmpty(textContent))
|
||||||
|
{
|
||||||
|
systemContentBuilder.Append(textContent);
|
||||||
|
}
|
||||||
|
|
||||||
if (responseResult!.Value.GetPath("candidates", 0, "finishReason").GetString() == "STOP")
|
if (responseResult!.Value.GetPath("candidates", 0, "finishReason").GetString() == "STOP")
|
||||||
{
|
{
|
||||||
tokenUsage = GeminiGenerateContentAcquirer.GetUsage(responseResult!.Value);
|
tokenUsage = GeminiGenerateContentAcquirer.GetUsage(responseResult!.Value);
|
||||||
@@ -1483,10 +1583,16 @@ public class AiGateWayManager : DomainService
|
|||||||
{
|
{
|
||||||
_logger.LogError(e, "Ai生成异常");
|
_logger.LogError(e, "Ai生成异常");
|
||||||
var errorContent = $"生成Ai异常,异常信息:\n当前Ai模型:{modelDescribe.ModelId}\n异常信息:{e.Message}\n异常堆栈:{e}";
|
var errorContent = $"生成Ai异常,异常信息:\n当前Ai模型:{modelDescribe.ModelId}\n异常信息:{e.Message}\n异常堆栈:{e}";
|
||||||
|
systemContentBuilder.Append(errorContent);
|
||||||
throw new UserFriendlyException(errorContent);
|
throw new UserFriendlyException(errorContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
return tokenUsage ?? new ThorUsageResponse();
|
return new StreamProcessResult
|
||||||
|
{
|
||||||
|
UserContent = userContent,
|
||||||
|
SystemContent = systemContentBuilder.ToString(),
|
||||||
|
TokenUsage = tokenUsage ?? new ThorUsageResponse()
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|||||||
Reference in New Issue
Block a user