feat: Thor搭建

This commit is contained in:
ccnetcore
2025-07-17 23:10:26 +08:00
parent 10f7499066
commit e593f2cba4
54 changed files with 2526 additions and 867 deletions

View File

@@ -0,0 +1,5 @@
namespace Yi.Framework.AiHub.Domain.AiGateWay.Exceptions;
public sealed class PaymentRequiredException() : Exception()
{
}

View File

@@ -0,0 +1,12 @@
namespace Yi.Framework.AiHub.Domain.AiGateWay.Exceptions;
public class ThorRateLimitException : Exception
{
public ThorRateLimitException()
{
}
public ThorRateLimitException(string message) : base(message)
{
}
}

View File

@@ -0,0 +1,274 @@
using System.Net.Http.Json;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
namespace Yi.Framework.AiHub.Domain.AiGateWay;
public static class HttpClientExtensions
{
public static async Task<HttpResponseMessage> HttpRequestRaw(this HttpClient httpClient, string url,
object? postData,
string token)
{
HttpRequestMessage req = new(HttpMethod.Post, url);
if (postData != null)
{
if (postData is HttpContent data)
{
req.Content = data;
}
else
{
string jsonContent = JsonSerializer.Serialize(postData, ThorJsonSerializer.DefaultOptions);
var stringContent = new StringContent(jsonContent, Encoding.UTF8, "application/json");
req.Content = stringContent;
}
}
if (!string.IsNullOrEmpty(token))
{
req.Headers.Add("Authorization", $"Bearer {token}");
}
var response = await httpClient.SendAsync(req, HttpCompletionOption.ResponseHeadersRead);
return response;
}
public static async Task<HttpResponseMessage> HttpRequestRaw(this HttpClient httpClient, string url,
object? postData,
string token, string tokenKey)
{
HttpRequestMessage req = new(HttpMethod.Post, url);
if (postData != null)
{
if (postData is HttpContent data)
{
req.Content = data;
}
else
{
string jsonContent = JsonSerializer.Serialize(postData, ThorJsonSerializer.DefaultOptions);
var stringContent = new StringContent(jsonContent, Encoding.UTF8, "application/json");
req.Content = stringContent;
}
}
if (!string.IsNullOrEmpty(token))
{
req.Headers.Add(tokenKey, token);
}
var response = await httpClient.SendAsync(req, HttpCompletionOption.ResponseHeadersRead);
return response;
}
public static async Task<HttpResponseMessage> HttpRequestRaw(this HttpClient httpClient, string url,
object? postData,
string token, Dictionary<string, string> headers)
{
HttpRequestMessage req = new(HttpMethod.Post, url);
if (postData != null)
{
if (postData is HttpContent data)
{
req.Content = data;
}
else
{
string jsonContent = JsonSerializer.Serialize(postData, ThorJsonSerializer.DefaultOptions);
var stringContent = new StringContent(jsonContent, Encoding.UTF8, "application/json");
req.Content = stringContent;
}
}
if (!string.IsNullOrEmpty(token))
{
req.Headers.Add("Authorization", $"Bearer {token}");
}
foreach (var header in headers)
{
req.Headers.Add(header.Key, header.Value);
}
var response = await httpClient.SendAsync(req, HttpCompletionOption.ResponseHeadersRead);
return response;
}
public static async Task<HttpResponseMessage> HttpRequestRaw(this HttpClient httpClient, HttpRequestMessage req,
object? postData)
{
if (postData != null)
{
if (postData is HttpContent data)
{
req.Content = data;
}
else
{
string jsonContent = JsonSerializer.Serialize(postData, ThorJsonSerializer.DefaultOptions);
var stringContent = new StringContent(jsonContent, Encoding.UTF8, "application/json");
req.Content = stringContent;
}
}
var response = await httpClient.SendAsync(req, HttpCompletionOption.ResponseHeadersRead);
return response;
}
public static async Task<HttpResponseMessage> PostJsonAsync(this HttpClient httpClient, string url,
object? postData,
string token)
{
HttpRequestMessage req = new(HttpMethod.Post, url);
if (postData != null)
{
if (postData is HttpContent data)
{
req.Content = data;
}
else
{
var stringContent =
new StringContent(JsonSerializer.Serialize(postData, ThorJsonSerializer.DefaultOptions),
Encoding.UTF8, "application/json");
req.Content = stringContent;
}
}
if (!string.IsNullOrEmpty(token))
{
req.Headers.Add("Authorization", $"Bearer {token}");
}
return await httpClient.SendAsync(req);
}
public static async Task<HttpResponseMessage> PostJsonAsync(this HttpClient httpClient, string url,
object? postData,
string token, Dictionary<string, string> headers)
{
HttpRequestMessage req = new(HttpMethod.Post, url);
if (postData != null)
{
if (postData is HttpContent data)
{
req.Content = data;
}
else
{
string jsonContent = JsonSerializer.Serialize(postData, ThorJsonSerializer.DefaultOptions);
var stringContent = new StringContent(jsonContent, Encoding.UTF8, "application/json");
req.Content = stringContent;
}
}
if (!string.IsNullOrEmpty(token))
{
req.Headers.Add("Authorization", $"Bearer {token}");
}
foreach (var header in headers)
{
req.Headers.Add(header.Key, header.Value);
}
return await httpClient.SendAsync(req);
}
public static Task<HttpResponseMessage> PostJsonAsync(this HttpClient httpClient, string url, object? postData,
string token, string tokenKey)
{
HttpRequestMessage req = new(HttpMethod.Post, url);
if (postData != null)
{
if (postData is HttpContent data)
{
req.Content = data;
}
else
{
string jsonContent = JsonSerializer.Serialize(postData, ThorJsonSerializer.DefaultOptions);
var stringContent = new StringContent(jsonContent, Encoding.UTF8, "application/json");
req.Content = stringContent;
}
}
if (!string.IsNullOrEmpty(token))
{
req.Headers.Add(tokenKey, token);
}
return httpClient.SendAsync(req);
}
public static async Task<TResponse> PostAndReadAsAsync<TResponse>(this HttpClient client, string uri,
object? requestModel, CancellationToken cancellationToken = default) where TResponse : ThorBaseResponse, new()
{
var response = await client.PostAsJsonAsync(uri, requestModel, new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingDefault
}, cancellationToken);
return await HandleResponseContent<TResponse>(response, cancellationToken);
}
public static async Task<TResponse> PostFileAndReadAsAsync<TResponse>(this HttpClient client, string uri,
HttpContent content, CancellationToken cancellationToken = default) where TResponse : ThorBaseResponse, new()
{
var response = await client.PostAsync(uri, content, cancellationToken);
return await HandleResponseContent<TResponse>(response, cancellationToken);
}
public static async Task<string> PostFileAndReadAsStringAsync(this HttpClient client, string uri,
HttpContent content, CancellationToken cancellationToken = default)
{
var response = await client.PostAsync(uri, content, cancellationToken);
return await response.Content.ReadAsStringAsync(cancellationToken) ?? throw new InvalidOperationException();
}
public static async Task<TResponse> DeleteAndReadAsAsync<TResponse>(this HttpClient client, string uri,
CancellationToken cancellationToken = default) where TResponse : ThorBaseResponse, new()
{
var response = await client.DeleteAsync(uri, cancellationToken);
return await HandleResponseContent<TResponse>(response, cancellationToken);
}
private static async Task<TResponse> HandleResponseContent<TResponse>(this HttpResponseMessage response,
CancellationToken cancellationToken) where TResponse : ThorBaseResponse, new()
{
TResponse result;
if (!response.Content.Headers.ContentType?.MediaType?.Equals("application/json",
StringComparison.OrdinalIgnoreCase) ?? true)
{
result = new()
{
Error = new()
{
MessageObject = await response.Content.ReadAsStringAsync(cancellationToken)
}
};
}
else
{
result = await response.Content.ReadFromJsonAsync<TResponse>(cancellationToken: cancellationToken) ??
throw new InvalidOperationException();
}
return result;
}
}

View File

@@ -0,0 +1,73 @@
using System.Collections.Concurrent;
namespace Yi.Framework.AiHub.Domain.AiGateWay;
public static class HttpClientFactory
{
/// <summary>
/// HttpClient池总数
/// </summary>
/// <returns></returns>
private static int _poolSize;
private static int PoolSize
{
get
{
if (_poolSize == 0)
{
// 获取环境变量
var poolSize = Environment.GetEnvironmentVariable("HttpClientPoolSize");
if (!string.IsNullOrEmpty(poolSize) && int.TryParse(poolSize, out var size))
{
_poolSize = size;
}
else
{
_poolSize = Environment.ProcessorCount;
}
if (_poolSize < 1)
{
_poolSize = 2;
}
}
return _poolSize;
}
}
private static readonly ConcurrentDictionary<string, Lazy<List<HttpClient>>> HttpClientPool = new();
public static HttpClient GetHttpClient(string key)
{
return HttpClientPool.GetOrAdd(key, k => new Lazy<List<HttpClient>>(() =>
{
var clients = new List<HttpClient>(PoolSize);
for (var i = 0; i < PoolSize; i++)
{
clients.Add(new HttpClient(new SocketsHttpHandler
{
PooledConnectionLifetime = TimeSpan.FromMinutes(30),
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(30),
EnableMultipleHttp2Connections = true,
// 连接超时5分钟
ConnectTimeout = TimeSpan.FromMinutes(5),
MaxAutomaticRedirections = 3,
AllowAutoRedirect = true,
Expect100ContinueTimeout = TimeSpan.FromMinutes(30),
})
{
Timeout = TimeSpan.FromMinutes(30),
DefaultRequestHeaders =
{
{ "User-Agent", "yxai" },
}
});
}
return clients;
})).Value[new Random().Next(0, PoolSize)];
}
}

View File

@@ -0,0 +1,29 @@
using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
namespace Yi.Framework.AiHub.Domain.AiGateWay;
public interface IChatCompletionService
{
/// <summary>
/// 聊天完成-流式
/// </summary>
/// <param name="aiModelDescribe"></param>
/// <param name="input"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public IAsyncEnumerable<ThorChatCompletionsResponse> CompleteChatStreamAsync(AiModelDescribe aiModelDescribe,
ThorChatCompletionsRequest input,
CancellationToken cancellationToken);
/// <summary>
/// 聊天完成-非流式
/// </summary>
/// <param name="aiModelDescribe"></param>
/// <param name="input"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<ThorChatCompletionsResponse> CompleteChatAsync(AiModelDescribe aiModelDescribe,
ThorChatCompletionsRequest input,
CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,183 @@
using System.Diagnostics;
using System.Net;
using System.Net.Http.Json;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.AiGateWay.Exceptions;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
namespace Yi.Framework.AiHub.Domain.AiGateWay.Impl.ThorAzureDatabricks.Chats;
public class AzureDatabricksChatCompletionsService(ILogger<AzureDatabricksChatCompletionsService> logger)
: IChatCompletionService
{
private string GetAddress(AiModelDescribe? options, string model)
{
// This method should return the appropriate URL for the Azure Databricks API
// based on the provided options and model.
// For now, we will return a placeholder URL.
return $"{options?.Endpoint.TrimEnd('/')}/serving-endpoints/{model}/invocations";
}
public async IAsyncEnumerable<ThorChatCompletionsResponse> CompleteChatStreamAsync(AiModelDescribe options, ThorChatCompletionsRequest chatCompletionCreate,
CancellationToken cancellationToken)
{
var address = GetAddress(options, chatCompletionCreate.Model);
using var openai =
Activity.Current?.Source.StartActivity("OpenAI 对话流式补全");
chatCompletionCreate.StreamOptions = null;
var response = await HttpClientFactory.GetHttpClient(address).HttpRequestRaw(
address,
chatCompletionCreate, options.ApiKey);
openai?.SetTag("Model", chatCompletionCreate.Model);
openai?.SetTag("Response", response.StatusCode.ToString());
if (response.StatusCode == HttpStatusCode.Unauthorized)
{
throw new UnauthorizedAccessException();
}
if (response.StatusCode == HttpStatusCode.PaymentRequired)
{
throw new PaymentRequiredException();
}
// 如果限流则抛出限流异常
if (response.StatusCode == HttpStatusCode.TooManyRequests)
{
throw new ThorRateLimitException();
}
// 大于等于400的状态码都认为是异常
if (response.StatusCode >= HttpStatusCode.BadRequest)
{
var error = await response.Content.ReadAsStringAsync();
logger.LogError("OpenAI对话异常 , StatusCode: {StatusCode} 错误响应内容:{Content}", response.StatusCode,
error);
throw new BusinessException("OpenAI对话异常" + error, response.StatusCode.ToString());
}
using var stream = new StreamReader(await response.Content.ReadAsStreamAsync(cancellationToken));
using StreamReader reader = new(await response.Content.ReadAsStreamAsync(cancellationToken));
string? line = string.Empty;
var first = true;
var isThink = false;
while ((line = await reader.ReadLineAsync(cancellationToken).ConfigureAwait(false)) != null)
{
line += Environment.NewLine;
if (line.StartsWith('{'))
{
logger.LogInformation("OpenAI对话异常 , StatusCode: {StatusCode} Response: {Response}", response.StatusCode,
line);
throw new BusinessException("OpenAI对话异常", line);
}
if (line.StartsWith(OpenAIConstant.Data))
line = line[OpenAIConstant.Data.Length..];
line = line.Trim();
if (string.IsNullOrWhiteSpace(line)) continue;
if (line == OpenAIConstant.Done)
{
break;
}
if (line.StartsWith(':'))
{
continue;
}
var result = JsonSerializer.Deserialize<ThorChatCompletionsResponse>(line,
ThorJsonSerializer.DefaultOptions);
if (result == null)
{
continue;
}
var content = result?.Choices?.FirstOrDefault()?.Delta;
if (first && content?.Content == OpenAIConstant.ThinkStart)
{
isThink = true;
continue;
// 需要将content的内容转换到其他字段
}
if (isThink && content?.Content?.Contains(OpenAIConstant.ThinkEnd) == true)
{
isThink = false;
// 需要将content的内容转换到其他字段
continue;
}
if (isThink && result?.Choices != null)
{
// 需要将content的内容转换到其他字段
foreach (var choice in result.Choices)
{
choice.Delta.ReasoningContent = choice.Delta.Content;
choice.Delta.Content = string.Empty;
}
}
first = false;
yield return result;
}
}
public async Task<ThorChatCompletionsResponse> CompleteChatAsync(AiModelDescribe options, ThorChatCompletionsRequest chatCompletionCreate,
CancellationToken cancellationToken)
{
var address = GetAddress(options, chatCompletionCreate.Model);
using var openai =
Activity.Current?.Source.StartActivity("OpenAI 对话补全");
var response = await HttpClientFactory.GetHttpClient(address).PostJsonAsync(
address,
chatCompletionCreate, options.ApiKey).ConfigureAwait(false);
openai?.SetTag("Model", chatCompletionCreate.Model);
openai?.SetTag("Response", response.StatusCode.ToString());
if (response.StatusCode == HttpStatusCode.Unauthorized)
{
throw new BusinessException("渠道未登录,请联系管理人员", "401");
}
// 如果限流则抛出限流异常
if (response.StatusCode == HttpStatusCode.TooManyRequests)
{
throw new ThorRateLimitException();
}
// 大于等于400的状态码都认为是异常
if (response.StatusCode >= HttpStatusCode.BadRequest)
{
var error = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
logger.LogError("OpenAI对话异常 请求地址:{Address}, StatusCode: {StatusCode} Response: {Response}", options.Endpoint,
response.StatusCode, error);
throw new BusinessException("OpenAI对话异常", response.StatusCode.ToString());
}
var result =
await response.Content.ReadFromJsonAsync<ThorChatCompletionsResponse>(
cancellationToken: cancellationToken).ConfigureAwait(false);
return result;
}
}

View File

@@ -0,0 +1,71 @@
using System.ClientModel;
using System.Collections.Concurrent;
using Azure.AI.OpenAI;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
namespace Yi.Framework.AiHub.Domain.AiGateWay.Impl.ThorAzureOpenAI;
public static class AzureOpenAIFactory
{
private const string AddressTemplate = "{0}/openai/deployments/{1}/chat/completions?api-version={2}";
private const string EditImageAddressTemplate = "{0}/openai/deployments/{1}/images/edits?api-version={2}";
private const string AudioSpeechTemplate = "{0}/openai/deployments/{1}/audio/speech?api-version={2}";
private const string AudioTranscriptions =
"{0}/openai/deployments/{1}/audio/transcriptions?api-version={2}";
private static readonly ConcurrentDictionary<string, AzureOpenAIClient> Clients = new();
public static string GetAudioTranscriptionsAddress(AiModelDescribe options, string model)
{
if (string.IsNullOrEmpty(options.ExtraUrl))
{
options.ExtraUrl = "2025-03-01-preview";
}
return string.Format(AudioTranscriptions, options.Endpoint.TrimEnd('/'), model, options.ExtraUrl);
}
public static string GetAudioSpeechAddress(AiModelDescribe options, string model)
{
if (string.IsNullOrEmpty(options.ExtraUrl))
{
options.ExtraUrl = "2025-03-01-preview";
}
return string.Format(AudioSpeechTemplate, options.Endpoint.TrimEnd('/'), model, options.ExtraUrl);
}
public static string GetAddress(AiModelDescribe options, string model)
{
if (string.IsNullOrEmpty(options.ExtraUrl))
{
options.ExtraUrl = "2025-03-01-preview";
}
return string.Format(AddressTemplate, options.Endpoint.TrimEnd('/'), model, options.ExtraUrl);
}
public static string GetEditImageAddress(AiModelDescribe options, string model)
{
if (string.IsNullOrEmpty(options.ExtraUrl))
{
options.ExtraUrl = "2025-03-01-preview";
}
return string.Format(EditImageAddressTemplate, options.Endpoint.TrimEnd('/'), model, options.ExtraUrl);
}
public static AzureOpenAIClient CreateClient(AiModelDescribe options)
{
return Clients.GetOrAdd($"{options.ApiKey}_{options.Endpoint}_{options.ExtraUrl}", (_) =>
{
const AzureOpenAIClientOptions.ServiceVersion version = AzureOpenAIClientOptions.ServiceVersion.V2024_06_01;
var client = new AzureOpenAIClient(new Uri(options.Endpoint), new ApiKeyCredential(options.ApiKey),
new AzureOpenAIClientOptions(version));
return client;
});
}
}

View File

@@ -0,0 +1,112 @@
using System.Diagnostics;
using System.Net;
using System.Net.Http.Json;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAi;
using Yi.Framework.AiHub.Domain.AiGateWay.Exceptions;
using Yi.Framework.AiHub.Domain.Shared.Dtos;
namespace Yi.Framework.AiHub.Domain.AiGateWay.Impl.ThorAzureOpenAI.Chats;
public class AzureOpenAiChatCompletionCompletionsService(ILogger<AzureOpenAiChatCompletionCompletionsService> logger)
: IChatCompletionService
{
public async IAsyncEnumerable<ThorChatCompletionsResponse> CompleteChatStreamAsync(AiModelDescribe options,
ThorChatCompletionsRequest chatCompletionCreate,
CancellationToken cancellationToken)
{
using var openai =
Activity.Current?.Source.StartActivity("Azure OpenAI 对话流式补全");
var url = AzureOpenAIFactory.GetAddress(options, chatCompletionCreate.Model);
var response = await HttpClientFactory.GetHttpClient(options.Endpoint).HttpRequestRaw(url,
chatCompletionCreate, options.ApiKey, "Api-Key");
openai?.SetTag("Model", chatCompletionCreate.Model);
openai?.SetTag("Response", response.StatusCode.ToString());
if (response.StatusCode >= HttpStatusCode.BadRequest)
{
var error = await response.Content.ReadAsStringAsync();
logger.LogError("Azure对话异常 , StatusCode: {StatusCode} 错误响应内容:{Content}", response.StatusCode,
error);
throw new BusinessException("AzureOpenAI对话异常" + error, response.StatusCode.ToString());
}
using StreamReader reader = new(await response.Content.ReadAsStreamAsync(cancellationToken));
string? line = string.Empty;
var first = true;
while ((line = await reader.ReadLineAsync().ConfigureAwait(false)) != null)
{
line += Environment.NewLine;
if (line.StartsWith('{'))
{
logger.LogInformation("AzureOpenAI对话异常 , StatusCode: {StatusCode} Response: {Response}",
response.StatusCode,
line);
throw new BusinessException("AzureOpenAI对话异常", line);
}
if (line.StartsWith(OpenAIConstant.Data))
line = line[OpenAIConstant.Data.Length..];
line = line.Trim();
if (string.IsNullOrWhiteSpace(line)) continue;
if (line == OpenAIConstant.Done)
{
break;
}
if (line.StartsWith(':'))
{
continue;
}
var result = JsonSerializer.Deserialize<ThorChatCompletionsResponse>(line,
ThorJsonSerializer.DefaultOptions);
yield return result;
}
}
public async Task<ThorChatCompletionsResponse> CompleteChatAsync(AiModelDescribe options,
ThorChatCompletionsRequest chatCompletionCreate,
CancellationToken cancellationToken)
{
using var openai =
Activity.Current?.Source.StartActivity("Azure OpenAI 对话补全");
var url = AzureOpenAIFactory.GetAddress(options, chatCompletionCreate.Model);
var response =
await HttpClientFactory.GetHttpClient(options.Endpoint)
.PostJsonAsync(url, chatCompletionCreate, options.ApiKey, "Api-Key");
openai?.SetTag("Model", chatCompletionCreate.Model);
openai?.SetTag("Response", response.StatusCode.ToString());
// 如果限流则抛出限流异常
if (response.StatusCode == HttpStatusCode.TooManyRequests)
{
throw new ThorRateLimitException();
}
if (response.StatusCode >= HttpStatusCode.BadRequest)
{
logger.LogError("Azure对话异常 , StatusCode: {StatusCode} Response: {Response} Url:{Url}", response.StatusCode,
await response.Content.ReadAsStringAsync(cancellationToken), url);
}
var result = await response.Content
.ReadFromJsonAsync<ThorChatCompletionsResponse>(ThorJsonSerializer.DefaultOptions,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
return result;
}
}

View File

@@ -0,0 +1,26 @@
using System.Text.Json.Serialization;
using Yi.Framework.AiHub.Application.Contracts.Dtos.OpenAi;
namespace Yi.Framework.AiHub.Domain.AiGateWay;
public record ThorBaseResponse
{
/// <summary>
/// 对象类型
/// </summary>
[JsonPropertyName("object")]
public string? ObjectTypeName { get; set; }
/// <summary>
///
/// </summary>
public bool Successful => Error == null;
/// <summary>
///
/// </summary>
[JsonPropertyName("error")]
public ThorError? Error { get; set; }
}

View File

@@ -0,0 +1,14 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace Yi.Framework.AiHub.Domain.AiGateWay;
public static class ThorJsonSerializer
{
public static JsonSerializerOptions DefaultOptions => new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
};
}