feat: 初步搭建任务调度管理实现

This commit is contained in:
陈淳
2024-01-12 17:18:39 +08:00
parent 5a65a2e49f
commit e71936063d
2 changed files with 192 additions and 176 deletions

View File

@@ -7,74 +7,59 @@
/// 作业 Id /// 作业 Id
/// </summary> /// </summary>
public string JobId { get; internal set; } public string JobId { get; set; }
/// <summary> /// <summary>
/// 作业组名称 /// 作业组名称
/// </summary> /// </summary>
public string GroupName { get; internal set; } public string GroupName { get; set; }
/// <summary> /// <summary>
/// 作业处理程序类型 /// 作业处理程序类型
/// </summary> /// </summary>
/// <remarks>存储的是类型的 FullName</remarks> /// <remarks>存储的是类型的 FullName</remarks>
public string JobType { get; internal set; } public string JobType { get; set; }
/// <summary> /// <summary>
/// 作业处理程序类型所在程序集 /// 作业处理程序类型所在程序集
/// </summary> /// </summary>
/// <remarks>存储的是程序集 Name</remarks> /// <remarks>存储的是程序集 Name</remarks>
public string AssemblyName { get; internal set; } public string AssemblyName { get; set; }
/// <summary> /// <summary>
/// 描述信息 /// 描述信息
/// </summary> /// </summary>
public string Description { get; internal set; } public string Description { get; set; }
/// <summary> /// <summary>
/// 是否采用并行执行 /// 是否采用并行执行
/// </summary> /// </summary>
/// <remarks>如果设置为 false那么使用串行执行</remarks> /// <remarks>如果设置为 false那么使用串行执行</remarks>
public bool Concurrent { get; internal set; } = true; public bool Concurrent { get; set; } = true;
/// <summary> /// <summary>
/// 是否扫描 IJob 实现类 [Trigger] 特性触发器 /// 是否扫描 IJob 实现类 [Trigger] 特性触发器
/// </summary> /// </summary>
public bool IncludeAnnotations { get; internal set; } = false; public bool IncludeAnnotations { get; set; } = false;
/// <summary> /// <summary>
/// 作业信息额外数据 /// 作业信息额外数据
/// </summary> /// </summary>
public string Properties { get; internal set; } = "{}"; public string Properties { get; set; } = "{}";
/// <summary> /// <summary>
/// 作业更新时间 /// 作业更新时间
/// </summary> /// </summary>
public DateTime? UpdatedTime { get; internal set; } public DateTime? UpdatedTime { get; set; }
/// <summary>
/// 标记其他作业正在执行
/// </summary>
/// <remarks>当 <see cref="Concurrent"/> 为 false 时有效,也就是串行执行</remarks>
internal bool Blocked { get; set; } = false;
/// <summary>
/// 作业处理程序运行时类型
/// </summary>
internal string RuntimeJobType { get; set; }
/// <summary>
/// 作业信息额外数据运行时实例
/// </summary>
internal string RuntimeProperties { get; set; }
public string TriggerArgs { get; set; } public string TriggerArgs { get; set; }

View File

@@ -1,171 +1,202 @@
using Volo.Abp.Application.Services; using System.Reflection;
using Mapster;
using Microsoft.AspNetCore.Mvc;
using Quartz;
using Quartz.Impl.Matchers;
using Volo.Abp.Application.Dtos;
using Volo.Abp.Application.Services;
using Yi.Framework.Rbac.Application.Contracts.Dtos.Task;
using Yi.Framework.Rbac.Application.Contracts.IServices; using Yi.Framework.Rbac.Application.Contracts.IServices;
using Yi.Framework.Rbac.Domain.Shared.Enums;
namespace Yi.Framework.Rbac.Application.Services namespace Yi.Framework.Rbac.Application.Services
{ {
public class TaskService : ApplicationService, ITaskService public class TaskService : ApplicationService, ITaskService
{ {
//private readonly ISchedulerFactory _schedulerFactory; private readonly ISchedulerFactory _schedulerFactory;
//public TaskService(ISchedulerFactory schedulerFactory) public TaskService(ISchedulerFactory schedulerFactory)
//{ {
// _schedulerFactory = schedulerFactory; _schedulerFactory = schedulerFactory;
//} }
///// <summary>
///// 单查job
///// </summary>
///// <param name="jobId"></param>
///// <returns></returns>
//[HttpGet("{jobId}")]
//public TaskGetOutput GetById([FromRoute] string jobId)
//{
// var result = _schedulerFactory.TryGetJob(jobId, out var scheduler);
// var data = scheduler.GetModel();
// var output = data.JobDetail.Adapt<TaskGetOutput>();
// output.TriggerArgs = data.Triggers[0].Args;
// output.NextRunTime = data.Triggers[0].NextRunTime;
// output.LastRunTime = data.Triggers[0].LastRunTime;
// output.NumberOfRuns = data.Triggers[0].NumberOfRuns;
// return output;
//}
///// <summary>
///// 多查job
///// </summary>
///// <returns></returns>
//[HttpGet("")]
//public PagedResultDto<TaskGetListOutput> GetList([FromQuery] TaskGetListInput input)
//{
// var data = _schedulerFactory.GetJobsOfModels().Skip((input.PageNum - 1) * input.PageSize).Take(input.PageSize).OrderByDescending(x => x.JobDetail.UpdatedTime)
// .ToList();
// var output = data.Select(x =>
// {
// var res = new TaskGetListOutput();
// res = x.JobDetail.Adapt<TaskGetListOutput>();
// res.TriggerArgs = x.Triggers[0].Args;
// res.Status = x.Triggers[0].Status.ToString();
// return res;
// }).ToList();
// return new PagedResultDto<TaskGetListOutput>(data.Count(), output);
//}
///// <summary>
///// 创建job
///// </summary>
///// <param name="input"></param>
///// <returns></returns>
//public ScheduleResult Create(TaskCreateInput input)
//{
// //jobBuilder /// <summary>
// var jobBuilder = JobBuilder.Create(input.AssemblyName, input.JobType).SetJobId(input.JobId).SetGroupName(input.GroupName) /// 单查job
// .SetConcurrent(input.Concurrent).SetDescription(input.Description); /// </summary>
/// <param name="jobId"></param>
/// <returns></returns>
[HttpGet("{jobId}")]
public async Task<TaskGetOutput> GetByIdAsync([FromRoute] string jobId)
{
var scheduler = await _schedulerFactory.GetScheduler();
// //triggerBuilder var jobDetail = await scheduler.GetJobDetail(new JobKey(jobId));
// //毫秒 var trigger = (await scheduler.GetTriggersOfJob(new JobKey(jobId))).First();
// TriggerBuilder triggerBuilder = null; //状态
// switch (input.Type) var state = await scheduler.GetTriggerState(trigger.Key);
// { var output = new TaskGetOutput
// case Core.Rbac.Enums.JobTypeEnum.Cron: {
// triggerBuilder = Triggers.Cron(input.Cron, CronStringFormat.WithSeconds); JobId = jobDetail.Key.Name,
// break; GroupName = jobDetail.Key.Group,
// case Core.Rbac.Enums.JobTypeEnum.Millisecond: JobType = jobDetail.JobType.Name,
// triggerBuilder = Triggers.Period(input.Millisecond); Properties = Newtonsoft.Json.JsonConvert.SerializeObject(jobDetail.JobDataMap),
// break; Concurrent = !jobDetail.ConcurrentExecutionDisallowed,
// } Description = jobDetail.Description,
LastRunTime = trigger.GetPreviousFireTimeUtc()?.DateTime,
};
// //作业计划,单个jobBuilder与多个triggerBuilder组合 if (trigger is ISimpleTrigger simple)
// var schedulerBuilder = SchedulerBuilder.Create(jobBuilder, triggerBuilder); {
output.TriggerArgs = simple.RepeatInterval.TotalMilliseconds.ToString() + "毫秒";
}
else if (trigger is ICronTrigger cron)
{
output.TriggerArgs = cron.CronExpressionString!;
}
return output;
}
/// <summary>
/// 多查job
/// </summary>
/// <returns></returns>
[HttpGet("")]
public async Task<PagedResultDto<TaskGetListOutput>> GetList([FromQuery] TaskGetListInput input)
{
var items = new List<TaskGetOutput>();
var scheduler = await _schedulerFactory.GetScheduler();
var groups = await scheduler.GetJobGroupNames();
foreach (var groupName in groups)
{
foreach (var jobKey in await scheduler.GetJobKeys(GroupMatcher<JobKey>.GroupEquals(groupName)))
{
string jobName = jobKey.Name;
string jobGroup = jobKey.Group;
var triggers = (await scheduler.GetTriggersOfJob(jobKey)).First();
items.Add(await GetByIdAsync(jobName));
}
}
// //调度中心工厂使用作业计划管理job,返回调度中心单个 var output = items.Skip((input.SkipCount - 1) * input.MaxResultCount).Take(input.MaxResultCount)
// var result = _schedulerFactory.TryAddJob(schedulerBuilder, out var scheduler); .OrderByDescending(x => x.LastRunTime)
.ToList();
return new PagedResultDto<TaskGetListOutput>(items.Count(), output.Adapt<List<TaskGetListOutput>>());
}
// return result; /// <summary>
//} /// 创建job
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public async Task Create(TaskCreateInput input)
{
var scheduler = await _schedulerFactory.GetScheduler();
///// <summary> //设置启动时执行一次,然后最大只执行一次
///// 移除job
///// </summary>
///// <param name="jobId"></param>
///// <returns></returns>
//public ScheduleResult Remove(string jobId)
//{
// var res = _schedulerFactory.TryRemoveJob(jobId, out var scheduler);
// return res;
//}
///// <summary>
///// 暂停job
///// </summary>
///// <param name="jobId"></param>
///// <returns></returns>
//[HttpPut]
//public ScheduleResult Pause(string jobId)
//{
// var res = _schedulerFactory.TryGetJob(jobId, out var scheduler);
// scheduler.Pause();
// return res;
//}
///// <summary>
///// 开始job
///// </summary>
///// <param name="jobId"></param>
///// <returns></returns>
//[HttpPut]
//public ScheduleResult Start(string jobId)
//{
// var res = _schedulerFactory.TryGetJob(jobId, out var scheduler);
// scheduler.Start();
// return res;
//}
///// <summary>
///// 更新job
///// </summary>
///// <param name="jobId"></param>
///// <param name="input"></param>
///// <returns></returns>
//public ScheduleResult Update(string jobId, TaskUpdateInput input)
//{
// //jobBuilder
// var jobBuilder = JobBuilder.Create(input.AssemblyName, input.JobType).SetJobId(jobId).SetGroupName(input.GroupName)
// .SetConcurrent(input.Concurrent).SetDescription(input.Description);
// //triggerBuilder
// //毫秒
// TriggerBuilder triggerBuilder = null;
// switch (input.Type)
// {
// case Core.Rbac.Enums.JobTypeEnum.Cron:
// triggerBuilder = Triggers.Cron(input.Cron, CronStringFormat.WithSeconds);
// break;
// case Core.Rbac.Enums.JobTypeEnum.Millisecond:
// triggerBuilder = Triggers.Period(input.Millisecond);
// break;
// }
// //作业计划,单个jobBuilder与多个triggerBuilder组合
// var schedulerBuilder = SchedulerBuilder.Create(jobBuilder, triggerBuilder);
// var result = _schedulerFactory.TryUpdateJob(schedulerBuilder, out var scheduler); //jobBuilder
// return result; var jobClassType = Assembly.LoadFrom(input.AssemblyName).GetType(input.JobType);
//}
//[HttpPost] var jobBuilder = JobBuilder.Create(jobClassType).WithIdentity(new JobKey(input.JobId, input.GroupName))
//public bool RunOnce(string jobId) .WithDescription(input.Description);
//{ if (!input.Concurrent)
// var result = _schedulerFactory.TryGetJob(jobId, out var scheduler); {
jobBuilder.DisallowConcurrentExecution();
}
// var triggerBuilder = Triggers.Period(100).SetRunOnStart(true).SetMaxNumberOfRuns(1); //triggerBuilder
// scheduler.AddTrigger(triggerBuilder); TriggerBuilder triggerBuilder = null;
// //设置启动时执行一次,然后最大只执行一次 switch (input.Type)
// return true; {
//} case JobTypeEnum.Cron:
triggerBuilder =
TriggerBuilder.Create().StartNow()
.WithCronSchedule(input.Cron);
break;
case JobTypeEnum.Millisecond:
triggerBuilder =
TriggerBuilder.Create().StartNow()
.WithSimpleSchedule(x => x
.WithInterval(TimeSpan.FromMilliseconds(input.Millisecond))
.RepeatForever()
);
break;
}
//作业计划,单个jobBuilder与多个triggerBuilder组合
await scheduler.ScheduleJob(jobBuilder.Build(), triggerBuilder.Build());
}
/// <summary>
/// 移除job
/// </summary>
/// <param name="jobId"></param>
/// <returns></returns>
public async Task Remove(string jobId)
{
var scheduler = await _schedulerFactory.GetScheduler();
await scheduler.ResumeJob(new JobKey(jobId));
}
/// <summary>
/// 暂停job
/// </summary>
/// <param name="jobId"></param>
/// <returns></returns>
[HttpPut]
public async Task Pause(string jobId)
{
var scheduler = await _schedulerFactory.GetScheduler();
await scheduler.PauseJob(new JobKey(jobId));
}
/// <summary>
/// 开始job
/// </summary>
/// <param name="jobId"></param>
/// <returns></returns>
[HttpPut]
public async Task Start(string jobId)
{
var scheduler = await _schedulerFactory.GetScheduler();
await scheduler.ResumeJob(new JobKey(jobId));
}
/// <summary>
/// 更新job
/// </summary>
/// <param name="jobId"></param>
/// <param name="input"></param>
/// <returns></returns>
public async Task Update(string jobId, TaskUpdateInput input)
{
await Remove(jobId);
await Create(input.Adapt<TaskCreateInput>());
}
[HttpPost]
public async Task RunOnce(string jobId)
{
var scheduler = await _schedulerFactory.GetScheduler();
var jobDetail = await scheduler.GetJobDetail(new JobKey(jobId));
//设置启动时执行一次,然后最大只执行一次
var trigger = TriggerBuilder.Create().WithIdentity(Guid.NewGuid().ToString()).StartNow()
.WithSimpleSchedule(x => x
.WithRepeatCount(1))
.Build();
await scheduler.ScheduleJob(jobDetail, trigger);
}
} }
} }