using System.ClientModel;
using System.Reflection;
using System.Text;
using System.Text.Json;
using Dm.util;
using Microsoft.Agents.AI;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ModelContextProtocol.Server;
using OpenAI;
using OpenAI.Chat;
using Volo.Abp.Domain.Services;
using Yi.Framework.AiHub.Application.Contracts.Dtos.Chat;
using Yi.Framework.AiHub.Domain.AiGateWay;
using Yi.Framework.AiHub.Domain.Entities.Chat;
using Yi.Framework.AiHub.Domain.Entities.OpenApi;
using Yi.Framework.AiHub.Domain.Shared.Consts;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.Shared.Enums;
using Yi.Framework.SqlSugarCore.Abstractions;

namespace Yi.Framework.AiHub.Domain.Managers;

/// <summary>
/// Domain service that runs a streaming agent conversation, relays the agent's output to the
/// HTTP response as server-sent events, records token usage and persists the agent thread
/// per session.
/// </summary>
// NOTE(review): several generic type arguments are not visible in this view of the file.
// Those that the surrounding code pins down (List<string>, Deserialize<JsonElement>,
// ISqlSugarRepository<AgentStoreAggregateRoot>) are written out; the remaining ones are left
// exactly as they appeared and are marked NOTE(review) below - confirm against the real file.
public class ChatManager : DomainService
{
    /// <summary>Base address of the OpenAI-compatible endpoint the agent talks to.</summary>
    private const string AgentEndpoint = "https://yxai.chat/v1";

    /// <summary>Message count handed to the chat reducer of the in-memory message store.</summary>
    private const int RetainedMessageCount = 10;

    /// <summary>Finish reason that marks a usage report as belonging to a tool-call round trip.</summary>
    private const string ToolCallsFinishReason = "tool_calls";

    private readonly ILoggerFactory _loggerFactory;

    // NOTE(review): the entity type argument of this repository is not visible here - TODO restore.
    private readonly ISqlSugarRepository _messageRepository;
    private readonly ISqlSugarRepository<AgentStoreAggregateRoot> _agentStoreRepository;
    private readonly AiMessageManager _aiMessageManager;
    private readonly UsageStatisticsManager _usageStatisticsManager;
    private readonly PremiumPackageManager _premiumPackageManager;
    private readonly AiGateWayManager _aiGateWayManager;

    public ChatManager(ILoggerFactory loggerFactory,
        ISqlSugarRepository messageRepository,
        ISqlSugarRepository<AgentStoreAggregateRoot> agentStoreRepository,
        AiMessageManager aiMessageManager,
        UsageStatisticsManager usageStatisticsManager,
        PremiumPackageManager premiumPackageManager,
        AiGateWayManager aiGateWayManager)
    {
        _loggerFactory = loggerFactory;
        _messageRepository = messageRepository;
        _agentStoreRepository = agentStoreRepository;
        _aiMessageManager = aiMessageManager;
        _usageStatisticsManager = usageStatisticsManager;
        _premiumPackageManager = premiumPackageManager;
        _aiGateWayManager = aiGateWayManager;
    }

    /// <summary>
    /// Runs one streaming agent turn and writes every update to the response as an SSE frame.
    /// </summary>
    /// <param name="httpContext">Current HTTP context; its response receives the event stream.</param>
    /// <param name="sessionId">Session whose stored agent thread is loaded and saved again afterwards.</param>
    /// <param name="content">User message passed to the agent.</param>
    /// <param name="token">API key used to build the OpenAI client credential.</param>
    /// <param name="tokenId">Token identifier forwarded to message and usage bookkeeping.</param>
    /// <param name="modelId">Model used for the chat client, model lookup and usage bookkeeping.</param>
    /// <param name="userId">User the usage is attributed to.</param>
    /// <param name="tools">Codes of the tools to expose to the agent; null is treated as "no tools".</param>
    /// <param name="cancellationToken">Cancels the agent run and the response writes.</param>
    public async Task AgentCompleteChatStreamAsync(HttpContext httpContext,
        Guid sessionId,
        string content,
        string token,
        Guid tokenId,
        string modelId,
        Guid userId,
        List<string> tools,
        CancellationToken cancellationToken)
    {
        var response = httpContext.Response;

        // Declare the response as a server-sent event stream.
        response.ContentType = "text/event-stream;charset=utf-8;";
        response.Headers.TryAdd("Cache-Control", "no-cache");
        response.Headers.TryAdd("Connection", "keep-alive");

        var modelDescribe = await _aiGateWayManager.GetModelAsync(ModelApiTypeEnum.OpenAi, modelId);

        // Token status checks are handled uniformly in the application layer.
        var client = new OpenAIClient(new ApiKeyCredential(token),
            new OpenAIClientOptions
            {
                Endpoint = new Uri(AgentEndpoint),
            });

#pragma warning disable OPENAI001
        var agent = client.GetChatClient(modelId)
#pragma warning restore OPENAI001
            .CreateAIAgent(new ChatClientAgentOptions
            {
                ChatOptions = new()
                {
                    Instructions = """ 你是一个专业的智能助手,是 “意心Ai” 平台自研的 “橙子小弟” 智能体 擅长于精准解决用户提出的各类问题 但性格暴躁,脏话连篇,喜欢说幽默讽刺的话,攻击性极强 """
                },
                Name = "橙子小弟",
                ChatMessageStoreFactory = ctx => new InMemoryChatMessageStore(
#pragma warning disable MEAI001
                    // Per the original comment: keep the most recent 10 non-system messages.
                    new MessageCountingChatReducer(RetainedMessageCount),
#pragma warning restore MEAI001
                    ctx.SerializedState,
                    ctx.JsonSerializerOptions
                )
            });

        // Load the stored thread for this session, or start a fresh record when none exists.
        var agentStore = await _agentStoreRepository.GetFirstAsync(x => x.SessionId == sessionId)
                         ?? new AgentStoreAggregateRoot(sessionId);

        AgentThread currentThread;
        if (string.IsNullOrWhiteSpace(agentStore.Store))
        {
            currentThread = agent.GetNewThread();
        }
        else
        {
            // Rehydrate the thread from its serialized JSON form.
            JsonElement reloaded = JsonSerializer.Deserialize<JsonElement>(agentStore.Store, JsonSerializerOptions.Web);
            currentThread = agent.DeserializeThread(reloaded, JsonSerializerOptions.Web);
        }

        // Expose only the tools the caller asked for. A set gives a null-safe O(1) lookup.
        var requestedTools = tools is null ? new HashSet<string>() : new HashSet<string>(tools);
        var chatOptions = new ChatOptions
        {
            Tools = GetTools()
                .Where(x => requestedTools.Contains(x.Code))
                .Select(x => (AITool)x.Tool)
                .ToList(),
            ToolMode = ChatToolMode.Auto
        };

        await foreach (var update in agent.RunStreamingAsync(content, currentThread,
                           new ChatClientAgentRunOptions(chatOptions), cancellationToken))
        {
            // Inspect every content item carried by this update.
            foreach (var updateContent in update.Contents)
            {
                switch (updateContent)
                {
                    // A tool call is starting.
                    case FunctionCallContent functionCall:
                        await SendHttpStreamMessageAsync(httpContext,
                            new AgentResultOutput
                            {
                                TypeEnum = AgentResultTypeEnum.ToolCalling,
                                Content = functionCall.Name
                            },
                            isDone: false, cancellationToken);
                        break;

                    // A tool call has finished.
                    case FunctionResultContent functionResult:
                        await SendHttpStreamMessageAsync(httpContext,
                            new AgentResultOutput
                            {
                                TypeEnum = AgentResultTypeEnum.ToolCalled,
                                Content = functionResult.Result
                            },
                            isDone: false, cancellationToken);
                        break;

                    // Plain text output, forwarded to the front end.
                    case TextContent textContent:
                        await SendHttpStreamMessageAsync(httpContext,
                            new AgentResultOutput
                            {
                                TypeEnum = AgentResultTypeEnum.Text,
                                Content = textContent.Text
                            },
                            isDone: false, cancellationToken);
                        break;

                    // Usage report.
                    case UsageContent usageContent:
                    {
                        var usage = new ThorUsageResponse
                        {
                            InputTokens = Convert.ToInt32(usageContent.Details.InputTokenCount ?? 0),
                            OutputTokens = Convert.ToInt32(usageContent.Details.OutputTokenCount ?? 0),
                            TotalTokens = usageContent.Details.TotalTokenCount ?? 0,
                        };

                        // Apply the model's multiplier.
                        usage.SetSupplementalMultiplier(modelDescribe.Multiplier);

                        // Create a system message that carries the usage, used for billing statistics.
                        await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
                            new MessageInputDto
                            {
                                Content = "不与存储",
                                ModelId = modelId,
                                TokenUsage = usage
                            }, tokenId);

                        // Record the usage for statistical analysis.
                        await _usageStatisticsManager.SetUsageAsync(userId, modelId, usage, tokenId);

                        // Deduct from the premium token package for premium models.
                        if (PremiumPackageConst.ModeIds.Contains(modelId))
                        {
                            // Read after the multiplier call, exactly as the original did.
                            var totalTokens = usage?.TotalTokens ?? 0;
                            if (totalTokens > 0)
                            {
                                await _premiumPackageManager.TryConsumeTokensAsync(userId, totalTokens);
                            }
                        }

                        // Usage reported with a "tool_calls" finish reason is tagged separately.
                        var isToolCallUsage = update.RawRepresentation is ChatResponseUpdate raw
                                              && raw.FinishReason?.Value == ToolCallsFinishReason;

                        await SendHttpStreamMessageAsync(httpContext,
                            new AgentResultOutput
                            {
                                TypeEnum = isToolCallUsage
                                    ? AgentResultTypeEnum.ToolCallUsage
                                    : AgentResultTypeEnum.Usage,
                                Content = usage
                            },
                            isDone: false, cancellationToken);
                        break;
                    }
                }
            }
        }

        // Tell the client the stream is finished.
        await SendHttpStreamMessageAsync(httpContext, null, isDone: true, cancellationToken);

        // Persist the thread (insert or update) so the next turn of this session can resume it.
        agentStore.Store = currentThread.Serialize(JsonSerializerOptions.Web).GetRawText();
        await _agentStoreRepository.InsertOrUpdateAsync(agentStore);
    }

    /// <summary>
    /// Builds the list of agent tools by reflecting over the domain module's assembly:
    /// attributed classes are resolved from the service provider and each attributed method
    /// is wrapped as an AI function.
    /// </summary>
    /// <returns>
    /// One entry per tool method: the function name as the code, a display name (falling back
    /// to the function name when none is declared) and the function itself.
    /// </returns>
    // NOTE(review): the attribute type arguments of the three GetCustomAttribute() calls are
    // not visible in this view of the file; they are left exactly as shown - TODO restore.
    public List<(string Code, string Name, AIFunction Tool)> GetTools()
    {
        var toolClasses = typeof(YiFrameworkAiHubDomainModule).Assembly.GetTypes()
            .Where(x => x.GetCustomAttribute() is not null)
            .ToList();

        List<(string Code, string Name, AIFunction Tool)> mcpTools = new();
        foreach (var toolClass in toolClasses)
        {
            var instance = LazyServiceProvider.GetRequiredService(toolClass);
            var toolMethods = toolClass.GetMethods()
                .Where(y => y.GetCustomAttribute() is not null)
                .ToList();

            foreach (var toolMethod in toolMethods)
            {
                var display = toolMethod.GetCustomAttribute()?.DisplayName;
                var tool = AIFunctionFactory.Create(toolMethod, instance);

                // Use List<T>.Add rather than a lower-case `add` extension from another library,
                // and never hand out a null display name.
                mcpTools.Add((tool.Name, display ?? tool.Name, tool));
            }
        }

        return mcpTools;
    }

    /// <summary>
    /// Writes one SSE frame ("data: ...") to the response and flushes it immediately.
    /// </summary>
    /// <param name="httpContext">Current HTTP context whose response is written to.</param>
    /// <param name="content">Payload serialized as JSON; ignored when <paramref name="isDone"/> is true.</param>
    /// <param name="isDone">When true, the terminating "[DONE]" marker is sent instead of a payload.</param>
    /// <param name="cancellationToken">Cancels the write and the flush.</param>
    private async Task SendHttpStreamMessageAsync(HttpContext httpContext,
        AgentResultOutput? content,
        bool isDone = false,
        CancellationToken cancellationToken = default)
    {
        var response = httpContext.Response;

        var output = isDone
            ? "[DONE]"
            : JsonSerializer.Serialize(content, ThorJsonSerializer.DefaultOptions);

        await response.WriteAsync($"data: {output}\n\n", Encoding.UTF8, cancellationToken).ConfigureAwait(false);
        await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
    }
}