Files
Yi.Framework/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs
2025-07-28 13:15:42 +08:00

266 lines
10 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Text;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using Volo.Abp.Domain.Services;
using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.AiGateWay;
using Yi.Framework.AiHub.Domain.Entities.Model;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.SqlSugarCore.Abstractions;
namespace Yi.Framework.AiHub.Domain.Managers;
public class AiGateWayManager : DomainService
{
private readonly ISqlSugarRepository<AiAppAggregateRoot> _aiAppRepository;
private readonly ILogger<AiGateWayManager> _logger;
private readonly AiMessageManager _aiMessageManager;
private readonly UsageStatisticsManager _usageStatisticsManager;
private readonly ISpecialCompatible _specialCompatible;
/// <summary>
/// Creates the gateway manager and stores the injected collaborators in their backing fields.
/// </summary>
/// <param name="aiAppRepository">Repository used to load AI apps together with their models.</param>
/// <param name="logger">Logger used to report chat failures.</param>
/// <param name="aiMessageManager">Manager used to persist user and system messages.</param>
/// <param name="usageStatisticsManager">Manager used to record token usage.</param>
/// <param name="specialCompatible">Compatibility hook applied to incoming chat requests.</param>
public AiGateWayManager(ISqlSugarRepository<AiAppAggregateRoot> aiAppRepository, ILogger<AiGateWayManager> logger,
    AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager,
    ISpecialCompatible specialCompatible)
{
    // One tuple assignment; values are assigned to the fields left to right.
    (_aiAppRepository, _logger, _aiMessageManager, _usageStatisticsManager, _specialCompatible) =
        (aiAppRepository, logger, aiMessageManager, usageStatisticsManager, specialCompatible);
}
/// <summary>
/// Looks up a model by its identifier and returns it combined with the settings of the app that owns it.
/// </summary>
/// <param name="modelId">Identifier compared against each model's <c>ModelId</c>.</param>
/// <returns>A descriptor built from the first matching model and its owning app.</returns>
/// <exception cref="UserFriendlyException">Thrown when no loaded app has a model with this identifier.</exception>
private async Task<AiModelDescribe> GetModelAsync(string modelId)
{
    // All apps are loaded together with their models; the search itself runs in memory.
    var apps = await _aiAppRepository._DbQueryable.Includes(x => x.AiModels).ToListAsync();

    foreach (var owner in apps)
    {
        // Skip apps that do not expose the requested model.
        if (owner.AiModels.FirstOrDefault(x => x.ModelId == modelId) is not { } found)
        {
            continue;
        }

        // First hit wins: apps are scanned in the order the query returned them.
        var describe = new AiModelDescribe
        {
            AppId = owner.Id,
            AppName = owner.Name,
            Endpoint = owner.Endpoint,
            ApiKey = owner.ApiKey,
            OrderNum = found.OrderNum,
            HandlerName = found.HandlerName,
            ModelId = found.ModelId,
            ModelName = found.Name,
            Description = found.Description,
            AppExtraUrl = owner.ExtraUrl,
            ModelExtraInfo = found.ExtraInfo
        };
        return describe;
    }

    throw new UserFriendlyException($"{modelId}模型当前版本不支持");
}
/// <summary>
/// Chat completion (streaming): resolves the handler registered for the requested model and
/// relays every chunk it produces.
/// </summary>
/// <param name="request">Chat request; passed through the compatibility hook before use.</param>
/// <param name="cancellationToken">Token forwarded to the underlying chat service.</param>
/// <returns>The chunks produced by the chat service, in the order they arrive.</returns>
public async IAsyncEnumerable<ThorChatCompletionsResponse> CompleteChatStreamAsync(
    ThorChatCompletionsRequest request,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    _specialCompatible.Compatible(request);

    var descriptor = await GetModelAsync(request.Model);

    // The chat service is a keyed service, selected by the model's handler name.
    var handler = LazyServiceProvider.GetRequiredKeyedService<IChatCompletionService>(descriptor.HandlerName);
    var chunks = handler.CompleteChatStreamAsync(descriptor, request, cancellationToken);

    await foreach (var chunk in chunks)
    {
        yield return chunk;
    }
}
/// <summary>
/// Chat completion (non-streaming): runs the request against the handler registered for the model,
/// optionally records the conversation and token usage, and writes the result to the HTTP response as JSON.
/// </summary>
/// <param name="httpContext">HTTP context whose response receives the completion.</param>
/// <param name="request">Chat request; passed through the compatibility hook before use.</param>
/// <param name="userId">When supplied, the user/system messages and token usage are recorded for this user.</param>
/// <param name="sessionId">Optional session the recorded messages are attached to.</param>
/// <param name="cancellationToken">Token forwarded to the chat service and to the response write.</param>
/// <returns>A task that completes once the response body has been written.</returns>
public async Task CompleteChatForStatisticsAsync(HttpContext httpContext,
    ThorChatCompletionsRequest request,
    Guid? userId = null,
    Guid? sessionId = null,
    CancellationToken cancellationToken = default)
{
    _specialCompatible.Compatible(request);
    var response = httpContext.Response;

    var modelDescribe = await GetModelAsync(request.Model);
    var chatService =
        LazyServiceProvider.GetRequiredKeyedService<IChatCompletionService>(modelDescribe.HandlerName);
    var data = await chatService.CompleteChatAsync(modelDescribe, request, cancellationToken);

    if (userId is not null)
    {
        await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId,
            new MessageInputDto
            {
                // LastOrDefault() returns null for an empty message list, so the member access
                // must be null-conditional as well (same expression as the streaming variant).
                Content = request.Messages?.LastOrDefault()?.Content ?? string.Empty,
                ModelId = request.Model,
                TokenUsage = data.Usage,
            });

        await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId,
            new MessageInputDto
            {
                // A reply without choices or without a delta is stored with null content
                // instead of failing the whole request.
                Content = data.Choices?.FirstOrDefault()?.Delta?.Content,
                ModelId = request.Model,
                TokenUsage = data.Usage
            });

        await _usageStatisticsManager.SetUsageAsync(userId.Value, request.Model, data.Usage);
    }

    // No Content-Type is set by hand; the JSON write helper is left to set it.
    await response.WriteAsJsonAsync(data, cancellationToken);
}
/// <summary>
/// Chat completion (streaming, buffered): streams the completion to the HTTP response as
/// Server-Sent Events through an in-memory queue that is drained at a fixed pace, then optionally
/// records the conversation and token usage.
/// </summary>
/// <param name="httpContext">HTTP context whose response receives the event stream.</param>
/// <param name="request">Chat request forwarded to <see cref="CompleteChatStreamAsync"/>.</param>
/// <param name="userId">When supplied, the user/system messages and token usage are recorded for this user.</param>
/// <param name="sessionId">Optional session the recorded messages are attached to.</param>
/// <param name="cancellationToken">Token forwarded to the chat stream, the response writes and the pacing delay.</param>
/// <returns>A task that completes after the stream has been written and the statistics recorded.</returns>
public async Task CompleteChatStreamForStatisticsAsync(
    HttpContext httpContext,
    ThorChatCompletionsRequest request,
    Guid? userId = null,
    Guid? sessionId = null,
    CancellationToken cancellationToken = default)
{
    var response = httpContext.Response;
    // Declare the response as an SSE stream.
    response.ContentType = "text/event-stream;charset=utf-8;";
    response.Headers.TryAdd("Cache-Control", "no-cache");
    response.Headers.TryAdd("Connection", "keep-alive");

    // The manager is resolved from the container rather than calling the method on `this`.
    var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
    var completeChatResponse = gateWay.CompleteChatStreamAsync(request, cancellationToken);
    var tokenUsage = new ThorUsageResponse();

    // Buffered output: chunks are queued by the reader below and written by a background task.
    var messageQueue = new ConcurrentQueue<string>();
    // Accumulates everything sent as assistant content so it can be stored afterwards.
    var backupSystemContent = new StringBuilder();
    // Pace of the writer: one queued message every 75 ms while the stream is still being received.
    var outputInterval = TimeSpan.FromMilliseconds(75);
    // Set once the producer side has finished (successfully or with an error).
    var isComplete = false;

    // Background consumer: runs until the producer is done AND the queue has been drained.
    var outputTask = Task.Run(async () =>
    {
        while (!(isComplete && messageQueue.IsEmpty))
        {
            if (messageQueue.TryDequeue(out var message))
            {
                await response.WriteAsync(message, Encoding.UTF8, cancellationToken).ConfigureAwait(false);
                await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
            }

            if (!isComplete)
            {
                // Only pace the output while still receiving; once complete, flush the rest immediately.
                await Task.Delay(outputInterval, cancellationToken).ConfigureAwait(false);
            }
        }
    }, cancellationToken);

    // Errors raised while enumerating the stream are handled here, around the whole loop.
    try
    {
        await foreach (var data in completeChatResponse)
        {
            if (data.Usage is not null)
            {
                tokenUsage = data.Usage;
            }

            var message = System.Text.Json.JsonSerializer.Serialize(data, ThorJsonSerializer.DefaultOptions);
            // Null-safe: a chunk without choices or without a delta contributes no text
            // instead of throwing and aborting the stream.
            backupSystemContent.Append(data.Choices?.FirstOrDefault()?.Delta?.Content);
            // Queue the event instead of writing it directly.
            messageQueue.Enqueue($"data: {message}\n\n");
        }
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Ai对话异常");
        // NOTE(review): the payload below includes the full exception text ({e}), which is sent
        // to the client — confirm this level of detail is intended.
        var errorContent = $"Ai异常异常信息\n当前Ai模型{request.Model}\n异常信息{e.Message}\n异常堆栈:{e}";
        var model = new ThorChatCompletionsResponse()
        {
            Choices = new List<ThorChatChoiceResponse>()
            {
                new ThorChatChoiceResponse()
                {
                    Delta = new ThorChatMessage()
                    {
                        Content = errorContent
                    }
                }
            }
        };
        var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
        {
            ContractResolver = new CamelCasePropertyNamesContractResolver()
        });
        backupSystemContent.Append(errorContent);
        messageQueue.Enqueue($"data: {message}\n\n");
    }

    // Terminate the event stream.
    messageQueue.Enqueue("data: [DONE]\n\n");
    // Signal completion and wait for the writer to drain the queue.
    isComplete = true;
    await outputTask;

    if (userId is not null)
    {
        await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId,
            new MessageInputDto
            {
                Content = request.Messages?.LastOrDefault()?.Content ?? string.Empty,
                ModelId = request.Model,
                TokenUsage = tokenUsage,
            });

        await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId,
            new MessageInputDto
            {
                Content = backupSystemContent.ToString(),
                ModelId = request.Model,
                TokenUsage = tokenUsage
            });

        await _usageStatisticsManager.SetUsageAsync(userId.Value, request.Model, tokenUsage);
    }
}
}