using System.Collections.Concurrent; using System.Text; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Serialization; using OpenAI.Chat; using Volo.Abp.Application.Services; using Volo.Abp.Users; using Yi.Framework.AiHub.Application.Contracts.Dtos; using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAiDto; using Yi.Framework.AiHub.Domain.Entities.OpenApi; using Yi.Framework.AiHub.Domain.Managers; using Yi.Framework.AiHub.Domain.Shared.Dtos; using Yi.Framework.SqlSugarCore.Abstractions; namespace Yi.Framework.AiHub.Application.Services; public class OpenApiService : ApplicationService { private readonly IHttpContextAccessor _httpContextAccessor; private readonly ILogger _logger; private readonly TokenManager _tokenManager; private readonly AiMessageManager _aiMessageManager; private readonly UsageStatisticsManager _usageStatisticsManager; public OpenApiService(IHttpContextAccessor httpContextAccessor, ILogger logger, TokenManager tokenManager, AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager) { _httpContextAccessor = httpContextAccessor; _logger = logger; _tokenManager = tokenManager; _aiMessageManager = aiMessageManager; _usageStatisticsManager = usageStatisticsManager; } /// /// 对话 /// /// /// [HttpPost("openApi/v1/chat/completions")] public async Task ChatCompletionsAsync(ChatCompletionsInput input, CancellationToken cancellationToken) { //前面都是校验,后面才是真正的调用 var httpContext = this._httpContextAccessor.HttpContext; var userId = await _tokenManager.GetUserIdAsync(GetTokenByHttpContext(httpContext)); var response = httpContext.Response; // 设置响应头,声明是 SSE 流 response.ContentType = "text/event-stream"; response.Headers.Append("Cache-Control", "no-cache"); response.Headers.Append("Connection", "keep-alive"); var history = new List(); foreach (var aiChatContextDto in input.Messages) { if (aiChatContextDto.Role == "assistant") { history.Add(ChatMessage.CreateAssistantMessage(aiChatContextDto.Content)); } else if (aiChatContextDto.Role == "user") { history.Add(ChatMessage.CreateUserMessage(aiChatContextDto.Content)); } } var gateWay = LazyServiceProvider.GetRequiredService(); var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken); var tokenUsage = new TokenUsage(); await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true); //缓存队列算法 // 创建一个队列来缓存消息 var messageQueue = new ConcurrentQueue(); StringBuilder backupSystemContent = new StringBuilder(); // 设置输出速率(例如每50毫秒输出一次) var outputInterval = TimeSpan.FromMilliseconds(100); // 标记是否完成接收 var isComplete = false; // 启动一个后台任务来消费队列 var outputTask = Task.Run(async () => { while (!(isComplete && messageQueue.IsEmpty)) { if (messageQueue.TryDequeue(out var message)) { await writer.WriteLineAsync(message); await writer.FlushAsync(cancellationToken); } if (!isComplete) { // 如果没有完成,才等待,已完成,全部输出 await Task.Delay(outputInterval, cancellationToken); } } }, cancellationToken); //IAsyncEnumerable 只能在最外层捕获异常(如果你有其他办法的话...) try { await foreach (var data in completeChatResponse) { if (data.IsFinish) { tokenUsage = data.TokenUsage; } var model = MapToMessage(input.Model, data.Content); var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() }); backupSystemContent.Append(data.Content); // 将消息加入队列而不是直接写入 messageQueue.Enqueue($"data: {message}\n"); } } catch (Exception e) { _logger.LogError(e, $"Ai对话异常"); var errorContent = $"Ai对话异常,异常信息:\n{e.Message}"; var model = MapToMessage(input.Model, errorContent); var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() }); backupSystemContent.Append(errorContent); messageQueue.Enqueue($"data: {message}\n"); } //断开连接 messageQueue.Enqueue("data: [DONE]\n"); // 标记完成并发送结束标记 isComplete = true; await outputTask; await _aiMessageManager.CreateUserMessageAsync(userId, null, new MessageInputDto { Content = input.Messages.LastOrDefault() .Content, ModelId = input.Model, TokenUsage = tokenUsage, }); await _aiMessageManager.CreateSystemMessageAsync(userId, null, new MessageInputDto { Content = backupSystemContent.ToString(), ModelId = input.Model, TokenUsage = tokenUsage }); await _usageStatisticsManager.SetUsageAsync(userId, input.Model, tokenUsage.InputTokenCount, tokenUsage.OutputTokenCount); } /// /// 获取模型列表 /// /// [HttpGet("openApi/v1/models")] public async Task ModelsAsync() { return new ModelGetOutput() { Data = new List() { new ModelDataOutput { ModelId = "gpt-4.1-mini", Object = "model", Owned_by = "organization-owner", Permission = new List() }, new ModelDataOutput { ModelId = "gpt-4.1-nano", Object = "model", Owned_by = "organization-owner", Permission = new List() } } }; } private SendMessageOutputDto MapToMessage(string modelId, string content) { var output = new SendMessageOutputDto { Id = "chatcmpl-BotYP3BlN5T4g9YPnW0fBSBvKzXdd", Object = "chat.completion.chunk", Created = 1750336171, Model = modelId, Choices = new() { new Choice { Index = 0, Delta = new Delta { Content = content, Role = "assistant" }, FinishReason = null, ContentFilterResults = new() { Hate = new() { Filtered = false, Detected = null }, SelfHarm = new() { Filtered = false, Detected = null }, Sexual = new() { Filtered = false, Detected = null }, Violence = new() { Filtered = false, Detected = null }, Jailbreak = new() { Filtered = false, Detected = false }, Profanity = new() { Filtered = false, Detected = false }, } } }, SystemFingerprint = "", Usage = new Usage { PromptTokens = 0, CompletionTokens = 0, TotalTokens = 0, PromptTokensDetails = new() { AudioTokens = 0, CachedTokens = 0 }, CompletionTokensDetails = new() { AudioTokens = 0, ReasoningTokens = 0, AcceptedPredictionTokens = 0, RejectedPredictionTokens = 0 } } }; return output; } private string? GetTokenByHttpContext(HttpContext httpContext) { // 获取Authorization头 string authHeader = httpContext.Request.Headers["Authorization"]; // 检查是否有Bearer token if (authHeader != null && authHeader.StartsWith("Bearer ")) { return authHeader.Substring("Bearer ".Length).Trim(); } return null; } }