Files
Yi.Framework/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/OpenApiService.cs
2025-07-03 22:44:52 +08:00

261 lines
9.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Collections.Concurrent;
using System.Text;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using OpenAI.Chat;
using Volo.Abp.Application.Services;
using Volo.Abp.Users;
using Yi.Framework.AiHub.Application.Contracts.Dtos;
using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAiDto;
using Yi.Framework.AiHub.Domain.Entities.OpenApi;
using Yi.Framework.AiHub.Domain.Managers;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.SqlSugarCore.Abstractions;
namespace Yi.Framework.AiHub.Application.Services;
public class OpenApiService : ApplicationService
{
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly ILogger<OpenApiService> _logger;
private readonly TokenManager _tokenManager;
private readonly AiMessageManager _aiMessageManager;
private readonly UsageStatisticsManager _usageStatisticsManager;
public OpenApiService(IHttpContextAccessor httpContextAccessor, ILogger<OpenApiService> logger,
TokenManager tokenManager, AiMessageManager aiMessageManager, UsageStatisticsManager usageStatisticsManager)
{
_httpContextAccessor = httpContextAccessor;
_logger = logger;
_tokenManager = tokenManager;
_aiMessageManager = aiMessageManager;
_usageStatisticsManager = usageStatisticsManager;
}
/// <summary>
/// 对话
/// </summary>
/// <param name="input"></param>
/// <param name="cancellationToken"></param>
[HttpPost("openApi/v1/chat/completions")]
public async Task ChatCompletionsAsync(ChatCompletionsInput input, CancellationToken cancellationToken)
{
//前面都是校验,后面才是真正的调用
var httpContext = this._httpContextAccessor.HttpContext;
var userId = await _tokenManager.GetUserIdAsync(GetTokenByHttpContext(httpContext));
var response = httpContext.Response;
// 设置响应头,声明是 SSE 流
response.ContentType = "text/event-stream";
response.Headers.Append("Cache-Control", "no-cache");
response.Headers.Append("Connection", "keep-alive");
var history = new List<ChatMessage>();
foreach (var aiChatContextDto in input.Messages)
{
if (aiChatContextDto.Role == "assistant")
{
history.Add(ChatMessage.CreateAssistantMessage(aiChatContextDto.Content));
}
else if (aiChatContextDto.Role == "user")
{
history.Add(ChatMessage.CreateUserMessage(aiChatContextDto.Content));
}
}
var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken);
var tokenUsage = new TokenUsage();
await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
//缓存队列算法
// 创建一个队列来缓存消息
var messageQueue = new ConcurrentQueue<string>();
StringBuilder backupSystemContent = new StringBuilder();
// 设置输出速率例如每50毫秒输出一次
var outputInterval = TimeSpan.FromMilliseconds(100);
// 标记是否完成接收
var isComplete = false;
// 启动一个后台任务来消费队列
var outputTask = Task.Run(async () =>
{
while (!(isComplete && messageQueue.IsEmpty))
{
if (messageQueue.TryDequeue(out var message))
{
await writer.WriteLineAsync(message);
await writer.FlushAsync(cancellationToken);
}
if (!isComplete)
{
// 如果没有完成,才等待,已完成,全部输出
await Task.Delay(outputInterval, cancellationToken);
}
}
}, cancellationToken);
//IAsyncEnumerable 只能在最外层捕获异常(如果你有其他办法的话...
try
{
await foreach (var data in completeChatResponse)
{
if (data.IsFinish)
{
tokenUsage = data.TokenUsage;
}
var model = MapToMessage(input.Model, data.Content);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
backupSystemContent.Append(data.Content);
// 将消息加入队列而不是直接写入
messageQueue.Enqueue($"data: {message}\n");
}
}
catch (Exception e)
{
_logger.LogError(e, $"Ai对话异常");
var errorContent = $"Ai对话异常异常信息\n{e.Message}";
var model = MapToMessage(input.Model, errorContent);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
backupSystemContent.Append(errorContent);
messageQueue.Enqueue($"data: {message}\n");
}
//断开连接
messageQueue.Enqueue("data: [DONE]\n");
// 标记完成并发送结束标记
isComplete = true;
await outputTask;
await _aiMessageManager.CreateUserMessageAsync(userId, null,
new MessageInputDto
{
Content = input.Messages.LastOrDefault()
.Content,
ModelId = input.Model,
TokenUsage = tokenUsage,
});
await _aiMessageManager.CreateSystemMessageAsync(userId, null,
new MessageInputDto
{
Content = backupSystemContent.ToString(),
ModelId = input.Model,
TokenUsage = tokenUsage
});
await _usageStatisticsManager.SetUsageAsync(userId, input.Model, tokenUsage.InputTokenCount,
tokenUsage.OutputTokenCount);
}
private SendMessageOutputDto MapToMessage(string modelId, string content)
{
var output = new SendMessageOutputDto
{
Id = "chatcmpl-BotYP3BlN5T4g9YPnW0fBSBvKzXdd",
Object = "chat.completion.chunk",
Created = 1750336171,
Model = modelId,
Choices = new()
{
new Choice
{
Index = 0,
Delta = new Delta
{
Content = content,
Role = "assistant"
},
FinishReason = null,
ContentFilterResults = new()
{
Hate = new()
{
Filtered = false,
Detected = null
},
SelfHarm = new()
{
Filtered = false,
Detected = null
},
Sexual = new()
{
Filtered = false,
Detected = null
},
Violence = new()
{
Filtered = false,
Detected = null
},
Jailbreak = new()
{
Filtered = false,
Detected = false
},
Profanity = new()
{
Filtered = false,
Detected = false
},
}
}
},
SystemFingerprint = "",
Usage = new Usage
{
PromptTokens = 0,
CompletionTokens = 0,
TotalTokens = 0,
PromptTokensDetails = new()
{
AudioTokens = 0,
CachedTokens = 0
},
CompletionTokensDetails = new()
{
AudioTokens = 0,
ReasoningTokens = 0,
AcceptedPredictionTokens = 0,
RejectedPredictionTokens = 0
}
}
};
return output;
}
private string? GetTokenByHttpContext(HttpContext httpContext)
{
// 获取Authorization头
string authHeader = httpContext.Request.Headers["Authorization"];
// 检查是否有Bearer token
if (authHeader != null && authHeader.StartsWith("Bearer "))
{
return authHeader.Substring("Bearer ".Length).Trim();
}
return null;
}
}