using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Text;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using OpenAI.Chat;
using Volo.Abp.Domain.Services;
using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Domain.AiChat;
using Yi.Framework.AiHub.Domain.Entities;
using Yi.Framework.AiHub.Domain.Entities.Model;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.SqlSugarCore.Abstractions;

namespace Yi.Framework.AiHub.Domain.Managers;

/// <summary>
/// Gateway in front of the configured AI apps/models: resolves a model id to its
/// hosting app, streams chat completions, and exposes an SSE adapter for HTTP
/// responses that also persists messages and token usage.
/// </summary>
public class AiGateWayManager : DomainService
{
    /// <summary>
    /// Shared serializer settings for SSE payloads (camelCase, OpenAI wire format).
    /// Hoisted so a settings object and contract resolver are not allocated for
    /// every streamed chunk.
    /// </summary>
    private static readonly JsonSerializerSettings SsePayloadSettings = new()
    {
        ContractResolver = new CamelCasePropertyNamesContractResolver()
    };

    private readonly ISqlSugarRepository<AiAppAggregateRoot> _aiAppRepository;
    private readonly ILogger<AiGateWayManager> _logger;
    private readonly AiMessageManager _aiMessageManager;
    private readonly UsageStatisticsManager _usageStatisticsManager;

    public AiGateWayManager(ISqlSugarRepository<AiAppAggregateRoot> aiAppRepository, ILogger<AiGateWayManager> logger,
        AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager)
    {
        _aiAppRepository = aiAppRepository;
        _logger = logger;
        _aiMessageManager = aiMessageManager;
        _usageStatisticsManager = usageStatisticsManager;
    }

    /// <summary>
    /// Resolves the descriptor for <paramref name="modelId"/> by scanning every
    /// registered app and its models; the first app exposing the model wins.
    /// </summary>
    /// <param name="modelId">Public model id requested by the caller.</param>
    /// <returns>The matching <see cref="AiModelDescribe"/>.</returns>
    /// <exception cref="UserFriendlyException">No app exposes the model.</exception>
    private async Task<AiModelDescribe> GetModelAsync(string modelId)
    {
        // NOTE(review): loads the whole app catalogue (with models) on every call;
        // fine for a small catalogue — consider caching if it grows.
        var allApp = await _aiAppRepository._DbQueryable.Includes(x => x.AiModels).ToListAsync();
        foreach (var app in allApp)
        {
            var model = app.AiModels.FirstOrDefault(x => x.ModelId == modelId);
            if (model is not null)
            {
                return new AiModelDescribe
                {
                    AppId = app.Id,
                    AppName = app.Name,
                    Endpoint = app.Endpoint,
                    ApiKey = app.ApiKey,
                    OrderNum = model.OrderNum,
                    HandlerName = model.HandlerName,
                    ModelId = model.ModelId,
                    ModelName = model.Name,
                    Description = model.Description
                };
            }
        }

        throw new UserFriendlyException($"{modelId}模型当前版本不支持");
    }


    /// <summary>
    /// Streams chat completion chunks for the given model by dispatching to the
    /// <see cref="IChatService"/> keyed by the model's handler name.
    /// </summary>
    /// <param name="modelId">Model id used to resolve app + handler.</param>
    /// <param name="messages">Conversation history to complete.</param>
    /// <param name="cancellationToken">Cancels the upstream stream.</param>
    /// <returns>Async stream of completion chunks.</returns>
    public async IAsyncEnumerable<CompleteChatResponse> CompleteChatAsync(string modelId, List<ChatMessage> messages,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var modelDescribe = await GetModelAsync(modelId);
        var chatService = LazyServiceProvider.GetRequiredKeyedService<IChatService>(modelDescribe.HandlerName);
        await foreach (var result in chatService.CompleteChatAsync(modelDescribe, messages, cancellationToken))
        {
            yield return result;
        }
    }

    /// <summary>
    /// Streams a chat completion to the HTTP response as server-sent events (SSE),
    /// pacing the output through a buffering queue, and — when a user id is given —
    /// persists the prompt, the full assistant reply, and token usage statistics.
    /// </summary>
    /// <param name="httpContext">HTTP context whose response receives the SSE stream.</param>
    /// <param name="modelId">Model id to route the request to.</param>
    /// <param name="messages">Conversation history; the last entry is treated as the user input.</param>
    /// <param name="userId">When set, messages and usage are persisted for this user.</param>
    /// <param name="sessionId">Session the persisted messages are attached to.</param>
    /// <param name="cancellationToken">Aborts both the upstream completion and the writer loop.</param>
    public async Task CompleteChatForHttpContextAsync(
        HttpContext httpContext,
        string modelId,
        List<ChatMessage> messages,
        Guid? userId = null,
        Guid? sessionId = null,
        CancellationToken cancellationToken = default)
    {
        var response = httpContext.Response;
        // Declare an SSE stream to the client.
        response.ContentType = "text/event-stream";
        response.Headers.Append("Cache-Control", "no-cache");
        response.Headers.Append("Connection", "keep-alive");

        // Resolve self through the container so scoping/interception applies.
        var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
        var completeChatResponse = gateWay.CompleteChatAsync(modelId, messages, cancellationToken);
        var tokenUsage = new TokenUsage();
        await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);

        // Buffering queue: the producer (model stream) enqueues chunks as fast as
        // they arrive; a background consumer drains them at a fixed pace so the
        // client sees a smooth typing effect.
        var messageQueue = new ConcurrentQueue<string>();

        var backupSystemContent = new StringBuilder();
        // Consumer pace: one dequeued chunk per interval while producing.
        var outputInterval = TimeSpan.FromMilliseconds(75);
        // Producer -> consumer completion flag. BUGFIX: accessed via Volatile.Read/
        // Write because producer and consumer run on different threads and a plain
        // bool carries no visibility guarantee.
        var isComplete = false;
        // Consumer task: drains the queue; once the producer has finished, flushes
        // the remainder at full speed without the pacing delay.
        var outputTask = Task.Run(async () =>
        {
            while (!(Volatile.Read(ref isComplete) && messageQueue.IsEmpty))
            {
                if (messageQueue.TryDequeue(out var message))
                {
                    await writer.WriteLineAsync(message);
                    await writer.FlushAsync(cancellationToken);
                }

                if (!Volatile.Read(ref isComplete))
                {
                    // Only throttle while the producer is still running.
                    await Task.Delay(outputInterval, cancellationToken);
                }
            }
        }, cancellationToken);

        // Exceptions from an IAsyncEnumerable can only be caught around the loop.
        try
        {
            await foreach (var data in completeChatResponse)
            {
                if (data.IsFinish)
                {
                    // Final chunk carries the authoritative token usage.
                    tokenUsage = data.TokenUsage;
                }

                var model = MapToMessage(modelId, data.Content);
                var message = JsonConvert.SerializeObject(model, SsePayloadSettings);
                backupSystemContent.Append(data.Content);
                // Enqueue instead of writing directly; the consumer paces output.
                messageQueue.Enqueue($"data: {message}\n");
            }
        }
        catch (Exception e)
        {
            // BUGFIX: constant log template — no string interpolation needed.
            _logger.LogError(e, "Ai对话异常");
            var errorContent = $"Ai对话异常,异常信息:\n{e.Message}";
            var model = MapToMessage(modelId, errorContent);
            var message = JsonConvert.SerializeObject(model, SsePayloadSettings);
            // Error text is also persisted as the system reply below.
            backupSystemContent.Append(errorContent);
            messageQueue.Enqueue($"data: {message}\n");
        }

        // Terminate the stream (OpenAI-compatible sentinel); the completion flag is
        // set AFTER enqueueing so the consumer is guaranteed to emit the sentinel.
        messageQueue.Enqueue("data: [DONE]\n");
        Volatile.Write(ref isComplete, true);

        await outputTask;

        if (userId is not null)
        {
            // Persist the user's prompt. BUGFIX: LastOrDefault() could be null on an
            // empty message list — now null-safe end to end.
            await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId,
                new MessageInputDto
                {
                    Content = messages.LastOrDefault()?.Content?.FirstOrDefault()?.Text ?? string.Empty,
                    ModelId = modelId,
                    TokenUsage = tokenUsage,
                });

            // Persist the full assistant reply (or the error text on failure).
            await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId,
                new MessageInputDto
                {
                    Content = backupSystemContent.ToString(),
                    ModelId = modelId,
                    TokenUsage = tokenUsage
                });

            await _usageStatisticsManager.SetUsageAsync(userId.Value, modelId, tokenUsage.InputTokenCount,
                tokenUsage.OutputTokenCount);
        }
    }


    /// <summary>
    /// Wraps a content delta in an OpenAI-compatible "chat.completion.chunk" DTO.
    /// The content-filter and usage sections are emitted with neutral defaults
    /// purely to satisfy clients expecting the full OpenAI schema.
    /// </summary>
    /// <param name="modelId">Model id echoed back to the client.</param>
    /// <param name="content">Delta text for this chunk.</param>
    private SendMessageOutputDto MapToMessage(string modelId, string content)
    {
        var output = new SendMessageOutputDto
        {
            // BUGFIX: Id/Created were hard-coded sample values repeated in every
            // response; generate a unique id and the real Unix timestamp instead.
            // (int cast keeps this assignable whether Created is int or long.)
            Id = $"chatcmpl-{Guid.NewGuid():N}",
            Object = "chat.completion.chunk",
            Created = (int)DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
            Model = modelId,
            Choices = new()
            {
                new Choice
                {
                    Index = 0,
                    Delta = new Delta
                    {
                        Content = content,
                        Role = "assistant"
                    },
                    FinishReason = null,
                    ContentFilterResults = new()
                    {
                        Hate = new()
                        {
                            Filtered = false,
                            Detected = null
                        },
                        SelfHarm = new()
                        {
                            Filtered = false,
                            Detected = null
                        },
                        Sexual = new()
                        {
                            Filtered = false,
                            Detected = null
                        },
                        Violence = new()
                        {
                            Filtered = false,
                            Detected = null
                        },
                        Jailbreak = new()
                        {
                            Filtered = false,
                            Detected = false
                        },
                        Profanity = new()
                        {
                            Filtered = false,
                            Detected = false
                        },
                    }
                }
            },
            SystemFingerprint = "",
            Usage = new Usage
            {
                PromptTokens = 0,
                CompletionTokens = 0,
                TotalTokens = 0,
                PromptTokensDetails = new()
                {
                    AudioTokens = 0,
                    CachedTokens = 0
                },
                CompletionTokensDetails = new()
                {
                    AudioTokens = 0,
                    ReasoningTokens = 0,
                    AcceptedPredictionTokens = 0,
                    RejectedPredictionTokens = 0
                }
            }
        };

        return output;
    }
}