feat: 新增claude接口转换支持

This commit is contained in:
chenchun
2025-10-11 15:25:43 +08:00
parent 29dc1ae250
commit 345ed80ec8
21 changed files with 2161 additions and 30 deletions

View File

@@ -13,11 +13,14 @@ using Yi.Framework.AiHub.Domain.AiGateWay;
using Yi.Framework.AiHub.Domain.AiGateWay.Exceptions;
using Yi.Framework.AiHub.Domain.Entities.Model;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
using Yi.Framework.AiHub.Domain.Shared.Dtos.Anthropic;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi.Embeddings;
using Yi.Framework.AiHub.Domain.Shared.Dtos.OpenAi.Images;
using Yi.Framework.Core.Extensions;
using Yi.Framework.SqlSugarCore.Abstractions;
using JsonSerializer = System.Text.Json.JsonSerializer;
using ThorJsonSerializer = Yi.Framework.AiHub.Domain.AiGateWay.ThorJsonSerializer;
namespace Yi.Framework.AiHub.Domain.Managers;
@@ -394,7 +397,7 @@ public class AiGateWayManager : DomainService
var usage = new ThorUsageResponse()
{
PromptTokens = stream.Usage?.PromptTokens??0,
PromptTokens = stream.Usage?.PromptTokens ?? 0,
InputTokens = stream.Usage?.InputTokens ?? 0,
CompletionTokens = 0,
TotalTokens = stream.Usage?.InputTokens ?? 0
@@ -441,4 +444,210 @@ public class AiGateWayManager : DomainService
throw new UserFriendlyException(errorContent);
}
}
/// <summary>
/// Anthropic聊天完成-流式
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async IAsyncEnumerable<(string, AnthropicStreamDto?)> AnthropicCompleteChatStreamAsync(
AnthropicInput request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
_specialCompatible.AnthropicCompatible(request);
var modelDescribe = await GetModelAsync(request.Model);
var chatService =
LazyServiceProvider.GetRequiredKeyedService<IAnthropicChatCompletionService>(modelDescribe.HandlerName);
await foreach (var result in chatService.StreamChatCompletionsAsync(modelDescribe, request, cancellationToken))
{
yield return result;
}
}
/// <summary>
/// Anthropic聊天完成-非流式
/// </summary>
/// <param name="httpContext"></param>
/// <param name="request"></param>
/// <param name="userId"></param>
/// <param name="sessionId"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task AnthropicCompleteChatForStatisticsAsync(HttpContext httpContext,
AnthropicInput request,
Guid? userId = null,
Guid? sessionId = null,
CancellationToken cancellationToken = default)
{
_specialCompatible.AnthropicCompatible(request);
var response = httpContext.Response;
// 设置响应头,声明是 json
//response.ContentType = "application/json; charset=UTF-8";
var modelDescribe = await GetModelAsync(request.Model);
var chatService =
LazyServiceProvider.GetRequiredKeyedService<IAnthropicChatCompletionService>(modelDescribe.HandlerName);
var data = await chatService.ChatCompletionsAsync(modelDescribe, request, cancellationToken);
if (userId is not null)
{
await _aiMessageManager.CreateUserMessageAsync(userId.Value, sessionId,
new MessageInputDto
{
Content = request.Messages?.FirstOrDefault()?.Content ?? string.Empty,
ModelId = request.Model,
TokenUsage = data.TokenUsage,
});
await _aiMessageManager.CreateSystemMessageAsync(userId.Value, sessionId,
new MessageInputDto
{
Content = data.content?.FirstOrDefault()?.text,
ModelId = request.Model,
TokenUsage = data.TokenUsage
});
await _usageStatisticsManager.SetUsageAsync(userId.Value, request.Model, data.TokenUsage);
}
await response.WriteAsJsonAsync(data, cancellationToken);
}
/// <summary>
/// Anthropic聊天完成-缓存处理
/// </summary>
/// <param name="httpContext"></param>
/// <param name="request"></param>
/// <param name="userId"></param>
/// <param name="sessionId"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task AnthropicCompleteChatStreamForStatisticsAsync(
HttpContext httpContext,
AnthropicInput request,
Guid? userId = null,
Guid? sessionId = null,
CancellationToken cancellationToken = default)
{
var response = httpContext.Response;
// 设置响应头,声明是 SSE 流
response.ContentType = "text/event-stream;charset=utf-8;";
response.Headers.TryAdd("Cache-Control", "no-cache");
response.Headers.TryAdd("Connection", "keep-alive");
var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
var completeChatResponse = gateWay.AnthropicCompleteChatStreamAsync(request, cancellationToken);
ThorUsageResponse? tokenUsage = null;
StringBuilder backupSystemContent = new StringBuilder();
try
{
await foreach (var responseResult in completeChatResponse)
{
tokenUsage = responseResult.Item2?.TokenUsage;
backupSystemContent.Append(responseResult.Item2?.Delta?.Text);
await WriteAsEventStreamDataAsync(httpContext, responseResult.Item1, responseResult.Item2,
cancellationToken);
}
}
catch (Exception e)
{
_logger.LogError(e, $"Ai对话异常");
var errorContent = $"对话Ai异常异常信息\n当前Ai模型{request.Model}\n异常信息{e.Message}\n异常堆栈:{e}";
var model = new AnthropicStreamDto
{
Message = new AnthropicChatCompletionDto
{
content =
[
new AnthropicChatCompletionDtoContent
{
text = errorContent,
}
],
},
Error = new AnthropicStreamErrorDto
{
Type = null,
Message = errorContent
}
};
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
await response.WriteAsJsonAsync(message, ThorJsonSerializer.DefaultOptions);
}
await _aiMessageManager.CreateUserMessageAsync(userId, sessionId,
new MessageInputDto
{
Content = request.Messages?.LastOrDefault()?.Content ?? string.Empty,
ModelId = request.Model,
TokenUsage = tokenUsage,
});
await _aiMessageManager.CreateSystemMessageAsync(userId, sessionId,
new MessageInputDto
{
Content = backupSystemContent.ToString(),
ModelId = request.Model,
TokenUsage = tokenUsage
});
await _usageStatisticsManager.SetUsageAsync(userId, request.Model, tokenUsage);
}
#region Anthropic格式Http响应
private static readonly byte[] EventPrefix = "event: "u8.ToArray();
private static readonly byte[] DataPrefix = "data: "u8.ToArray();
private static readonly byte[] NewLine = "\n"u8.ToArray();
private static readonly byte[] DoubleNewLine = "\n\n"u8.ToArray();
/// <summary>
/// 使用 JsonSerializer.SerializeAsync 直接序列化到响应流
/// </summary>
private static async ValueTask WriteAsEventStreamDataAsync<T>(
HttpContext context,
string @event,
T value,
CancellationToken cancellationToken = default)
where T : class
{
var response = context.Response;
var bodyStream = response.Body;
// 确保 SSE Header 已经设置好
// e.g. Content-Type: text/event-stream; charset=utf-8
await response.StartAsync(cancellationToken).ConfigureAwait(false);
// 写事件类型
await bodyStream.WriteAsync(EventPrefix, cancellationToken).ConfigureAwait(false);
await WriteUtf8StringAsync(bodyStream, @event.Trim(), cancellationToken).ConfigureAwait(false);
await bodyStream.WriteAsync(NewLine, cancellationToken).ConfigureAwait(false);
// 写 data: + JSON
await bodyStream.WriteAsync(DataPrefix, cancellationToken).ConfigureAwait(false);
await JsonSerializer.SerializeAsync(
bodyStream,
value,
ThorJsonSerializer.DefaultOptions,
cancellationToken
).ConfigureAwait(false);
// 事件结束 \n\n
await bodyStream.WriteAsync(DoubleNewLine, cancellationToken).ConfigureAwait(false);
// 及时把数据发送给客户端
await bodyStream.FlushAsync(cancellationToken).ConfigureAwait(false);
}
private static async ValueTask WriteUtf8StringAsync(Stream stream, string value, CancellationToken token)
{
if (string.IsNullOrEmpty(value))
return;
var buffer = Encoding.UTF8.GetBytes(value);
await stream.WriteAsync(buffer, token).ConfigureAwait(false);
}
#endregion
}