perf: 优化流式输出

This commit is contained in:
ccnetcore
2025-07-21 21:15:02 +08:00
parent 660bd00cae
commit d72cc529ba

View File

@@ -135,7 +135,7 @@ public class AiGateWayManager : DomainService
await _usageStatisticsManager.SetUsageAsync(userId.Value, request.Model, data.Usage.InputTokens ?? 0, await _usageStatisticsManager.SetUsageAsync(userId.Value, request.Model, data.Usage.InputTokens ?? 0,
data.Usage.OutputTokens ?? 0); data.Usage.OutputTokens ?? 0);
} }
await response.WriteAsJsonAsync(data, cancellationToken); await response.WriteAsJsonAsync(data, cancellationToken);
} }
@@ -157,15 +157,14 @@ public class AiGateWayManager : DomainService
{ {
var response = httpContext.Response; var response = httpContext.Response;
// 设置响应头,声明是 SSE 流 // 设置响应头,声明是 SSE 流
response.ContentType = "text/event-stream"; response.ContentType = "text/event-stream;charset=utf-8;";
response.Headers.Append("Cache-Control", "no-cache"); response.Headers.TryAdd("Cache-Control", "no-cache");
response.Headers.Append("Connection", "keep-alive"); response.Headers.TryAdd("Connection", "keep-alive");
var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>(); var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
var completeChatResponse = gateWay.CompleteChatStreamAsync(request, cancellationToken); var completeChatResponse = gateWay.CompleteChatStreamAsync(request, cancellationToken);
var tokenUsage = new ThorUsageResponse(); var tokenUsage = new ThorUsageResponse();
await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
//缓存队列算法 //缓存队列算法
// 创建一个队列来缓存消息 // 创建一个队列来缓存消息
@@ -183,14 +182,14 @@ public class AiGateWayManager : DomainService
{ {
if (messageQueue.TryDequeue(out var message)) if (messageQueue.TryDequeue(out var message))
{ {
await writer.WriteLineAsync(message); await response.WriteAsync(message, Encoding.UTF8, cancellationToken).ConfigureAwait(false);
await writer.FlushAsync(cancellationToken); await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
} }
if (!isComplete) if (!isComplete)
{ {
// 如果没有完成,才等待,已完成,全部输出 // 如果没有完成,才等待,已完成,全部输出
await Task.Delay(outputInterval, cancellationToken); await Task.Delay(outputInterval, cancellationToken).ConfigureAwait(false);
} }
} }
}, cancellationToken); }, cancellationToken);
@@ -206,13 +205,10 @@ public class AiGateWayManager : DomainService
tokenUsage = data.Usage; tokenUsage = data.Usage;
} }
var message = JsonConvert.SerializeObject(data, new JsonSerializerSettings var message = System.Text.Json.JsonSerializer.Serialize(data, ThorJsonSerializer.DefaultOptions);
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
backupSystemContent.Append(data.Choices.FirstOrDefault()?.Delta.Content); backupSystemContent.Append(data.Choices.FirstOrDefault()?.Delta.Content);
// 将消息加入队列而不是直接写入 // 将消息加入队列而不是直接写入
messageQueue.Enqueue($"data: {message}\n"); messageQueue.Enqueue($"data: {message}\n\n");
} }
} }
catch (Exception e) catch (Exception e)
@@ -237,11 +233,11 @@ public class AiGateWayManager : DomainService
ContractResolver = new CamelCasePropertyNamesContractResolver() ContractResolver = new CamelCasePropertyNamesContractResolver()
}); });
backupSystemContent.Append(errorContent); backupSystemContent.Append(errorContent);
messageQueue.Enqueue($"data: {message}\n"); messageQueue.Enqueue($"data: {message}\n\n");
} }
//断开连接 //断开连接
messageQueue.Enqueue("data: [DONE]\n"); messageQueue.Enqueue("data: [DONE]\n\n");
// 标记完成并发送结束标记 // 标记完成并发送结束标记
isComplete = true; isComplete = true;