diff --git a/Yi.Abp.Net8/framework/Yi.Framework.BackgroundWorkers.Hangfire/YiFrameworkBackgroundWorkersHangfireModule.cs b/Yi.Abp.Net8/framework/Yi.Framework.BackgroundWorkers.Hangfire/YiFrameworkBackgroundWorkersHangfireModule.cs index 6663358a..71948243 100644 --- a/Yi.Abp.Net8/framework/Yi.Framework.BackgroundWorkers.Hangfire/YiFrameworkBackgroundWorkersHangfireModule.cs +++ b/Yi.Abp.Net8/framework/Yi.Framework.BackgroundWorkers.Hangfire/YiFrameworkBackgroundWorkersHangfireModule.cs @@ -2,6 +2,7 @@ using Hangfire; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; using Volo.Abp.BackgroundJobs.Hangfire; using Volo.Abp.BackgroundWorkers; using Volo.Abp.BackgroundWorkers.Hangfire; @@ -32,13 +33,19 @@ public sealed class YiFrameworkBackgroundWorkersHangfireModule : AbpModule /// 应用程序初始化上下文 public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) { + if (!context.ServiceProvider.GetRequiredService>().Value.IsEnabled) + { + return; + } + + // 获取后台任务管理器和所有 Hangfire 后台任务 var backgroundWorkerManager = context.ServiceProvider.GetRequiredService(); var workers = context.ServiceProvider.GetServices(); // 获取配置 var configuration = context.ServiceProvider.GetRequiredService(); - + // 检查是否启用 Redis var isRedisEnabled = configuration.GetValue("Redis:IsEnabled"); @@ -56,11 +63,11 @@ public sealed class YiFrameworkBackgroundWorkersHangfireModule : AbpModule { // 内存模式:直接使用 Hangfire var unProxyWorker = ProxyHelper.UnProxy(worker); - + // 添加或更新循环任务 RecurringJob.AddOrUpdate( worker.RecurringJobId, - (Expression>)(() => + (Expression>)(() => ((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(default)), worker.CronExpression, new RecurringJobOptions diff --git a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/AiChatService.cs b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/AiChatService.cs index 950ee61d..b5b8dd01 100644 --- a/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/AiChatService.cs +++ b/Yi.Abp.Net8/module/ai-hub/Yi.Framework.AiHub.Application/Services/AiChatService.cs @@ -1,4 +1,5 @@ -using System.Text; +using System.Collections.Concurrent; +using System.Text; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; @@ -100,24 +101,56 @@ public class AiChatService : ApplicationService } var gateWay = LazyServiceProvider.GetRequiredService(); - var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken); - await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true); - await foreach (var data in completeChatResponse) - { - var model = MapToMessage(input.Model, data); - var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings - { - ContractResolver = new CamelCasePropertyNamesContractResolver() - }); - - await writer.WriteLineAsync($"data: {message}\n"); - await writer.FlushAsync(cancellationToken); // 确保立即推送数据 - } + var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken); + await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true); + + //缓存队列算法 + + // 创建一个队列来缓存消息 + var messageQueue = new ConcurrentQueue(); + // 设置输出速率(例如每50毫秒输出一次) + var outputInterval = TimeSpan.FromMilliseconds(80); + // 标记是否完成接收 + var isComplete = false; + // 启动一个后台任务来消费队列 + var outputTask = Task.Run(async () => + { + while (!(isComplete&&messageQueue.IsEmpty)) + { + if (messageQueue.TryDequeue(out var message)) + { + await writer.WriteLineAsync(message); + await writer.FlushAsync(cancellationToken); + } + + if (!isComplete) + { + // 如果没有完成,才等待,已完成,全部输出 + await Task.Delay(outputInterval, cancellationToken); + } + } + }, cancellationToken); + + + await foreach (var data in completeChatResponse) + { + var model = MapToMessage(input.Model, data); + var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings + { + ContractResolver = new CamelCasePropertyNamesContractResolver() + }); + + // 将消息加入队列而不是直接写入 + messageQueue.Enqueue($"data: {message}\n"); + } + + // 标记完成并发送结束标记 + isComplete = true; //断开连接 - await writer.WriteLineAsync("data: done\n"); - await writer.FlushAsync(cancellationToken); // 确保立即推送数据 + messageQueue.Enqueue("data: done\n"); + await outputTask; if (CurrentUser.IsAuthenticated && input.SessionId.HasValue) { // 等待接入token