using System.Collections.Concurrent;
using System.Text;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using OpenAI.Chat;
using Volo.Abp.Application.Services;
using Volo.Abp.Users;
using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Domain.Entities;
using Yi.Framework.AiHub.Domain.Managers;
using Yi.Framework.Rbac.Application.Contracts.IServices;
using Yi.Framework.Rbac.Domain.Shared.Dtos;
using Yi.Framework.SqlSugarCore.Abstractions;
namespace Yi.Framework.AiHub.Application.Services;
///
/// ai服务
///
public class AiChatService : ApplicationService
{
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly AiMessageManager _aiMessageManager;
private readonly ISqlSugarRepository _aiModelRepository;
private readonly AiBlacklistManager _aiBlacklistManager;
public AiChatService(IHttpContextAccessor httpContextAccessor,
AiMessageManager aiMessageManager, AiBlacklistManager aiBlacklistManager,
ISqlSugarRepository aiModelRepository)
{
this._httpContextAccessor = httpContextAccessor;
_aiMessageManager = aiMessageManager;
_aiBlacklistManager = aiBlacklistManager;
_aiModelRepository = aiModelRepository;
}
///
/// 查询已登录的账户信息
///
///
[Route("ai-chat/account")]
[Authorize]
public async Task GetAsync()
{
var accountService = LazyServiceProvider.GetRequiredService();
var output = await accountService.GetAsync();
return output;
}
///
/// 获取模型列表
///
///
public async Task> GetModelAsync()
{
var output = await _aiModelRepository._DbQueryable
.OrderByDescending(x=>x.OrderNum)
.Select(x => new ModelGetListOutput
{
Id = x.Id,
Category = "chat",
ModelName = x.Name,
ModelDescribe = x.Description,
ModelPrice = 0,
ModelType = "1",
ModelShow = "0",
SystemPrompt = null,
ApiHost = null,
ApiKey = null,
Remark = x.Description
}).ToListAsync();
return output;
}
///
/// 发送消息
///
///
///
public async Task PostSendAsync(SendMessageInput input, CancellationToken cancellationToken)
{
//除了免费模型,其他的模型都要校验
// if (input.Model != "DeepSeek-R1-0528")
// {
// //有token,需要黑名单校验
// if (CurrentUser.IsAuthenticated)
// {
// await _aiBlacklistManager.VerifiyAiBlacklist(CurrentUser.GetId());
// if (!CurrentUser.Roles.Contains("YiXinAi-Vip"))
// {
// throw new UserFriendlyException("该模型需要VIP用户才能使用,请购买VIP后重新登录重试");
// }
// }
// else
// {
// throw new UserFriendlyException("未登录用户,只能使用未加速的DeepSeek-R1,请登录后重试");
// }
// }
//前面都是校验,后面才是真正的调用
var httpContext = this._httpContextAccessor.HttpContext;
var response = httpContext.Response;
// 设置响应头,声明是 SSE 流
response.ContentType = "text/event-stream";
response.Headers.Append("Cache-Control", "no-cache");
response.Headers.Append("Connection", "keep-alive");
var history = new List();
foreach (var aiChatContextDto in input.Messages)
{
if (aiChatContextDto.Role == "system")
{
history.Add(ChatMessage.CreateAssistantMessage(aiChatContextDto.Content));
}
else if (aiChatContextDto.Role == "user")
{
history.Add(ChatMessage.CreateUserMessage(aiChatContextDto.Content));
}
}
var gateWay = LazyServiceProvider.GetRequiredService();
var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken);
await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
//缓存队列算法
// 创建一个队列来缓存消息
var messageQueue = new ConcurrentQueue();
// 设置输出速率(例如每50毫秒输出一次)
var outputInterval = TimeSpan.FromMilliseconds(100);
// 标记是否完成接收
var isComplete = false;
// 启动一个后台任务来消费队列
var outputTask = Task.Run(async () =>
{
while (!(isComplete && messageQueue.IsEmpty))
{
if (messageQueue.TryDequeue(out var message))
{
await writer.WriteLineAsync(message);
await writer.FlushAsync(cancellationToken);
}
if (!isComplete)
{
// 如果没有完成,才等待,已完成,全部输出
await Task.Delay(outputInterval, cancellationToken);
}
}
}, cancellationToken);
await foreach (var data in completeChatResponse)
{
var model = MapToMessage(input.Model, data);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
// 将消息加入队列而不是直接写入
messageQueue.Enqueue($"data: {message}\n");
}
// 标记完成并发送结束标记
isComplete = true;
//断开连接
messageQueue.Enqueue("data: done\n");
await outputTask;
if (CurrentUser.IsAuthenticated && input.SessionId.HasValue)
{
await _aiMessageManager.CreateMessageAsync(CurrentUser.GetId(), input.SessionId.Value, new MessageInputDto
{
Content = input.Messages.LastOrDefault().Content,
Role = input.Messages.LastOrDefault().Role,
DeductCost = 0,
TotalTokens = 0,
ModelId = input.Model,
Remark = null
});
}
}
private SendMessageOutputDto MapToMessage(string modelId, string content)
{
var output = new SendMessageOutputDto
{
Id = 001,
Object = "chat.completion.chunk",
Created = 1750336171,
Model = modelId,
Choices = new()
{
new Choice
{
Index = 0,
Delta = new Delta
{
Content = content,
Role = "assistant"
},
FinishReason = null,
ContentFilterResults = new()
{
Hate = new()
{
Filtered = false,
Detected = null
},
SelfHarm = new()
{
Filtered = false,
Detected = null
},
Sexual = new()
{
Filtered = false,
Detected = null
},
Violence = new()
{
Filtered = false,
Detected = null
},
Jailbreak = new()
{
Filtered = false,
Detected = false
},
Profanity = new()
{
Filtered = false,
Detected = false
},
}
}
},
SystemFingerprint = "",
Usage = new Usage
{
PromptTokens = 75,
CompletionTokens = 25,
TotalTokens = 100,
PromptTokensDetails = new()
{
AudioTokens = 0,
CachedTokens = 0
},
CompletionTokensDetails = new()
{
AudioTokens = 0,
ReasoningTokens = 0,
AcceptedPredictionTokens = 0,
RejectedPredictionTokens = 0
}
}
};
return output;
}
}