// NOTE(review): this file appears to have passed through a tag-stripping step — every
// generic type argument (e.g. ISqlSugarRepository<...>, ILogger<...>, Task<...>,
// IAsyncEnumerable<...>, List<...>, ConcurrentQueue<...>, GetRequiredKeyedService<...>)
// and every XML-doc tag has been removed. The code tokens below are preserved exactly
// as found; the missing type arguments must be restored from the original source/VCS.
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Text;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using OpenAI.Chat;
using Volo.Abp.Domain.Services;
using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.AiChat;
using Yi.Framework.AiHub.Domain.Entities;
using Yi.Framework.AiHub.Domain.Entities.Model;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.SqlSugarCore.Abstractions;
// Alias: two "Usage" DTOs exist (Contracts.Dtos and Contracts.Dtos.OpenAi); this pins
// the unqualified name to the non-OpenAi one. MapToChatCompletions fully qualifies the other.
using Usage = Yi.Framework.AiHub.Application.Contracts.Dtos.Usage;

namespace Yi.Framework.AiHub.Domain.Managers;

/// <summary>
/// AI gateway: resolves a model id to its hosting app/handler, dispatches chat
/// completions (streaming and non-streaming), records message history and token
/// usage, and maps results to OpenAI-compatible response shapes.
/// </summary>
public class AiGateWayManager : DomainService
{
    // NOTE(review): generic argument stripped — presumably ISqlSugarRepository<AiApp>; confirm.
    private readonly ISqlSugarRepository _aiAppRepository;
    // NOTE(review): generic argument stripped — presumably ILogger<AiGateWayManager>; confirm.
    private readonly ILogger _logger;
    private readonly AiMessageManager _aiMessageManager;
    private readonly UsageStatisticsManager _usageStatisticsManager;

    public AiGateWayManager(ISqlSugarRepository aiAppRepository, ILogger logger,
        AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager)
    {
        _aiAppRepository = aiAppRepository;
        _logger = logger;
        _aiMessageManager = aiMessageManager;
        _usageStatisticsManager = usageStatisticsManager;
    }

    /// <summary>
    /// Finds the app that hosts <paramref name="modelId"/> and returns a combined
    /// describe record (app endpoint/key + model handler/metadata).
    /// </summary>
    /// <param name="modelId">Public model identifier requested by the caller.</param>
    /// <returns>The matching <c>AiModelDescribe</c>.</returns>
    /// <exception cref="UserFriendlyException">No app carries the model.</exception>
    // NOTE(review): loads ALL apps with their models on every call — consider caching
    // or filtering in the query if this is on a hot path.
    private async Task GetModelAsync(string modelId)
    {
        var allApp = await _aiAppRepository._DbQueryable.Includes(x => x.AiModels).ToListAsync();
        foreach (var app in allApp)
        {
            var model = app.AiModels.FirstOrDefault(x => x.ModelId == modelId);
            if (model is not null)
            {
                return new AiModelDescribe
                {
                    AppId = app.Id,
                    AppName = app.Name,
                    Endpoint = app.Endpoint,
                    ApiKey = app.ApiKey,
                    OrderNum = model.OrderNum,
                    HandlerName = model.HandlerName,
                    ModelId = model.ModelId,
                    ModelName = model.Name,
                    Description = model.Description
                };
            }
        }
        throw new UserFriendlyException($"{modelId}模型当前版本不支持");
    }

    /// <summary>
    /// Chat completion — streaming. Resolves the model, picks the keyed chat service
    /// by the model's handler name, and relays its stream to the caller.
    /// </summary>
    /// <param name="modelId">Model identifier to route on.</param>
    /// <param name="messages">Conversation history to send.</param>
    /// <param name="cancellationToken">Flows into the underlying stream.</param>
    // NOTE(review): return/parameter generic arguments stripped — element type of the
    // IAsyncEnumerable and of the messages List are not visible here; confirm upstream.
    public async IAsyncEnumerable CompleteChatStreamAsync(string modelId, List messages,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var modelDescribe = await GetModelAsync(modelId);
        // Keyed-service lookup: HandlerName selects the concrete provider implementation.
        var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
        await foreach (var result in chatService.CompleteChatStreamAsync(modelDescribe, messages, cancellationToken))
        {
            yield return result;
        }
    }

    /// <summary>
    /// Chat completion — non-streaming. Writes an OpenAI-compatible JSON response
    /// directly to <paramref name="httpContext"/> and, when a user is known, persists
    /// the user/system messages and token-usage statistics.
    /// </summary>
    /// <param name="httpContext">Response target; content type is set to JSON here.</param>
    /// <param name="modelId">Model identifier to route on.</param>
    /// <param name="messages">Conversation history; the LAST entry is stored as the user message.</param>
    /// <param name="userId">When non-null, history + usage are recorded for this user.</param>
    /// <param name="sessionId">Optional session the messages belong to.</param>
    /// <param name="cancellationToken">Flows into the provider call and the final flush.</param>
    public async Task CompleteChatForStatisticsAsync(HttpContext httpContext, string modelId,
        List messages, Guid? userId = null, Guid? sessionId = null,
        CancellationToken cancellationToken = default)
    {
        var response = httpContext.Response;
        // Declare a JSON response body.
        response.ContentType = "application/json; charset=UTF-8";
        // leaveOpen: the response body stream is owned by ASP.NET Core, not this writer.
        await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
        var modelDescribe = await GetModelAsync(modelId);
        var chatService = LazyServiceProvider.GetRequiredKeyedService(modelDescribe.HandlerName);
        var data = await chatService.CompleteChatAsync(modelDescribe, messages, cancellationToken);
        if (userId is not null)
        {
            await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId, new MessageInputDto
            {
                // NOTE(review): LastOrDefault() is dereferenced without a null check —
                // an empty messages list would throw NRE here; confirm callers guarantee non-empty.
                Content = messages.LastOrDefault().Content.FirstOrDefault()?.Text ?? string.Empty,
                ModelId = modelId,
                TokenUsage = data.TokenUsage,
            });
            await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId, new MessageInputDto
            {
                Content = data.Content,
                ModelId = modelId,
                TokenUsage = data.TokenUsage
            });
            await _usageStatisticsManager.SetUsageAsync(userId.Value, modelId,
                data.TokenUsage.InputTokenCount, data.TokenUsage.OutputTokenCount);
        }
        // Shape the provider result as an OpenAI-style response and emit camelCase JSON.
        var result = MapToChatCompletions(modelId, data.Content);
        var body = JsonConvert.SerializeObject(result, new JsonSerializerSettings
        {
            ContractResolver = new CamelCasePropertyNamesContractResolver()
        });
        await writer.WriteLineAsync(body);
        await writer.FlushAsync(cancellationToken);
    }

    /// <summary>
    /// Chat completion — streaming with buffered (rate-limited) output. Consumes the
    /// provider stream into a queue while a background task drains it to the client as
    /// SSE at a fixed cadence; afterwards persists history and token usage when a user
    /// is known.
    /// </summary>
    /// <param name="httpContext">Response target; SSE headers are set here.</param>
    /// <param name="modelId">Model identifier to route on.</param>
    /// <param name="messages">Conversation history; the LAST entry is stored as the user message.</param>
    /// <param name="userId">When non-null, history + usage are recorded for this user.</param>
    /// <param name="sessionId">Optional session the messages belong to.</param>
    /// <param name="cancellationToken">Cancels both the provider stream and the drain task.</param>
    public async Task CompleteChatStreamForStatisticsAsync(
        HttpContext httpContext, string modelId, List messages,
        Guid? userId = null, Guid? sessionId = null,
        CancellationToken cancellationToken = default)
    {
        var response = httpContext.Response;
        // Declare an SSE stream to the client.
        response.ContentType = "text/event-stream";
        response.Headers.Append("Cache-Control", "no-cache");
        response.Headers.Append("Connection", "keep-alive");
        // NOTE(review): generic argument stripped — presumably resolves this manager's
        // own service type to call CompleteChatStreamAsync through the DI proxy; confirm.
        var gateWay = LazyServiceProvider.GetRequiredService();
        var completeChatResponse = gateWay.CompleteChatStreamAsync(modelId, messages, cancellationToken);
        var tokenUsage = new TokenUsage();
        // leaveOpen: the response body stream is owned by ASP.NET Core, not this writer.
        await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
        // Buffered-queue output: producer (the await foreach below) enqueues chunks,
        // a background consumer drains them to the client at a fixed interval.
        var messageQueue = new ConcurrentQueue();
        StringBuilder backupSystemContent = new StringBuilder();
        // Output cadence — one drain attempt every 75 ms while the stream is still producing.
        var outputInterval = TimeSpan.FromMilliseconds(75);
        // Set to true once the provider stream has finished (or faulted).
        // NOTE(review): read/written from two tasks without volatile/Interlocked — works in
        // practice on .NET but is formally racy; confirm this is acceptable.
        var isComplete = false;
        // Background consumer: drains the queue until production is complete AND the queue is empty.
        var outputTask = Task.Run(async () =>
        {
            while (!(isComplete && messageQueue.IsEmpty))
            {
                if (messageQueue.TryDequeue(out var message))
                {
                    await writer.WriteLineAsync(message);
                    await writer.FlushAsync(cancellationToken);
                }
                if (!isComplete)
                {
                    // Only throttle while producing; once complete, flush the remainder at full speed.
                    await Task.Delay(outputInterval, cancellationToken);
                }
            }
        }, cancellationToken);
        // An IAsyncEnumerable's exceptions can only be caught around the outermost
        // enumeration (per the original author's note) — hence the broad catch below.
        try
        {
            await foreach (var data in completeChatResponse)
            {
                if (data.IsFinish)
                {
                    // Final chunk carries the authoritative token usage.
                    tokenUsage = data.TokenUsage;
                }
                var model = MapToMessage(modelId, data.Content);
                var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
                {
                    ContractResolver = new CamelCasePropertyNamesContractResolver()
                });
                // Keep a full copy of the assistant output for persistence below.
                backupSystemContent.Append(data.Content);
                // Enqueue instead of writing directly — the consumer task owns the response stream.
                messageQueue.Enqueue($"data: {message}\n");
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, $"Ai对话异常");
            // Surface the failure to the client as a normal chunk so the SSE stream ends cleanly.
            var errorContent = $"Ai对话异常,异常信息:\n{e.Message}";
            var model = MapToMessage(modelId, errorContent);
            var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
            {
                ContractResolver = new CamelCasePropertyNamesContractResolver()
            });
            backupSystemContent.Append(errorContent);
            messageQueue.Enqueue($"data: {message}\n");
        }
        // OpenAI-style stream terminator.
        messageQueue.Enqueue("data: [DONE]\n");
        // Mark production complete so the consumer drains and exits, then wait for it.
        isComplete = true;
        // NOTE(review): if cancellationToken fires, Task.Delay inside outputTask throws
        // OperationCanceledException, which propagates out of this await — confirm intended.
        await outputTask;
        if (userId is not null)
        {
            await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId, new MessageInputDto
            {
                // NOTE(review): same unguarded LastOrDefault() dereference as the
                // non-streaming path — empty messages would throw NRE; confirm callers.
                Content = messages.LastOrDefault().Content.FirstOrDefault()?.Text ?? string.Empty,
                ModelId = modelId,
                TokenUsage = tokenUsage,
            });
            await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId, new MessageInputDto
            {
                Content = backupSystemContent.ToString(),
                ModelId = modelId,
                TokenUsage = tokenUsage
            });
            await _usageStatisticsManager.SetUsageAsync(userId.Value, modelId,
                tokenUsage.InputTokenCount, tokenUsage.OutputTokenCount);
        }
    }

    /// <summary>
    /// Wraps one content chunk in an OpenAI "chat.completion.chunk" envelope for SSE.
    /// </summary>
    /// <param name="modelId">Echoed into the envelope's Model field.</param>
    /// <param name="content">Delta text for this chunk.</param>
    // NOTE(review): Id and Created are hard-coded placeholders (every chunk shares the
    // same id/timestamp) and Usage is all zeros — confirm clients don't rely on these.
    private SendMessageStreamOutputDto MapToMessage(string modelId, string content)
    {
        var output = new SendMessageStreamOutputDto
        {
            Id = "chatcmpl-BotYP3BlN5T4g9YPnW0fBSBvKzXdd",
            Object = "chat.completion.chunk",
            Created = 1750336171,
            Model = modelId,
            Choices = new()
            {
                new Choice
                {
                    Index = 0,
                    Delta = new Delta
                    {
                        Content = content,
                        Role = "assistant"
                    },
                    FinishReason = null,
                    // All content filters reported as not-filtered; static placeholder values.
                    ContentFilterResults = new()
                    {
                        Hate = new() { Filtered = false, Detected = null },
                        SelfHarm = new() { Filtered = false, Detected = null },
                        Sexual = new() { Filtered = false, Detected = null },
                        Violence = new() { Filtered = false, Detected = null },
                        Jailbreak = new() { Filtered = false, Detected = false },
                        Profanity = new() { Filtered = false, Detected = false },
                    }
                }
            },
            SystemFingerprint = "",
            Usage = new Usage
            {
                PromptTokens = 0,
                CompletionTokens = 0,
                TotalTokens = 0,
                PromptTokensDetails = new() { AudioTokens = 0, CachedTokens = 0 },
                CompletionTokensDetails = new()
                {
                    AudioTokens = 0,
                    ReasoningTokens = 0,
                    AcceptedPredictionTokens = 0,
                    RejectedPredictionTokens = 0
                }
            }
        };
        return output;
    }

    /// <summary>
    /// Wraps a full (non-streaming) completion in an OpenAI Responses-API-style envelope.
    /// </summary>
    /// <param name="modelId">Echoed into the envelope's Model field.</param>
    /// <param name="content">Complete assistant output text.</param>
    // NOTE(review): Id/CreatedAt are hard-coded placeholders and Usage is all zeros even
    // though real token counts were available at the call site — confirm intended.
    private ChatCompletionsOutput MapToChatCompletions(string modelId, string content)
    {
        return new ChatCompletionsOutput
        {
            Id = "resp_67ccd2bed1ec8190b14f964abc0542670bb6a6b452d3795b",
            Object = "response",
            CreatedAt = 1741476542,
            Status = "completed",
            Error = null,
            IncompleteDetails = null,
            Instructions = null,
            MaxOutputTokens = null,
            Model = modelId,
            Output = new List()
            {
                new Output
                {
                    Type = "message",
                    Id = "msg_67ccd2bf17f0819081ff3bb2cf6508e60bb6a6b452d3795b",
                    Status = "completed",
                    Role = "assistant",
                    Content = new List
                    {
                        new Content
                        {
                            Type = "output_text",
                            Text = content,
                            Annotations = new List()
                        }
                    }
                }
            },
            ParallelToolCalls = true,
            PreviousResponseId = null,
            Reasoning = new Reasoning { Effort = null, Summary = null },
            Store = true,
            Temperature = 0,
            Text = new Text { Format = new Format { Type = "text" } },
            ToolChoice = "auto",
            Tools = new List(),
            TopP = 1.0,
            Truncation = "disabled",
            // Fully qualified: distinct from the aliased Contracts.Dtos.Usage used above.
            Usage = new Application.Contracts.Dtos.OpenAi.Usage
            {
                InputTokens = 0,
                InputTokensDetails = new InputTokensDetails { CachedTokens = 0 },
                OutputTokens = 0,
                OutputTokensDetails = new OutputTokensDetails { ReasoningTokens = 0 },
                TotalTokens = 0
            },
            User = null,
            Metadata = null
        };
    }
}