feat: add unified streaming forwarding and usage statistics for multiple API types

- Add a unified streaming pipeline that forwards SSE responses verbatim for four APIs: Completions, Anthropic Messages, OpenAI Responses, and Gemini GenerateContent.
- Centralize token usage statistics, multiplier calculation, premium-package deduction, and message recording.
- Add a unified send endpoint, ai-chat/unified/send, which can resolve the model ID from the request body automatically.
- Improve the consistency and extensibility of multi-model streaming integrations.
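
For illustration, a minimal client-side sketch (not part of this commit) of how the new endpoint might be consumed. The host, model name, and request body are hypothetical placeholders; the endpoint path and query parameters come from PostUnifiedSendAsync below, and apiType mirrors the ModelApiTypeEnum values.

using System.Text;

// Hypothetical host; auth omitted for brevity.
using var http = new HttpClient { BaseAddress = new Uri("https://example-host/") };

// Hypothetical Completions-style body; "model" is resolved server-side.
var body = """
    { "model": "yi-gpt-4o", "stream": true,
      "messages": [ { "role": "user", "content": "hello" } ] }
    """;

// modelId is omitted from the query string; the server extracts it from the body.
using var request = new HttpRequestMessage(HttpMethod.Post, "ai-chat/unified/send?apiType=Completions")
{
    Content = new StringContent(body, Encoding.UTF8, "application/json")
};

// Read the SSE stream line by line as it arrives.
using var response = await http.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
using var reader = new StreamReader(await response.Content.ReadAsStreamAsync());
while (await reader.ReadLineAsync() is { } line)
{
    Console.WriteLine(line); // SSE frames, e.g. "data: {...}", ending with "data: [DONE]"
}
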
@@ -25,6 +25,7 @@ using Yi.Framework.AiHub.Domain.Entities.Model;
using Yi.Framework.AiHub.Domain.Extensions;
using Yi.Framework.AiHub.Domain.Managers;
using Yi.Framework.AiHub.Domain.Shared.Consts;
using System.Text.Json;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.Shared.Enums;
@@ -270,4 +271,89 @@ public class AiChatService : ApplicationService
        var data = await _agentStoreRepository.GetFirstAsync(x => x.SessionId == sessionId);
        return data?.Store;
    }

    /// <summary>
    /// Unified send - supports all four API types
    /// </summary>
    /// <param name="apiType">API type enum</param>
    /// <param name="input">Raw request body as a JsonElement</param>
    /// <param name="modelId">Model ID (the Gemini format must pass it via the URL)</param>
    /// <param name="sessionId">Session ID</param>
    /// <param name="cancellationToken">Cancellation token</param>
    [HttpPost("ai-chat/unified/send")]
    public async Task PostUnifiedSendAsync(
        [FromQuery] ModelApiTypeEnum apiType,
        [FromBody] JsonElement input,
        [FromQuery] string modelId,
        [FromQuery] Guid? sessionId,
        CancellationToken cancellationToken)
    {
        // Extract the model ID from the request body if it was not passed via the URL
        if (string.IsNullOrEmpty(modelId))
        {
            modelId = ExtractModelIdFromRequest(apiType, input);
        }

        // Every model except the free one requires validation
        if (modelId != FreeModelId)
        {
            if (CurrentUser.IsAuthenticated)
            {
                await _aiBlacklistManager.VerifiyAiBlacklist(CurrentUser.GetId());
                if (!CurrentUser.IsAiVip())
                {
                    throw new UserFriendlyException("This model requires a VIP account; please purchase VIP, then log in again and retry.");
                }
            }
            else
            {
                throw new UserFriendlyException("Anonymous users can only use the non-accelerated DeepSeek-R1; please log in and retry.");
            }
        }

        // For premium-package models, verify that the premium token balance is sufficient
        if (CurrentUser.IsAuthenticated)
        {
            var isPremium = await _modelManager.IsPremiumModelAsync(modelId);
            if (isPremium)
            {
                var availableTokens = await _premiumPackageManager.GetAvailableTokensAsync(CurrentUser.GetId());
                if (availableTokens <= 0)
                {
                    throw new UserFriendlyException("Premium token package balance is insufficient; please purchase a premium token package first.");
                }
            }
        }

        // Delegate to the unified streaming handler
        await _aiGateWayManager.UnifiedStreamForStatisticsAsync(
            _httpContextAccessor.HttpContext!,
            apiType,
            input,
            modelId,
            CurrentUser.Id,
            sessionId,
            null,
            CancellationToken.None);
    }

    /// <summary>
    /// Extract the model ID from the request body
    /// </summary>
    private string ExtractModelIdFromRequest(ModelApiTypeEnum apiType, JsonElement input)
    {
        try
        {
            if (input.TryGetProperty("model", out var modelProperty))
            {
                return modelProperty.GetString() ?? string.Empty;
            }
        }
        catch
        {
            // Ignore parse errors
        }

        throw new UserFriendlyException("Could not resolve the model ID from the request; please specify modelId as a URL parameter.");
    }
}
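
For reference, minimal request-body shapes (hypothetical model names) that ExtractModelIdFromRequest can handle. Only the top-level "model" property is inspected; Gemini GenerateContent bodies carry no such property, which is why that format passes modelId via the URL instead.

using System.Text.Json;

// Hypothetical Completions-style body: "model" sits at the top level.
var completionsBody = JsonSerializer.Deserialize<JsonElement>("""
    { "model": "yi-gpt-4o", "stream": true, "messages": [] }
    """);

// Hypothetical Anthropic Messages-style body: same top-level "model" field.
var anthropicBody = JsonSerializer.Deserialize<JsonElement>("""
    { "model": "yi-claude", "max_tokens": 1024, "messages": [] }
    """);
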
@@ -1122,6 +1122,375 @@ public class AiGateWayManager : DomainService
        await _imageStoreTaskRepository.UpdateAsync(imageStoreTask);
    }

    /// <summary>
    /// Unified streaming handler - forwards SSE verbatim for all four API types
    /// </summary>
    public async Task UnifiedStreamForStatisticsAsync(
        HttpContext httpContext,
        ModelApiTypeEnum apiType,
        JsonElement requestBody,
        string modelId,
        Guid? userId = null,
        Guid? sessionId = null,
        Guid? tokenId = null,
        CancellationToken cancellationToken = default)
    {
        var response = httpContext.Response;
        // Set response headers declaring an SSE stream
        response.ContentType = "text/event-stream;charset=utf-8;";
        response.Headers.TryAdd("Cache-Control", "no-cache");
        response.Headers.TryAdd("Connection", "keep-alive");

        var sourceModelId = modelId;
        // Strip the "yi-" prefix
        if (!string.IsNullOrEmpty(modelId) &&
            modelId.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
        {
            modelId = modelId[3..];
        }

        var modelDescribe = await GetModelAsync(apiType, sourceModelId);

        // Shared buffer queue
        var messageQueue = new ConcurrentQueue<string>();
        var outputInterval = TimeSpan.FromMilliseconds(75);
        var isComplete = false;

        // Shared consumer task
        var outputTask = Task.Run(async () =>
        {
            while (!(isComplete && messageQueue.IsEmpty))
            {
                if (messageQueue.TryDequeue(out var message))
                {
                    await response.WriteAsync(message, Encoding.UTF8, cancellationToken).ConfigureAwait(false);
                    await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
                }

                if (!isComplete)
                {
                    await Task.Delay(outputInterval, cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    await Task.Delay(10, cancellationToken).ConfigureAwait(false);
                }
            }
        }, cancellationToken);

        ThorUsageResponse? tokenUsage = null;

        switch (apiType)
        {
            case ModelApiTypeEnum.Completions:
                tokenUsage = await ProcessCompletionsStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
                break;
            case ModelApiTypeEnum.Messages:
                tokenUsage = await ProcessAnthropicStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
                break;
            case ModelApiTypeEnum.Responses:
                tokenUsage = await ProcessOpenAiResponsesStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
                break;
            case ModelApiTypeEnum.GenerateContent:
                tokenUsage = await ProcessGeminiStreamAsync(messageQueue, requestBody, modelDescribe, cancellationToken);
                break;
            default:
                throw new UserFriendlyException($"Unsupported API type: {apiType}");
        }

        // Mark completion and wait for the consumer task to drain
        isComplete = true;
        await outputTask;

        // Unified statistics handling
        await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
            new MessageInputDto
            {
                Content = "Not stored",
                ModelId = sourceModelId,
                TokenUsage = tokenUsage,
            }, tokenId);

        await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
            new MessageInputDto
            {
                Content = "Not stored",
                ModelId = sourceModelId,
                TokenUsage = tokenUsage
            }, tokenId);

        await _usageStatisticsManager.SetUsageAsync(userId, sourceModelId, tokenUsage, tokenId);

        // Deduct premium token package usage
        if (userId.HasValue && tokenUsage is not null && modelDescribe.IsPremium)
        {
            var totalTokens = tokenUsage.TotalTokens ?? 0;
            if (totalTokens > 0)
            {
                await PremiumPackageManager.TryConsumeTokensAsync(userId.Value, totalTokens);
            }
        }
    }

    #region Unified streaming - concrete implementations per API type

    /// <summary>
    /// Handle the OpenAI Completions streaming response format
    /// </summary>
    private async Task<ThorUsageResponse> ProcessCompletionsStreamAsync(
        ConcurrentQueue<string> messageQueue,
        JsonElement requestBody,
        AiModelDescribe modelDescribe,
        CancellationToken cancellationToken)
    {
        var request = requestBody.Deserialize<ThorChatCompletionsRequest>(ThorJsonSerializer.DefaultOptions)!;
        _specialCompatible.Compatible(request);

        // Strip the "yi-" prefix
        if (!string.IsNullOrEmpty(request.Model) &&
            request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
        {
            request.Model = request.Model[3..];
        }

        var chatService = LazyServiceProvider.GetRequiredKeyedService<IChatCompletionService>(modelDescribe.HandlerName);
        var completeChatResponse = chatService.CompleteChatStreamAsync(modelDescribe, request, cancellationToken);
        var tokenUsage = new ThorUsageResponse();

        try
        {
            await foreach (var data in completeChatResponse)
            {
                data.SupplementalMultiplier(modelDescribe.Multiplier);
                if (data.Usage is not null && (data.Usage.CompletionTokens > 0 || data.Usage.OutputTokens > 0))
                {
                    tokenUsage = data.Usage;
                }

                var message = JsonSerializer.Serialize(data, ThorJsonSerializer.DefaultOptions);
                messageQueue.Enqueue($"data: {message}\n\n");
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "AI chat error");
            var errorContent = $"AI chat error.\nCurrent AI model: {request.Model}\nError message: {e.Message}\nStack trace: {e}";
            var model = new ThorChatCompletionsResponse()
            {
                Choices = new List<ThorChatChoiceResponse>()
                {
                    new ThorChatChoiceResponse()
                    {
                        Delta = new ThorChatMessage()
                        {
                            Content = errorContent
                        }
                    }
                }
            };
            var errorMessage = JsonConvert.SerializeObject(model, new JsonSerializerSettings
            {
                ContractResolver = new CamelCasePropertyNamesContractResolver()
            });
            messageQueue.Enqueue($"data: {errorMessage}\n\n");
        }

        messageQueue.Enqueue("data: [DONE]\n\n");
        return tokenUsage;
    }
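
Context for the usage capture above: many OpenAI-compatible providers attach token usage only to the final streamed chunk, so the loop keeps the last non-empty Usage it sees before the terminator. An illustrative (not provider-exact) tail of such a stream:

data: {"id":"chatcmpl-1","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":87,"total_tokens":99}}

data: [DONE]
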

    /// <summary>
    /// Handle the Anthropic Messages streaming response format
    /// </summary>
    private async Task<ThorUsageResponse> ProcessAnthropicStreamAsync(
        ConcurrentQueue<string> messageQueue,
        JsonElement requestBody,
        AiModelDescribe modelDescribe,
        CancellationToken cancellationToken)
    {
        var request = requestBody.Deserialize<AnthropicInput>(ThorJsonSerializer.DefaultOptions)!;
        _specialCompatible.AnthropicCompatible(request);

        // Strip the "yi-" prefix
        if (!string.IsNullOrEmpty(request.Model) &&
            request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
        {
            request.Model = request.Model[3..];
        }

        var chatService = LazyServiceProvider.GetRequiredKeyedService<IAnthropicChatCompletionService>(modelDescribe.HandlerName);
        var completeChatResponse = chatService.StreamChatCompletionsAsync(modelDescribe, request, cancellationToken);
        var tokenUsage = new ThorUsageResponse();

        try
        {
            await foreach (var responseResult in completeChatResponse)
            {
                // Some providers report part of the usage in message_start
                if (responseResult.Item1.Contains("message_start"))
                {
                    var currentTokenUsage = responseResult.Item2?.Message?.Usage;
                    if (currentTokenUsage != null)
                    {
                        if ((currentTokenUsage.InputTokens ?? 0) != 0)
                        {
                            tokenUsage.InputTokens = (currentTokenUsage.InputTokens ?? 0) +
                                                     (currentTokenUsage.CacheCreationInputTokens ?? 0) +
                                                     (currentTokenUsage.CacheReadInputTokens ?? 0);
                        }
                        if ((currentTokenUsage.OutputTokens ?? 0) != 0)
                        {
                            tokenUsage.OutputTokens = currentTokenUsage.OutputTokens;
                        }
                    }
                }

                // ...and message_delta carries the rest
                if (responseResult.Item1.Contains("message_delta"))
                {
                    var currentTokenUsage = responseResult.Item2?.Usage;
                    if (currentTokenUsage != null)
                    {
                        if ((currentTokenUsage.InputTokens ?? 0) != 0)
                        {
                            tokenUsage.InputTokens = (currentTokenUsage.InputTokens ?? 0) +
                                                     (currentTokenUsage.CacheCreationInputTokens ?? 0) +
                                                     (currentTokenUsage.CacheReadInputTokens ?? 0);
                        }
                        if ((currentTokenUsage.OutputTokens ?? 0) != 0)
                        {
                            tokenUsage.OutputTokens = currentTokenUsage.OutputTokens;
                        }
                    }
                }

                // Serialize into an SSE-format string
                var data = JsonSerializer.Serialize(responseResult.Item2, ThorJsonSerializer.DefaultOptions);
                messageQueue.Enqueue($"{responseResult.Item1.Trim()}\ndata: {data}\n\n");
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "AI chat error");
            var errorContent = $"AI chat error.\nCurrent AI model: {request.Model}\nError message: {e.Message}\nStack trace: {e}";
            throw new UserFriendlyException(errorContent);
        }

        tokenUsage.TotalTokens = (tokenUsage.InputTokens ?? 0) + (tokenUsage.OutputTokens ?? 0);
        tokenUsage.SetSupplementalMultiplier(modelDescribe.Multiplier);

        return tokenUsage;
    }
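
The two-event merge above follows Anthropic's streaming format, where input tokens are typically reported in message_start and the final output token count arrives in message_delta; a simplified illustration with hypothetical field values:

event: message_start
data: {"type":"message_start","message":{"usage":{"input_tokens":25,"output_tokens":1}}}

event: message_delta
data: {"type":"message_delta","usage":{"output_tokens":87}}
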

    /// <summary>
    /// Handle the OpenAI Responses streaming response format
    /// </summary>
    private async Task<ThorUsageResponse> ProcessOpenAiResponsesStreamAsync(
        ConcurrentQueue<string> messageQueue,
        JsonElement requestBody,
        AiModelDescribe modelDescribe,
        CancellationToken cancellationToken)
    {
        var request = requestBody.Deserialize<OpenAiResponsesInput>(ThorJsonSerializer.DefaultOptions)!;

        // Strip the "yi-" prefix
        if (!string.IsNullOrEmpty(request.Model) &&
            request.Model.StartsWith("yi-", StringComparison.OrdinalIgnoreCase))
        {
            request.Model = request.Model[3..];
        }

        var chatService = LazyServiceProvider.GetRequiredKeyedService<IOpenAiResponseService>(modelDescribe.HandlerName);
        var completeChatResponse = chatService.ResponsesStreamAsync(modelDescribe, request, cancellationToken);
        ThorUsageResponse? tokenUsage = null;

        try
        {
            await foreach (var responseResult in completeChatResponse)
            {
                if (responseResult.Item1.Contains("response.completed"))
                {
                    var obj = responseResult.Item2!.Value;
                    int inputTokens = obj.GetPath("response", "usage", "input_tokens").GetInt();
                    int outputTokens = obj.GetPath("response", "usage", "output_tokens").GetInt();
                    inputTokens = Convert.ToInt32(inputTokens * modelDescribe.Multiplier);
                    outputTokens = Convert.ToInt32(outputTokens * modelDescribe.Multiplier);
                    tokenUsage = new ThorUsageResponse
                    {
                        PromptTokens = inputTokens,
                        InputTokens = inputTokens,
                        OutputTokens = outputTokens,
                        CompletionTokens = outputTokens,
                        TotalTokens = inputTokens + outputTokens,
                    };
                }

                // Serialize into an SSE-format string
                var data = JsonSerializer.Serialize(responseResult.Item2, ThorJsonSerializer.DefaultOptions);
                messageQueue.Enqueue($"{responseResult.Item1.Trim()}\ndata: {data}\n\n");
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "AI response error");
            var errorContent = $"AI response error.\nCurrent AI model: {request.Model}\nError message: {e.Message}\nStack trace: {e}";
            throw new UserFriendlyException(errorContent);
        }

        return tokenUsage ?? new ThorUsageResponse();
    }

    /// <summary>
    /// Handle the Gemini GenerateContent streaming response format
    /// </summary>
    private async Task<ThorUsageResponse> ProcessGeminiStreamAsync(
        ConcurrentQueue<string> messageQueue,
        JsonElement requestBody,
        AiModelDescribe modelDescribe,
        CancellationToken cancellationToken)
    {
        var chatService = LazyServiceProvider.GetRequiredKeyedService<IGeminiGenerateContentService>(modelDescribe.HandlerName);
        var completeChatResponse = chatService.GenerateContentStreamAsync(modelDescribe, requestBody, cancellationToken);
        ThorUsageResponse? tokenUsage = null;

        try
        {
            await foreach (var responseResult in completeChatResponse)
            {
                if (responseResult!.Value.GetPath("candidates", 0, "finishReason").GetString() == "STOP")
                {
                    tokenUsage = GeminiGenerateContentAcquirer.GetUsage(responseResult!.Value);
                    // Image models are billed separately at a flat rate
                    if (modelDescribe.ModelType == ModelTypeEnum.Image)
                    {
                        tokenUsage = new ThorUsageResponse
                        {
                            InputTokens = (int)modelDescribe.Multiplier,
                            OutputTokens = (int)modelDescribe.Multiplier,
                            TotalTokens = (int)modelDescribe.Multiplier
                        };
                    }
                    else
                    {
                        tokenUsage.SetSupplementalMultiplier(modelDescribe.Multiplier);
                    }
                }

                messageQueue.Enqueue($"data: {JsonSerializer.Serialize(responseResult)}\n\n");
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "AI generation error");
            var errorContent = $"AI generation error.\nCurrent AI model: {modelDescribe.ModelId}\nError message: {e.Message}\nStack trace: {e}";
            throw new UserFriendlyException(errorContent);
        }

        return tokenUsage ?? new ThorUsageResponse();
    }
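
A worked example of the flat-rate image billing above, using a hypothetical multiplier: if modelDescribe.Multiplier is 2000, each image generation is recorded as InputTokens = OutputTokens = TotalTokens = 2000 regardless of what the provider reports, whereas text models instead scale their reported usage by the multiplier via SetSupplementalMultiplier.
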

    #endregion

    #region Streaming-format HTTP response

    private static readonly byte[] EventPrefix = "event: "u8.ToArray();