feat: 消息创建返回ID并在流式响应中下发

- 消息管理器创建用户/系统消息时返回 MessageId
- 网关在流式响应中新增消息创建事件,返回 MessageId 与创建时间
- 统一在消息创建完成后发送 [DONE] 标识,优化流式结束时机
This commit is contained in:
ccnetcore
2026-02-01 13:02:06 +08:00
parent 11cbb1b612
commit 6d54c650f0
3 changed files with 79 additions and 10 deletions

View File

@@ -0,0 +1,47 @@
using System.Reflection;
using System.Text.Json.Serialization;
namespace Yi.Framework.AiHub.Application.Contracts.Dtos.Chat;
/// <summary>
/// 消息创建结果输出
/// </summary>
public class MessageCreatedOutput
{
/// <summary>
/// 消息类型
/// </summary>
[JsonIgnore]
public ChatMessageTypeEnum TypeEnum { get; set; }
/// <summary>
/// 消息类型
/// </summary>
public string Type => TypeEnum.ToString();
/// <summary>
/// 消息ID
/// </summary>
public Guid MessageId { get; set; }
/// <summary>
/// 消息创建时间
/// </summary>
public DateTime CreationTime { get; set; }
}
/// <summary>
/// 消息类型枚举
/// </summary>
public enum ChatMessageTypeEnum
{
/// <summary>
/// 用户消息
/// </summary>
UserMessage,
/// <summary>
/// 系统消息
/// </summary>
SystemMessage
}

View File

@@ -23,6 +23,7 @@ using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi.Embeddings;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi.Images; using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi.Images;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi.Responses; using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi.Responses;
using Yi.Framework.AiHub.Domain.Shared.Enums; using Yi.Framework.AiHub.Domain.Shared.Enums;
using Yi.Framework.AiHub.Application.Contracts.Dtos.Chat;
using Yi.Framework.AiHub.Domain.Shared.Extensions; using Yi.Framework.AiHub.Domain.Shared.Extensions;
using Yi.Framework.Core.Extensions; using Yi.Framework.Core.Extensions;
using Yi.Framework.SqlSugarCore.Abstractions; using Yi.Framework.SqlSugarCore.Abstractions;
@@ -1207,12 +1208,9 @@ public class AiGateWayManager : DomainService
throw new UserFriendlyException($"不支持的API类型: {apiType}"); throw new UserFriendlyException($"不支持的API类型: {apiType}");
} }
// 标记完成并等待消费任务结束
isComplete = true;
await outputTask;
// 统一的统计处理 // 统一的统计处理
await _aiMessageManager.CreateUserMessageAsync(userId, sessionId, var userMessageId = await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
new MessageInputDto new MessageInputDto
{ {
Content = sessionId is null ? "不予存储" : processResult?.UserContent ?? string.Empty, Content = sessionId is null ? "不予存储" : processResult?.UserContent ?? string.Empty,
@@ -1220,7 +1218,7 @@ public class AiGateWayManager : DomainService
TokenUsage = processResult?.TokenUsage, TokenUsage = processResult?.TokenUsage,
}, tokenId,createTime:startTime); }, tokenId,createTime:startTime);
await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId, var systemMessageId = await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
new MessageInputDto new MessageInputDto
{ {
Content = sessionId is null ? "不予存储" : processResult?.SystemContent ?? string.Empty, Content = sessionId is null ? "不予存储" : processResult?.SystemContent ?? string.Empty,
@@ -1228,6 +1226,29 @@ public class AiGateWayManager : DomainService
TokenUsage = processResult?.TokenUsage TokenUsage = processResult?.TokenUsage
}, tokenId); }, tokenId);
// 流式返回消息ID
var now = DateTime.Now;
var userMessageOutput = new MessageCreatedOutput
{
TypeEnum = ChatMessageTypeEnum.UserMessage,
MessageId = userMessageId,
CreationTime = startTime
};
messageQueue.Enqueue($"data: {JsonSerializer.Serialize(userMessageOutput, ThorJsonSerializer.DefaultOptions)}\n\n");
var systemMessageOutput = new MessageCreatedOutput
{
TypeEnum = ChatMessageTypeEnum.SystemMessage,
MessageId = systemMessageId,
CreationTime = now
};
messageQueue.Enqueue($"data: {JsonSerializer.Serialize(systemMessageOutput, ThorJsonSerializer.DefaultOptions)}\n\n");
// 标记完成并等待消费任务结束
messageQueue.Enqueue("data: [DONE]\n\n");
isComplete = true;
await outputTask;
await _usageStatisticsManager.SetUsageAsync(userId, sourceModelId, processResult?.TokenUsage, tokenId); await _usageStatisticsManager.SetUsageAsync(userId, sourceModelId, processResult?.TokenUsage, tokenId);
// 扣减尊享token包用量 // 扣减尊享token包用量
@@ -1325,8 +1346,7 @@ public class AiGateWayManager : DomainService
}); });
messageQueue.Enqueue($"data: {errorMessage}\n\n"); messageQueue.Enqueue($"data: {errorMessage}\n\n");
} }
messageQueue.Enqueue("data: [DONE]\n\n");
return new StreamProcessResult return new StreamProcessResult
{ {
UserContent = userContent, UserContent = userContent,

View File

@@ -24,11 +24,12 @@ public class AiMessageManager : DomainService
/// <param name="input">消息输入</param> /// <param name="input">消息输入</param>
/// <param name="tokenId">Token IdWeb端传Guid.Empty</param> /// <param name="tokenId">Token IdWeb端传Guid.Empty</param>
/// <returns></returns> /// <returns></returns>
public async Task CreateSystemMessageAsync(Guid? userId, Guid? sessionId, MessageInputDto input, Guid? tokenId = null) public async Task<Guid> CreateSystemMessageAsync(Guid? userId, Guid? sessionId, MessageInputDto input, Guid? tokenId = null)
{ {
input.Role = "system"; input.Role = "system";
var message = new MessageAggregateRoot(userId, sessionId, input.Content, input.Role, input.ModelId, input.TokenUsage, tokenId); var message = new MessageAggregateRoot(userId, sessionId, input.Content, input.Role, input.ModelId, input.TokenUsage, tokenId);
await _repository.InsertAsync(message); await _repository.InsertAsync(message);
return message.Id;
} }
/// <summary> /// <summary>
@@ -40,7 +41,7 @@ public class AiMessageManager : DomainService
/// <param name="tokenId">Token IdWeb端传Guid.Empty</param> /// <param name="tokenId">Token IdWeb端传Guid.Empty</param>
/// <param name="createTime"></param> /// <param name="createTime"></param>
/// <returns></returns> /// <returns></returns>
public async Task CreateUserMessageAsync( Guid? userId, Guid? sessionId, MessageInputDto input, Guid? tokenId = null,DateTime? createTime=null) public async Task<Guid> CreateUserMessageAsync( Guid? userId, Guid? sessionId, MessageInputDto input, Guid? tokenId = null,DateTime? createTime=null)
{ {
input.Role = "user"; input.Role = "user";
var message = new MessageAggregateRoot(userId, sessionId, input.Content, input.Role, input.ModelId, input.TokenUsage, tokenId) var message = new MessageAggregateRoot(userId, sessionId, input.Content, input.Role, input.ModelId, input.TokenUsage, tokenId)
@@ -48,5 +49,6 @@ public class AiMessageManager : DomainService
CreationTime = createTime??DateTime.Now CreationTime = createTime??DateTime.Now
}; };
await _repository.InsertAsync(message); await _repository.InsertAsync(message);
return message.Id;
} }
} }