feat: 提交队列

This commit is contained in:
ccnetcore
2025-06-25 00:30:01 +08:00
parent 64d04996af
commit c69729fadd
2 changed files with 59 additions and 19 deletions

View File

@@ -2,6 +2,7 @@
using Hangfire;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundJobs.Hangfire;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.BackgroundWorkers.Hangfire;
@@ -32,13 +33,19 @@ public sealed class YiFrameworkBackgroundWorkersHangfireModule : AbpModule
/// <param name="context">应用程序初始化上下文</param>
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
if (!context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value.IsEnabled)
{
return;
}
// 获取后台任务管理器和所有 Hangfire 后台任务
var backgroundWorkerManager = context.ServiceProvider.GetRequiredService<IBackgroundWorkerManager>();
var workers = context.ServiceProvider.GetServices<IHangfireBackgroundWorker>();
// 获取配置
var configuration = context.ServiceProvider.GetRequiredService<IConfiguration>();
// 检查是否启用 Redis
var isRedisEnabled = configuration.GetValue<bool>("Redis:IsEnabled");
@@ -56,11 +63,11 @@ public sealed class YiFrameworkBackgroundWorkersHangfireModule : AbpModule
{
// 内存模式:直接使用 Hangfire
var unProxyWorker = ProxyHelper.UnProxy(worker);
// 添加或更新循环任务
RecurringJob.AddOrUpdate(
worker.RecurringJobId,
(Expression<Func<Task>>)(() =>
(Expression<Func<Task>>)(() =>
((IHangfireBackgroundWorker)unProxyWorker).DoWorkAsync(default)),
worker.CronExpression,
new RecurringJobOptions

View File

@@ -1,4 +1,5 @@
using System.Text;
using System.Collections.Concurrent;
using System.Text;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
@@ -100,24 +101,56 @@ public class AiChatService : ApplicationService
}
var gateWay = LazyServiceProvider.GetRequiredService<AiGateWayManager>();
var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken);
await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
await foreach (var data in completeChatResponse)
{
var model = MapToMessage(input.Model, data);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
await writer.WriteLineAsync($"data: {message}\n");
await writer.FlushAsync(cancellationToken); // 确保立即推送数据
}
var completeChatResponse = gateWay.CompleteChatAsync(input.Model, history, cancellationToken);
await using var writer = new StreamWriter(response.Body, Encoding.UTF8, leaveOpen: true);
//缓存队列算法
// 创建一个队列来缓存消息
var messageQueue = new ConcurrentQueue<string>();
// 设置输出速率例如每50毫秒输出一次
var outputInterval = TimeSpan.FromMilliseconds(80);
// 标记是否完成接收
var isComplete = false;
// 启动一个后台任务来消费队列
var outputTask = Task.Run(async () =>
{
while (!(isComplete&&messageQueue.IsEmpty))
{
if (messageQueue.TryDequeue(out var message))
{
await writer.WriteLineAsync(message);
await writer.FlushAsync(cancellationToken);
}
if (!isComplete)
{
// 如果没有完成,才等待,已完成,全部输出
await Task.Delay(outputInterval, cancellationToken);
}
}
}, cancellationToken);
await foreach (var data in completeChatResponse)
{
var model = MapToMessage(input.Model, data);
var message = JsonConvert.SerializeObject(model, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
// 将消息加入队列而不是直接写入
messageQueue.Enqueue($"data: {message}\n");
}
// 标记完成并发送结束标记
isComplete = true;
//断开连接
await writer.WriteLineAsync("data: done\n");
await writer.FlushAsync(cancellationToken); // 确保立即推送数据
messageQueue.Enqueue("data: done\n");
await outputTask;
if (CurrentUser.IsAuthenticated && input.SessionId.HasValue)
{
// 等待接入token