feat: 优化整体aihub架构

This commit is contained in:
ccnetcore
2025-07-05 15:11:56 +08:00
parent 0af2f867fc
commit 52961b459e
6 changed files with 272 additions and 411 deletions

View File

@@ -12,6 +12,12 @@ public class ModelGetListOutput
/// </summary>
public string Category { get; set; }
/// <summary>
/// 模型id
/// </summary>
public string ModelId { get; set; }
/// <summary>
/// 模型名称
/// </summary>

View File

@@ -14,6 +14,7 @@ using Volo.Abp.Users;
using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Domain.Entities;
using Yi.Framework.AiHub.Domain.Entities.Model;
using Yi.Framework.AiHub.Domain.Extensions;
using Yi.Framework.AiHub.Domain.Managers;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.Rbac.Application.Contracts.IServices;
@@ -28,23 +29,21 @@ namespace Yi.Framework.AiHub.Application.Services;
public class AiChatService : ApplicationService
{
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly AiMessageManager _aiMessageManager;
private readonly ISqlSugarRepository<AiModelEntity> _aiModelRepository;
private readonly AiBlacklistManager _aiBlacklistManager;
private readonly UsageStatisticsManager _usageStatisticsManager;
private readonly ILogger<AiChatService> _logger;
private readonly AiGateWayManager _aiGateWayManager;
public AiChatService(IHttpContextAccessor httpContextAccessor,
AiMessageManager aiMessageManager, AiBlacklistManager aiBlacklistManager,
ISqlSugarRepository<AiModelEntity> aiModelRepository, UsageStatisticsManager usageStatisticsManager,
ILogger<AiChatService> logger)
AiBlacklistManager aiBlacklistManager,
ISqlSugarRepository<AiModelEntity> aiModelRepository,
ILogger<AiChatService> logger, AiGateWayManager aiGateWayManager)
{
this._httpContextAccessor = httpContextAccessor;
_aiMessageManager = aiMessageManager;
_httpContextAccessor = httpContextAccessor;
_aiBlacklistManager = aiBlacklistManager;
_aiModelRepository = aiModelRepository;
_usageStatisticsManager = usageStatisticsManager;
_logger = logger;
_aiGateWayManager = aiGateWayManager;
}
@@ -73,6 +72,7 @@ public class AiChatService : ApplicationService
{
Id = x.Id,
Category = "chat",
ModelId = x.ModelId,
ModelName = x.Name,
ModelDescribe = x.Description,
ModelPrice = 0,
@@ -101,7 +101,7 @@ public class AiChatService : ApplicationService
if (CurrentUser.IsAuthenticated)
{
await _aiBlacklistManager.VerifiyAiBlacklist(CurrentUser.GetId());
if (!CurrentUser.Roles.Contains("YiXinAi-Vip") && CurrentUser.UserName != "cc")
if (!CurrentUser.IsAiVip())
{
throw new UserFriendlyException("该模型需要VIP用户才能使用请购买VIP后重新登录重试");
}
@@ -112,19 +112,10 @@ public class AiChatService : ApplicationService
}
}
//前面都是校验,后面才是真正的调用
var httpContext = this._httpContextAccessor.HttpContext;
var response = httpContext.Response;
// 设置响应头,声明是 SSE 流
response.ContentType = "text/event-stream";
response.Headers.Append("Cache-Control", "no-cache");
response.Headers.Append("Connection", "keep-alive");
var history = new List<ChatMessage>();
foreach (var aiChatContextDto in input.Messages)
{
if (aiChatContextDto.Role == "system")
if (aiChatContextDto.Role == "assistant")
{
history.Add(ChatMessage.CreateAssistantMessage(aiChatContextDto.Content));
}
@@ -134,180 +125,8 @@ public class AiChatService : ApplicationService
}
}
var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken);
var tokenUsage = new TokenUsage();
await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
//缓存队列算法
// 创建一个队列来缓存消息
var messageQueue = new ConcurrentQueue<string>();
StringBuilder backupSystemContent = new StringBuilder();
// 设置输出速率例如每50毫秒输出一次
var outputInterval = TimeSpan.FromMilliseconds(100);
// 标记是否完成接收
var isComplete = false;
// 启动一个后台任务来消费队列
var outputTask = Task.Run(async () =>
{
while (!(isComplete && messageQueue.IsEmpty))
{
if (messageQueue.TryDequeue(out var message))
{
await writer.WriteLineAsync(message);
await writer.FlushAsync(cancellationToken);
}
if (!isComplete)
{
// 如果没有完成,才等待,已完成,全部输出
await Task.Delay(outputInterval, cancellationToken);
}
}
}, cancellationToken);
//IAsyncEnumerable 只能在最外层捕获异常(如果你有其他办法的话...
try
{
await foreach (var data in completeChatResponse)
{
if (data.IsFinish)
{
tokenUsage = data.TokenUsage;
}
var model = MapToMessage(input.Model, data.Content);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
backupSystemContent.Append(data.Content);
// 将消息加入队列而不是直接写入
messageQueue.Enqueue($"data: {message}\n");
}
}
catch (Exception e)
{
_logger.LogError(e, $"Ai对话异常");
var errorContent = $"Ai对话异常异常信息\n{e.Message}";
var model = MapToMessage(input.Model, errorContent);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
backupSystemContent.Append(errorContent);
messageQueue.Enqueue($"data: {message}\n");
}
//断开连接
messageQueue.Enqueue("data: [DONE]\n");
// 标记完成并发送结束标记
isComplete = true;
await outputTask;
if (CurrentUser.IsAuthenticated && input.SessionId.HasValue)
{
await _aiMessageManager.CreateUserMessageAsync(CurrentUser.GetId(), input.SessionId.Value,
new MessageInputDto
{
Content = input.Messages.LastOrDefault()
.Content,
ModelId = input.Model,
TokenUsage = tokenUsage,
});
await _aiMessageManager.CreateSystemMessageAsync(CurrentUser.GetId(), input.SessionId.Value,
new MessageInputDto
{
Content = backupSystemContent.ToString(),
ModelId = input.Model,
TokenUsage = tokenUsage
});
await _usageStatisticsManager.SetUsageAsync(CurrentUser.GetId(), input.Model, tokenUsage.InputTokenCount,
tokenUsage.OutputTokenCount);
}
}
private SendMessageOutputDto MapToMessage(string modelId, string content)
{
var output = new SendMessageOutputDto
{
Id = "chatcmpl-BotYP3BlN5T4g9YPnW0fBSBvKzXdd",
Object = "chat.completion.chunk",
Created = 1750336171,
Model = modelId,
Choices = new()
{
new Choice
{
Index = 0,
Delta = new Delta
{
Content = content,
Role = "assistant"
},
FinishReason = null,
ContentFilterResults = new()
{
Hate = new()
{
Filtered = false,
Detected = null
},
SelfHarm = new()
{
Filtered = false,
Detected = null
},
Sexual = new()
{
Filtered = false,
Detected = null
},
Violence = new()
{
Filtered = false,
Detected = null
},
Jailbreak = new()
{
Filtered = false,
Detected = false
},
Profanity = new()
{
Filtered = false,
Detected = false
},
}
}
},
SystemFingerprint = "",
Usage = new Usage
{
PromptTokens = 0,
CompletionTokens = 0,
TotalTokens = 0,
PromptTokensDetails = new()
{
AudioTokens = 0,
CachedTokens = 0
},
CompletionTokensDetails = new()
{
AudioTokens = 0,
ReasoningTokens = 0,
AcceptedPredictionTokens = 0,
RejectedPredictionTokens = 0
}
}
};
return output;
//ai网关代理httpcontext
await _aiGateWayManager.CompleteChatForHttpContextAsync(_httpContextAccessor.HttpContext, input.Model, history,
CurrentUser.Id, input.SessionId, cancellationToken);
}
}

View File

@@ -1,20 +1,12 @@
using System.Collections.Concurrent;
using System.Text;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using OpenAI.Chat;
using SqlSugar;
using Volo.Abp.Application.Services;
using Volo.Abp.Users;
using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAiDto;
using Yi.Framework.AiHub.Domain.Entities.OpenApi;
using Yi.Framework.AiHub.Domain.Entities.Model;
using Yi.Framework.AiHub.Domain.Managers;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.SqlSugarCore.Abstractions;
namespace Yi.Framework.AiHub.Application.Services;
@@ -24,17 +16,18 @@ public class OpenApiService : ApplicationService
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly ILogger<OpenApiService> _logger;
private readonly TokenManager _tokenManager;
private readonly AiMessageManager _aiMessageManager;
private readonly UsageStatisticsManager _usageStatisticsManager;
private readonly AiGateWayManager _aiGateWayManager;
private readonly ISqlSugarRepository<AiModelEntity> _aiModelRepository;
public OpenApiService(IHttpContextAccessor httpContextAccessor, ILogger<OpenApiService> logger,
TokenManager tokenManager, AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager)
TokenManager tokenManager, AiGateWayManager aiGateWayManager,
ISqlSugarRepository<AiModelEntity> aiModelRepository)
{
_httpContextAccessor = httpContextAccessor;
_logger = logger;
_tokenManager = tokenManager;
_aiMessageManager = aiMessageManager;
_usageStatisticsManager = usageStatisticsManager;
_aiGateWayManager = aiGateWayManager;
_aiModelRepository = aiModelRepository;
}
/// <summary>
@@ -49,15 +42,6 @@ public class OpenApiService : ApplicationService
var httpContext = this._httpContextAccessor.HttpContext;
var userId = await _tokenManager.GetUserIdAsync(GetTokenByHttpContext(httpContext));
var response = httpContext.Response;
// 设置响应头,声明是 SSE 流
response.ContentType = "text/event-stream";
response.Headers.Append("Cache-Control", "no-cache");
response.Headers.Append("Connection", "keep-alive");
var history = new List<ChatMessage>();
foreach (var aiChatContextDto in input.Messages)
{
@@ -71,100 +55,9 @@ public class OpenApiService : ApplicationService
}
}
var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken);
var tokenUsage = new TokenUsage();
await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
//缓存队列算法
// 创建一个队列来缓存消息
var messageQueue = new ConcurrentQueue<string>();
StringBuilder backupSystemContent = new StringBuilder();
// 设置输出速率例如每50毫秒输出一次
var outputInterval = TimeSpan.FromMilliseconds(100);
// 标记是否完成接收
var isComplete = false;
// 启动一个后台任务来消费队列
var outputTask = Task.Run(async () =>
{
while (!(isComplete && messageQueue.IsEmpty))
{
if (messageQueue.TryDequeue(out var message))
{
await writer.WriteLineAsync(message);
await writer.FlushAsync(cancellationToken);
}
if (!isComplete)
{
// 如果没有完成,才等待,已完成,全部输出
await Task.Delay(outputInterval, cancellationToken);
}
}
}, cancellationToken);
//IAsyncEnumerable 只能在最外层捕获异常(如果你有其他办法的话...
try
{
await foreach (var data in completeChatResponse)
{
if (data.IsFinish)
{
tokenUsage = data.TokenUsage;
}
var model = MapToMessage(input.Model, data.Content);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
backupSystemContent.Append(data.Content);
// 将消息加入队列而不是直接写入
messageQueue.Enqueue($"data: {message}\n");
}
}
catch (Exception e)
{
_logger.LogError(e, $"Ai对话异常");
var errorContent = $"Ai对话异常异常信息\n{e.Message}";
var model = MapToMessage(input.Model, errorContent);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
backupSystemContent.Append(errorContent);
messageQueue.Enqueue($"data: {message}\n");
}
//断开连接
messageQueue.Enqueue("data: [DONE]\n");
// 标记完成并发送结束标记
isComplete = true;
await outputTask;
await _aiMessageManager.CreateUserMessageAsync(userId, null,
new MessageInputDto
{
Content = input.Messages.LastOrDefault()
.Content,
ModelId = input.Model,
TokenUsage = tokenUsage,
});
await _aiMessageManager.CreateSystemMessageAsync(userId, null,
new MessageInputDto
{
Content = backupSystemContent.ToString(),
ModelId = input.Model,
TokenUsage = tokenUsage
});
await _usageStatisticsManager.SetUsageAsync(userId, input.Model, tokenUsage.InputTokenCount,
tokenUsage.OutputTokenCount);
//ai网关代理httpcontext
await _aiGateWayManager.CompleteChatForHttpContextAsync(_httpContextAccessor.HttpContext, input.Model, history,
userId, null, cancellationToken);
}
/// <summary>
@@ -174,105 +67,20 @@ public class OpenApiService : ApplicationService
[HttpGet("openApi/v1/models")]
public async Task<ModelGetOutput> ModelsAsync()
{
var data = await _aiModelRepository._DbQueryable
.OrderByDescending(x => x.OrderNum)
.Select(x => new ModelDataOutput
{
ModelId = x.ModelId,
Object = "model",
Owned_by = "organization-owner",
Permission = new List<string>()
}).ToListAsync();
return new ModelGetOutput()
{
Data = new List<ModelDataOutput>()
{
new ModelDataOutput
{
ModelId = "gpt-4.1-mini",
Object = "model",
Owned_by = "organization-owner",
Permission = new List<string>()
},
new ModelDataOutput
{
ModelId = "gpt-4.1-nano",
Object = "model",
Owned_by = "organization-owner",
Permission = new List<string>()
}
}
Data = data
};
}
private SendMessageOutputDto MapToMessage(string modelId, string content)
{
var output = new SendMessageOutputDto
{
Id = "chatcmpl-BotYP3BlN5T4g9YPnW0fBSBvKzXdd",
Object = "chat.completion.chunk",
Created = 1750336171,
Model = modelId,
Choices = new()
{
new Choice
{
Index = 0,
Delta = new Delta
{
Content = content,
Role = "assistant"
},
FinishReason = null,
ContentFilterResults = new()
{
Hate = new()
{
Filtered = false,
Detected = null
},
SelfHarm = new()
{
Filtered = false,
Detected = null
},
Sexual = new()
{
Filtered = false,
Detected = null
},
Violence = new()
{
Filtered = false,
Detected = null
},
Jailbreak = new()
{
Filtered = false,
Detected = false
},
Profanity = new()
{
Filtered = false,
Detected = false
},
}
}
},
SystemFingerprint = "",
Usage = new Usage
{
PromptTokens = 0,
CompletionTokens = 0,
TotalTokens = 0,
PromptTokensDetails = new()
{
AudioTokens = 0,
CachedTokens = 0
},
CompletionTokensDetails = new()
{
AudioTokens = 0,
ReasoningTokens = 0,
AcceptedPredictionTokens = 0,
RejectedPredictionTokens = 0
}
}
};
return output;
}
private string? GetTokenByHttpContext(HttpContext httpContext)

View File

@@ -3,6 +3,7 @@ using Volo.Abp.Application.Services;
using Volo.Abp.Users;
using Yi.Framework.AiHub.Application.Contracts.Dtos.Token;
using Yi.Framework.AiHub.Domain.Entities.OpenApi;
using Yi.Framework.AiHub.Domain.Extensions;
using Yi.Framework.AiHub.Domain.Managers;
using Yi.Framework.SqlSugarCore.Abstractions;
@@ -44,7 +45,7 @@ public class TokenService : ApplicationService
[Authorize]
public async Task CreateAsync()
{
if (!CurrentUser.Roles.Contains("YiXinAi-Vip") && CurrentUser.UserName != "cc")
if (!CurrentUser.IsAiVip())
{
throw new UserFriendlyException("充值成为Vip畅想第三方token服务");
}

View File

@@ -0,0 +1,12 @@
using Microsoft.AspNetCore.Http;
using Volo.Abp.Users;
namespace Yi.Framework.AiHub.Domain.Extensions;
public static class CurrentExtensions
{
public static bool IsAiVip(this ICurrentUser currentUser)
{
return currentUser.Roles.Contains("YiXinAi-Vip") || currentUser.UserName == "cc";
}
}

View File

@@ -1,7 +1,14 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Text;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using OpenAI.Chat;
using Volo.Abp.Domain.Services;
using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Domain.AiChat;
using Yi.Framework.AiHub.Domain.Entities;
using Yi.Framework.AiHub.Domain.Entities.Model;
@@ -13,10 +20,17 @@ namespace Yi.Framework.AiHub.Domain.Managers;
public class AiGateWayManager : DomainService
{
private readonly ISqlSugarRepository<AiAppAggregateRoot> _aiAppRepository;
private readonly ILogger<AiGateWayManager> _logger;
private readonly AiMessageManager _aiMessageManager;
private readonly UsageStatisticsManager _usageStatisticsManager;
public AiGateWayManager(ISqlSugarRepository<AiAppAggregateRoot> aiAppRepository)
public AiGateWayManager(ISqlSugarRepository<AiAppAggregateRoot> aiAppRepository, ILogger<AiGateWayManager> logger,
AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager)
{
_aiAppRepository = aiAppRepository;
_logger = logger;
_aiMessageManager = aiMessageManager;
_usageStatisticsManager = usageStatisticsManager;
}
/// <summary>
@@ -68,4 +82,205 @@ public class AiGateWayManager : DomainService
yield return result;
}
}
/// <summary>
/// 聊天完成-缓存处理
/// </summary>
/// <param name="httpContext"></param>
/// <param name="modelId"></param>
/// <param name="messages"></param>
/// <param name="sessionId"></param>
/// <param name="cancellationToken"></param>
/// <param name="userId"></param>
/// <returns></returns>
public async Task CompleteChatForHttpContextAsync(
HttpContext httpContext,
string modelId,
List<ChatMessage> messages,
Guid? userId = null,
Guid? sessionId = null,
CancellationToken cancellationToken = default)
{
var response = httpContext.Response;
// 设置响应头,声明是 SSE 流
response.ContentType = "text/event-stream";
response.Headers.Append("Cache-Control", "no-cache");
response.Headers.Append("Connection", "keep-alive");
var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
var completeChatResponse = gateWay.CompleteChatAsync(modelId, messages, cancellationToken);
var tokenUsage = new TokenUsage();
await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
//缓存队列算法
// 创建一个队列来缓存消息
var messageQueue = new ConcurrentQueue<string>();
StringBuilder backupSystemContent = new StringBuilder();
// 设置输出速率例如每50毫秒输出一次
var outputInterval = TimeSpan.FromMilliseconds(100);
// 标记是否完成接收
var isComplete = false;
// 启动一个后台任务来消费队列
var outputTask = Task.Run(async () =>
{
while (!(isComplete && messageQueue.IsEmpty))
{
if (messageQueue.TryDequeue(out var message))
{
await writer.WriteLineAsync(message);
await writer.FlushAsync(cancellationToken);
}
if (!isComplete)
{
// 如果没有完成,才等待,已完成,全部输出
await Task.Delay(outputInterval, cancellationToken);
}
}
}, cancellationToken);
//IAsyncEnumerable 只能在最外层捕获异常(如果你有其他办法的话...
try
{
await foreach (var data in completeChatResponse)
{
if (data.IsFinish)
{
tokenUsage = data.TokenUsage;
}
var model = MapToMessage(modelId, data.Content);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
backupSystemContent.Append(data.Content);
// 将消息加入队列而不是直接写入
messageQueue.Enqueue($"data: {message}\n");
}
}
catch (Exception e)
{
_logger.LogError(e, $"Ai对话异常");
var errorContent = $"Ai对话异常异常信息\n{e.Message}";
var model = MapToMessage(modelId, errorContent);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
backupSystemContent.Append(errorContent);
messageQueue.Enqueue($"data: {message}\n");
}
//断开连接
messageQueue.Enqueue("data: [DONE]\n");
// 标记完成并发送结束标记
isComplete = true;
await outputTask;
if (userId is not null)
{
await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId,
new MessageInputDto
{
Content = messages.LastOrDefault().Content.FirstOrDefault()?.Text ?? string.Empty,
ModelId = modelId,
TokenUsage = tokenUsage,
});
await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId,
new MessageInputDto
{
Content = backupSystemContent.ToString(),
ModelId = modelId,
TokenUsage = tokenUsage
});
await _usageStatisticsManager.SetUsageAsync(userId.Value, modelId, tokenUsage.InputTokenCount,
tokenUsage.OutputTokenCount);
}
}
private SendMessageOutputDto MapToMessage(string modelId, string content)
{
var output = new SendMessageOutputDto
{
Id = "chatcmpl-BotYP3BlN5T4g9YPnW0fBSBvKzXdd",
Object = "chat.completion.chunk",
Created = 1750336171,
Model = modelId,
Choices = new()
{
new Choice
{
Index = 0,
Delta = new Delta
{
Content = content,
Role = "assistant"
},
FinishReason = null,
ContentFilterResults = new()
{
Hate = new()
{
Filtered = false,
Detected = null
},
SelfHarm = new()
{
Filtered = false,
Detected = null
},
Sexual = new()
{
Filtered = false,
Detected = null
},
Violence = new()
{
Filtered = false,
Detected = null
},
Jailbreak = new()
{
Filtered = false,
Detected = false
},
Profanity = new()
{
Filtered = false,
Detected = false
},
}
}
},
SystemFingerprint = "",
Usage = new Usage
{
PromptTokens = 0,
CompletionTokens = 0,
TotalTokens = 0,
PromptTokensDetails = new()
{
AudioTokens = 0,
CachedTokens = 0
},
CompletionTokensDetails = new()
{
AudioTokens = 0,
ReasoningTokens = 0,
AcceptedPredictionTokens = 0,
RejectedPredictionTokens = 0
}
}
};
return output;
}
}