diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application.Contracts/Dtos/ModelGetListOutput.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application.Contracts/Dtos/ModelGetListOutput.cs
index 786c8d98..93d59236 100644
--- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application.Contracts/Dtos/ModelGetListOutput.cs
+++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application.Contracts/Dtos/ModelGetListOutput.cs
@@ -12,6 +12,12 @@ public class ModelGetListOutput
///
public string Category { get; set; }
+
+ ///
+ /// 模型id
+ ///
+ public string ModelId { get; set; }
+
///
/// 模型名称
///
diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/AiChatService.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/AiChatService.cs
index 0634583c..278a4997 100644
--- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/AiChatService.cs
+++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/AiChatService.cs
@@ -14,6 +14,7 @@ using Volo.Abp.Users;
using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Domain.Entities;
using Yi.Framework.AiHub.Domain.Entities.Model;
+using Yi.Framework.AiHub.Domain.Extensions;
using Yi.Framework.AiHub.Domain.Managers;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.Rbac.Application.Contracts.IServices;
@@ -28,23 +29,21 @@ namespace Yi.Framework.AiHub.Application.Services;
public class AiChatService : ApplicationService
{
private readonly IHttpContextAccessor _httpContextAccessor;
- private readonly AiMessageManager _aiMessageManager;
private readonly ISqlSugarRepository _aiModelRepository;
private readonly AiBlacklistManager _aiBlacklistManager;
- private readonly UsageStatisticsManager _usageStatisticsManager;
private readonly ILogger _logger;
+ private readonly AiGateWayManager _aiGateWayManager;
public AiChatService(IHttpContextAccessor httpContextAccessor,
- AiMessageManager aiMessageManager, AiBlacklistManager aiBlacklistManager,
- ISqlSugarRepository aiModelRepository, UsageStatisticsManager usageStatisticsManager,
- ILogger logger)
+ AiBlacklistManager aiBlacklistManager,
+ ISqlSugarRepository aiModelRepository,
+ ILogger logger, AiGateWayManager aiGateWayManager)
{
- this._httpContextAccessor = httpContextAccessor;
- _aiMessageManager = aiMessageManager;
+ _httpContextAccessor = httpContextAccessor;
_aiBlacklistManager = aiBlacklistManager;
_aiModelRepository = aiModelRepository;
- _usageStatisticsManager = usageStatisticsManager;
_logger = logger;
+ _aiGateWayManager = aiGateWayManager;
}
@@ -73,6 +72,7 @@ public class AiChatService : ApplicationService
{
Id = x.Id,
Category = "chat",
+ ModelId = x.ModelId,
ModelName = x.Name,
ModelDescribe = x.Description,
ModelPrice = 0,
@@ -101,7 +101,7 @@ public class AiChatService : ApplicationService
if (CurrentUser.IsAuthenticated)
{
await _aiBlacklistManager.VerifiyAiBlacklist(CurrentUser.GetId());
- if (!CurrentUser.Roles.Contains("YiXinAi-Vip") && CurrentUser.UserName != "cc")
+ if (!CurrentUser.IsAiVip())
{
throw new UserFriendlyException("该模型需要VIP用户才能使用,请购买VIP后重新登录重试");
}
@@ -112,19 +112,10 @@ public class AiChatService : ApplicationService
}
}
- //前面都是校验,后面才是真正的调用
- var httpContext = this._httpContextAccessor.HttpContext;
- var response = httpContext.Response;
- // 设置响应头,声明是 SSE 流
- response.ContentType = "text/event-stream";
- response.Headers.Append("Cache-Control", "no-cache");
- response.Headers.Append("Connection", "keep-alive");
-
-
var history = new List();
foreach (var aiChatContextDto in input.Messages)
{
- if (aiChatContextDto.Role == "system")
+ if (aiChatContextDto.Role == "assistant")
{
history.Add(ChatMessage.CreateAssistantMessage(aiChatContextDto.Content));
}
@@ -134,180 +125,8 @@ public class AiChatService : ApplicationService
}
}
- var gateWay = LazyServiceProvider.GetRequiredService();
- var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken);
- var tokenUsage = new TokenUsage();
- await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
-
-
- //缓存队列算法
- // 创建一个队列来缓存消息
- var messageQueue = new ConcurrentQueue();
-
- StringBuilder backupSystemContent = new StringBuilder();
- // 设置输出速率(例如每50毫秒输出一次)
- var outputInterval = TimeSpan.FromMilliseconds(100);
- // 标记是否完成接收
- var isComplete = false;
- // 启动一个后台任务来消费队列
- var outputTask = Task.Run(async () =>
- {
- while (!(isComplete && messageQueue.IsEmpty))
- {
- if (messageQueue.TryDequeue(out var message))
- {
- await writer.WriteLineAsync(message);
- await writer.FlushAsync(cancellationToken);
- }
-
- if (!isComplete)
- {
- // 如果没有完成,才等待,已完成,全部输出
- await Task.Delay(outputInterval, cancellationToken);
- }
- }
- }, cancellationToken);
-
-
- //IAsyncEnumerable 只能在最外层捕获异常(如果你有其他办法的话...)
- try
- {
- await foreach (var data in completeChatResponse)
- {
- if (data.IsFinish)
- {
- tokenUsage = data.TokenUsage;
- }
-
- var model = MapToMessage(input.Model, data.Content);
- var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
- {
- ContractResolver = new CamelCasePropertyNamesContractResolver()
- });
- backupSystemContent.Append(data.Content);
- // 将消息加入队列而不是直接写入
- messageQueue.Enqueue($"data: {message}\n");
- }
- }
- catch (Exception e)
- {
- _logger.LogError(e, $"Ai对话异常");
- var errorContent = $"Ai对话异常,异常信息:\n{e.Message}";
- var model = MapToMessage(input.Model, errorContent);
- var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
- {
- ContractResolver = new CamelCasePropertyNamesContractResolver()
- });
- backupSystemContent.Append(errorContent);
- messageQueue.Enqueue($"data: {message}\n");
- }
-
- //断开连接
- messageQueue.Enqueue("data: [DONE]\n");
- // 标记完成并发送结束标记
- isComplete = true;
-
- await outputTask;
- if (CurrentUser.IsAuthenticated && input.SessionId.HasValue)
- {
- await _aiMessageManager.CreateUserMessageAsync(CurrentUser.GetId(), input.SessionId.Value,
- new MessageInputDto
- {
- Content = input.Messages.LastOrDefault()
- .Content,
- ModelId = input.Model,
- TokenUsage = tokenUsage,
- });
-
- await _aiMessageManager.CreateSystemMessageAsync(CurrentUser.GetId(), input.SessionId.Value,
- new MessageInputDto
- {
- Content = backupSystemContent.ToString(),
- ModelId = input.Model,
- TokenUsage = tokenUsage
- });
-
- await _usageStatisticsManager.SetUsageAsync(CurrentUser.GetId(), input.Model, tokenUsage.InputTokenCount,
- tokenUsage.OutputTokenCount);
- }
- }
-
-
- private SendMessageOutputDto MapToMessage(string modelId, string content)
- {
- var output = new SendMessageOutputDto
- {
- Id = "chatcmpl-BotYP3BlN5T4g9YPnW0fBSBvKzXdd",
- Object = "chat.completion.chunk",
- Created = 1750336171,
- Model = modelId,
- Choices = new()
- {
- new Choice
- {
- Index = 0,
- Delta = new Delta
- {
- Content = content,
- Role = "assistant"
- },
- FinishReason = null,
- ContentFilterResults = new()
- {
- Hate = new()
- {
- Filtered = false,
- Detected = null
- },
- SelfHarm = new()
- {
- Filtered = false,
- Detected = null
- },
- Sexual = new()
- {
- Filtered = false,
- Detected = null
- },
- Violence = new()
- {
- Filtered = false,
- Detected = null
- },
- Jailbreak = new()
- {
- Filtered = false,
- Detected = false
- },
- Profanity = new()
- {
- Filtered = false,
- Detected = false
- },
- }
- }
- },
- SystemFingerprint = "",
- Usage = new Usage
- {
- PromptTokens = 0,
- CompletionTokens = 0,
- TotalTokens = 0,
- PromptTokensDetails = new()
- {
- AudioTokens = 0,
- CachedTokens = 0
- },
- CompletionTokensDetails = new()
- {
- AudioTokens = 0,
- ReasoningTokens = 0,
- AcceptedPredictionTokens = 0,
- RejectedPredictionTokens = 0
- }
- }
- };
-
- return output;
+ //ai网关代理httpcontext
+ await _aiGateWayManager.CompleteChatForHttpContextAsync(_httpContextAccessor.HttpContext, input.Model, history,
+ CurrentUser.Id, input.SessionId, cancellationToken);
}
}
\ No newline at end of file
diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/OpenApiService.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/OpenApiService.cs
index f2b6e485..9c6c687c 100644
--- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/OpenApiService.cs
+++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/OpenApiService.cs
@@ -1,20 +1,12 @@
-using System.Collections.Concurrent;
-using System.Text;
-using Microsoft.AspNetCore.Authorization;
-using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
-using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
-using Newtonsoft.Json;
-using Newtonsoft.Json.Serialization;
using OpenAI.Chat;
+using SqlSugar;
using Volo.Abp.Application.Services;
-using Volo.Abp.Users;
-using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAiDto;
-using Yi.Framework.AiHub.Domain.Entities.OpenApi;
+using Yi.Framework.AiHub.Domain.Entities.Model;
using Yi.Framework.AiHub.Domain.Managers;
-using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.SqlSugarCore.Abstractions;
namespace Yi.Framework.AiHub.Application.Services;
@@ -24,17 +16,18 @@ public class OpenApiService : ApplicationService
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly ILogger _logger;
private readonly TokenManager _tokenManager;
- private readonly AiMessageManager _aiMessageManager;
- private readonly UsageStatisticsManager _usageStatisticsManager;
+ private readonly AiGateWayManager _aiGateWayManager;
+ private readonly ISqlSugarRepository _aiModelRepository;
public OpenApiService(IHttpContextAccessor httpContextAccessor, ILogger logger,
- TokenManager tokenManager, AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager)
+ TokenManager tokenManager, AiGateWayManager aiGateWayManager,
+ ISqlSugarRepository aiModelRepository)
{
_httpContextAccessor = httpContextAccessor;
_logger = logger;
_tokenManager = tokenManager;
- _aiMessageManager = aiMessageManager;
- _usageStatisticsManager = usageStatisticsManager;
+ _aiGateWayManager = aiGateWayManager;
+ _aiModelRepository = aiModelRepository;
}
///
@@ -49,15 +42,6 @@ public class OpenApiService : ApplicationService
var httpContext = this._httpContextAccessor.HttpContext;
var userId = await _tokenManager.GetUserIdAsync(GetTokenByHttpContext(httpContext));
-
-
- var response = httpContext.Response;
- // 设置响应头,声明是 SSE 流
- response.ContentType = "text/event-stream";
- response.Headers.Append("Cache-Control", "no-cache");
- response.Headers.Append("Connection", "keep-alive");
-
-
var history = new List();
foreach (var aiChatContextDto in input.Messages)
{
@@ -71,100 +55,9 @@ public class OpenApiService : ApplicationService
}
}
- var gateWay = LazyServiceProvider.GetRequiredService();
- var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken);
- var tokenUsage = new TokenUsage();
- await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
-
-
- //缓存队列算法
- // 创建一个队列来缓存消息
- var messageQueue = new ConcurrentQueue();
-
- StringBuilder backupSystemContent = new StringBuilder();
- // 设置输出速率(例如每50毫秒输出一次)
- var outputInterval = TimeSpan.FromMilliseconds(100);
- // 标记是否完成接收
- var isComplete = false;
- // 启动一个后台任务来消费队列
- var outputTask = Task.Run(async () =>
- {
- while (!(isComplete && messageQueue.IsEmpty))
- {
- if (messageQueue.TryDequeue(out var message))
- {
- await writer.WriteLineAsync(message);
- await writer.FlushAsync(cancellationToken);
- }
-
- if (!isComplete)
- {
- // 如果没有完成,才等待,已完成,全部输出
- await Task.Delay(outputInterval, cancellationToken);
- }
- }
- }, cancellationToken);
-
-
- //IAsyncEnumerable 只能在最外层捕获异常(如果你有其他办法的话...)
- try
- {
- await foreach (var data in completeChatResponse)
- {
- if (data.IsFinish)
- {
- tokenUsage = data.TokenUsage;
- }
-
- var model = MapToMessage(input.Model, data.Content);
- var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
- {
- ContractResolver = new CamelCasePropertyNamesContractResolver()
- });
- backupSystemContent.Append(data.Content);
- // 将消息加入队列而不是直接写入
- messageQueue.Enqueue($"data: {message}\n");
- }
- }
- catch (Exception e)
- {
- _logger.LogError(e, $"Ai对话异常");
- var errorContent = $"Ai对话异常,异常信息:\n{e.Message}";
- var model = MapToMessage(input.Model, errorContent);
- var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
- {
- ContractResolver = new CamelCasePropertyNamesContractResolver()
- });
- backupSystemContent.Append(errorContent);
- messageQueue.Enqueue($"data: {message}\n");
- }
-
- //断开连接
- messageQueue.Enqueue("data: [DONE]\n");
- // 标记完成并发送结束标记
- isComplete = true;
-
- await outputTask;
-
- await _aiMessageManager.CreateUserMessageAsync(userId, null,
- new MessageInputDto
- {
- Content = input.Messages.LastOrDefault()
- .Content,
- ModelId = input.Model,
- TokenUsage = tokenUsage,
- });
-
- await _aiMessageManager.CreateSystemMessageAsync(userId, null,
- new MessageInputDto
- {
- Content = backupSystemContent.ToString(),
- ModelId = input.Model,
- TokenUsage = tokenUsage
- });
-
- await _usageStatisticsManager.SetUsageAsync(userId, input.Model, tokenUsage.InputTokenCount,
- tokenUsage.OutputTokenCount);
+ //ai网关代理httpcontext
+ await _aiGateWayManager.CompleteChatForHttpContextAsync(_httpContextAccessor.HttpContext, input.Model, history,
+ userId, null, cancellationToken);
}
///
@@ -174,105 +67,20 @@ public class OpenApiService : ApplicationService
[HttpGet("openApi/v1/models")]
public async Task ModelsAsync()
{
+ var data = await _aiModelRepository._DbQueryable
+ .OrderByDescending(x => x.OrderNum)
+ .Select(x => new ModelDataOutput
+ {
+ ModelId = x.ModelId,
+ Object = "model",
+ Owned_by = "organization-owner",
+ Permission = new List()
+ }).ToListAsync();
+
return new ModelGetOutput()
{
- Data = new List()
- {
- new ModelDataOutput
- {
- ModelId = "gpt-4.1-mini",
- Object = "model",
- Owned_by = "organization-owner",
- Permission = new List()
- },
- new ModelDataOutput
- {
- ModelId = "gpt-4.1-nano",
- Object = "model",
- Owned_by = "organization-owner",
- Permission = new List()
- }
- }
+ Data = data
};
-
- }
-
- private SendMessageOutputDto MapToMessage(string modelId, string content)
- {
- var output = new SendMessageOutputDto
- {
- Id = "chatcmpl-BotYP3BlN5T4g9YPnW0fBSBvKzXdd",
- Object = "chat.completion.chunk",
- Created = 1750336171,
- Model = modelId,
- Choices = new()
- {
- new Choice
- {
- Index = 0,
- Delta = new Delta
- {
- Content = content,
- Role = "assistant"
- },
- FinishReason = null,
- ContentFilterResults = new()
- {
- Hate = new()
- {
- Filtered = false,
- Detected = null
- },
- SelfHarm = new()
- {
- Filtered = false,
- Detected = null
- },
- Sexual = new()
- {
- Filtered = false,
- Detected = null
- },
- Violence = new()
- {
- Filtered = false,
- Detected = null
- },
- Jailbreak = new()
- {
- Filtered = false,
- Detected = false
- },
- Profanity = new()
- {
- Filtered = false,
- Detected = false
- },
- }
- }
- },
- SystemFingerprint = "",
- Usage = new Usage
- {
- PromptTokens = 0,
- CompletionTokens = 0,
- TotalTokens = 0,
- PromptTokensDetails = new()
- {
- AudioTokens = 0,
- CachedTokens = 0
- },
- CompletionTokensDetails = new()
- {
- AudioTokens = 0,
- ReasoningTokens = 0,
- AcceptedPredictionTokens = 0,
- RejectedPredictionTokens = 0
- }
- }
- };
-
- return output;
}
private string? GetTokenByHttpContext(HttpContext httpContext)
diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/TokenService.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/TokenService.cs
index fb26b6d9..e08cbc50 100644
--- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/TokenService.cs
+++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/TokenService.cs
@@ -3,6 +3,7 @@ using Volo.Abp.Application.Services;
using Volo.Abp.Users;
using Yi.Framework.AiHub.Application.Contracts.Dtos.Token;
using Yi.Framework.AiHub.Domain.Entities.OpenApi;
+using Yi.Framework.AiHub.Domain.Extensions;
using Yi.Framework.AiHub.Domain.Managers;
using Yi.Framework.SqlSugarCore.Abstractions;
@@ -44,7 +45,7 @@ public class TokenService : ApplicationService
[Authorize]
public async Task CreateAsync()
{
- if (!CurrentUser.Roles.Contains("YiXinAi-Vip") && CurrentUser.UserName != "cc")
+ if (!CurrentUser.IsAiVip())
{
throw new UserFriendlyException("充值成为Vip,畅想第三方token服务");
}
diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Extensions/CurrentExtensions.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Extensions/CurrentExtensions.cs
new file mode 100644
index 00000000..acab6162
--- /dev/null
+++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Extensions/CurrentExtensions.cs
@@ -0,0 +1,12 @@
+using Microsoft.AspNetCore.Http;
+using Volo.Abp.Users;
+
+namespace Yi.Framework.AiHub.Domain.Extensions;
+
+public static class CurrentExtensions
+{
+ public static bool IsAiVip(this ICurrentUser currentUser)
+ {
+ return currentUser.Roles.Contains("YiXinAi-Vip") || currentUser.UserName == "cc";
+ }
+}
\ No newline at end of file
diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs
index 8bba851a..c1b27bb7 100644
--- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs
+++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Domain/Managers/AiGateWayManager.cs
@@ -1,7 +1,14 @@
+using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
+using System.Text;
+using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Serialization;
using OpenAI.Chat;
using Volo.Abp.Domain.Services;
+using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Domain.AiChat;
using Yi.Framework.AiHub.Domain.Entities;
using Yi.Framework.AiHub.Domain.Entities.Model;
@@ -13,10 +20,17 @@ namespace Yi.Framework.AiHub.Domain.Managers;
public class AiGateWayManager : DomainService
{
private readonly ISqlSugarRepository _aiAppRepository;
+ private readonly ILogger _logger;
+ private readonly AiMessageManager _aiMessageManager;
+ private readonly UsageStatisticsManager _usageStatisticsManager;
- public AiGateWayManager(ISqlSugarRepository aiAppRepository)
+ public AiGateWayManager(ISqlSugarRepository aiAppRepository, ILogger logger,
+ AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager)
{
_aiAppRepository = aiAppRepository;
+ _logger = logger;
+ _aiMessageManager = aiMessageManager;
+ _usageStatisticsManager = usageStatisticsManager;
}
///
@@ -68,4 +82,205 @@ public class AiGateWayManager : DomainService
yield return result;
}
}
+
+ ///
+ /// 聊天完成-缓存处理
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public async Task CompleteChatForHttpContextAsync(
+ HttpContext httpContext,
+ string modelId,
+ List messages,
+ Guid? userId = null,
+ Guid? sessionId = null,
+ CancellationToken cancellationToken = default)
+ {
+ var response = httpContext.Response;
+ // 设置响应头,声明是 SSE 流
+ response.ContentType = "text/event-stream";
+ response.Headers.Append("Cache-Control", "no-cache");
+ response.Headers.Append("Connection", "keep-alive");
+
+
+ var gateWay = LazyServiceProvider.GetRequiredService();
+ var completeChatResponse = gateWay.CompleteChatAsync(modelId, messages, cancellationToken);
+ var tokenUsage = new TokenUsage();
+ await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
+
+ //缓存队列算法
+ // 创建一个队列来缓存消息
+ var messageQueue = new ConcurrentQueue();
+
+ StringBuilder backupSystemContent = new StringBuilder();
+ // 设置输出速率(例如每50毫秒输出一次)
+ var outputInterval = TimeSpan.FromMilliseconds(100);
+ // 标记是否完成接收
+ var isComplete = false;
+ // 启动一个后台任务来消费队列
+ var outputTask = Task.Run(async () =>
+ {
+ while (!(isComplete && messageQueue.IsEmpty))
+ {
+ if (messageQueue.TryDequeue(out var message))
+ {
+ await writer.WriteLineAsync(message);
+ await writer.FlushAsync(cancellationToken);
+ }
+
+ if (!isComplete)
+ {
+ // 如果没有完成,才等待,已完成,全部输出
+ await Task.Delay(outputInterval, cancellationToken);
+ }
+ }
+ }, cancellationToken);
+
+
+ //IAsyncEnumerable 只能在最外层捕获异常(如果你有其他办法的话...)
+ try
+ {
+ await foreach (var data in completeChatResponse)
+ {
+ if (data.IsFinish)
+ {
+ tokenUsage = data.TokenUsage;
+ }
+
+ var model = MapToMessage(modelId, data.Content);
+ var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
+ {
+ ContractResolver = new CamelCasePropertyNamesContractResolver()
+ });
+ backupSystemContent.Append(data.Content);
+ // 将消息加入队列而不是直接写入
+ messageQueue.Enqueue($"data: {message}\n");
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, $"Ai对话异常");
+ var errorContent = $"Ai对话异常,异常信息:\n{e.Message}";
+ var model = MapToMessage(modelId, errorContent);
+ var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
+ {
+ ContractResolver = new CamelCasePropertyNamesContractResolver()
+ });
+ backupSystemContent.Append(errorContent);
+ messageQueue.Enqueue($"data: {message}\n");
+ }
+
+ //断开连接
+ messageQueue.Enqueue("data: [DONE]\n");
+ // 标记完成并发送结束标记
+ isComplete = true;
+
+ await outputTask;
+
+ if (userId is not null)
+ {
+ await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId,
+ new MessageInputDto
+ {
+ Content = messages.LastOrDefault().Content.FirstOrDefault()?.Text ?? string.Empty,
+ ModelId = modelId,
+ TokenUsage = tokenUsage,
+ });
+
+ await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId,
+ new MessageInputDto
+ {
+ Content = backupSystemContent.ToString(),
+ ModelId = modelId,
+ TokenUsage = tokenUsage
+ });
+
+ await _usageStatisticsManager.SetUsageAsync(userId.Value, modelId, tokenUsage.InputTokenCount,
+ tokenUsage.OutputTokenCount);
+ }
+ }
+
+
+ private SendMessageOutputDto MapToMessage(string modelId, string content)
+ {
+ var output = new SendMessageOutputDto
+ {
+ Id = "chatcmpl-BotYP3BlN5T4g9YPnW0fBSBvKzXdd",
+ Object = "chat.completion.chunk",
+ Created = 1750336171,
+ Model = modelId,
+ Choices = new()
+ {
+ new Choice
+ {
+ Index = 0,
+ Delta = new Delta
+ {
+ Content = content,
+ Role = "assistant"
+ },
+ FinishReason = null,
+ ContentFilterResults = new()
+ {
+ Hate = new()
+ {
+ Filtered = false,
+ Detected = null
+ },
+ SelfHarm = new()
+ {
+ Filtered = false,
+ Detected = null
+ },
+ Sexual = new()
+ {
+ Filtered = false,
+ Detected = null
+ },
+ Violence = new()
+ {
+ Filtered = false,
+ Detected = null
+ },
+ Jailbreak = new()
+ {
+ Filtered = false,
+ Detected = false
+ },
+ Profanity = new()
+ {
+ Filtered = false,
+ Detected = false
+ },
+ }
+ }
+ },
+ SystemFingerprint = "",
+ Usage = new Usage
+ {
+ PromptTokens = 0,
+ CompletionTokens = 0,
+ TotalTokens = 0,
+ PromptTokensDetails = new()
+ {
+ AudioTokens = 0,
+ CachedTokens = 0
+ },
+ CompletionTokensDetails = new()
+ {
+ AudioTokens = 0,
+ ReasoningTokens = 0,
+ AcceptedPredictionTokens = 0,
+ RejectedPredictionTokens = 0
+ }
+ }
+ };
+
+ return output;
+ }
}
\ No newline at end of file