// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
//
// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
//
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
namespace Admin.NET.Core.Service;
/// <summary>
/// Job persistence (database)
/// </summary>
public class DbJobPersistence : IJobPersistence
{
private readonly IServiceScopeFactory _serviceScopeFactory;
/// <summary>
/// Creates the persistence instance and stores the scope factory used by the
/// other members to open a service scope per call.
/// </summary>
/// <param name="serviceScopeFactory">Factory stored in <c>_serviceScopeFactory</c>.</param>
public DbJobPersistence(IServiceScopeFactory serviceScopeFactory)
    => _serviceScopeFactory = serviceScopeFactory;
/// <summary>
/// Runs when the job scheduling service starts: synchronizes the code-defined (built-in) jobs and
/// their triggers with the database, applies database values to the builders of code-defined jobs
/// that are not marked built-in in the database, and appends builders for every non-built-in job
/// stored in the database.
/// </summary>
/// <param name="stoppingToken">Cancellation token forwarded to the database read queries.</param>
/// <returns>The scanned scheduler builders followed by one builder per non-built-in database job.</returns>
/// <exception cref="NotSupportedException">A non-built-in database job has a create type other than Script or Http.</exception>
public async Task> PreloadAsync(CancellationToken stoppingToken)
{
    using var scope = _serviceScopeFactory.CreateScope();
    var db = scope.ServiceProvider.GetRequiredService().CopyNew();
    var dynamicJobCompiler = scope.ServiceProvider.GetRequiredService();

    // Collect every job defined at compile time
    var allJobs = App.EffectiveTypes.ScanToBuilders().ToList();
    var builtInJobIds = allJobs
        .Select(u => u.GetJobBuilder().JobId)
        .Where(u => !string.IsNullOrWhiteSpace(u))
        .ToHashSet();

    // Remove built-in jobs that no longer exist in code
    var staleBuiltInJobIds = (await db.Queryable()
            .Where(u => u.CreateType == JobCreateTypeEnum.BuiltIn)
            .Select(u => u.JobId)
            .ToListAsync(stoppingToken))
        .Where(u => !builtInJobIds.Contains(u))
        .ToList();
    if (staleBuiltInJobIds.Count > 0)
    {
        // Two delete commands are issued for the stale job ids
        await db.Deleteable().In(u => u.JobId, staleBuiltInJobIds).ExecuteCommandAsync();
        await db.Deleteable().In(u => u.JobId, staleBuiltInJobIds).ExecuteCommandAsync();
    }

    // First, synchronize built-in jobs/triggers to the database according to their code definition
    foreach (var schedulerBuilder in allJobs)
    {
        var jobBuilder = schedulerBuilder.GetJobBuilder();
        if (string.IsNullOrWhiteSpace(jobBuilder.JobId)) continue;

        var codeJobDetail = jobBuilder.Adapt();
        codeJobDetail.JobId = jobBuilder.JobId;
        codeJobDetail.CreateType = JobCreateTypeEnum.BuiltIn;

        var dbDetail = await db.Queryable().FirstAsync(u => u.JobId == jobBuilder.JobId, stoppingToken);
        if (dbDetail == null)
        {
            await db.Insertable(codeJobDetail).ExecuteCommandAsync();
        }
        else if (dbDetail.CreateType == JobCreateTypeEnum.BuiltIn)
        {
            // Carry over the stored Id, ScriptCode and UpdatedTime before overwriting the row
            codeJobDetail.Id = dbDetail.Id;
            codeJobDetail.ScriptCode = dbDetail.ScriptCode;
            codeJobDetail.UpdatedTime = dbDetail.UpdatedTime;
            await db.Updateable(codeJobDetail)
                .WhereColumns(u => new { u.JobId })
                .IgnoreColumns(u => new { u.Id, u.ScriptCode })
                .ExecuteCommandAsync();
        }
        else
        {
            // Non-built-in jobs stay under the control of the database configuration
            continue;
        }

        jobBuilder.LoadFrom(codeJobDetail);

        var dbTriggers = await db.Queryable().Where(u => u.JobId == jobBuilder.JobId).ToListAsync(stoppingToken);
        var codeTriggerIds = new HashSet();
        foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
        {
            var codeTrigger = triggerBuilder.Adapt();
            codeTrigger.JobId = codeJobDetail.JobId;
            if (string.IsNullOrWhiteSpace(codeTrigger.TriggerId)) continue;
            // Fall back to the job's assembly name when the trigger does not carry one
            if (string.IsNullOrWhiteSpace(codeTrigger.AssemblyName)) codeTrigger.AssemblyName = codeJobDetail.AssemblyName;
            codeTriggerIds.Add(codeTrigger.TriggerId);

            var dbTrigger = dbTriggers.FirstOrDefault(u => u.JobId == jobBuilder.JobId && u.TriggerId == codeTrigger.TriggerId);
            if (dbTrigger == null)
            {
                await db.Insertable(codeTrigger).ExecuteCommandAsync();
            }
            else
            {
                // Keep the runtime state recorded in the database row
                codeTrigger.Id = dbTrigger.Id;
                codeTrigger.Status = dbTrigger.Status;
                codeTrigger.LastRunTime = dbTrigger.LastRunTime;
                codeTrigger.NumberOfRuns = dbTrigger.NumberOfRuns;
                codeTrigger.NumberOfErrors = dbTrigger.NumberOfErrors;
                codeTrigger.UpdatedTime = dbTrigger.UpdatedTime;
                await db.Updateable(codeTrigger)
                    .WhereColumns(u => new { u.TriggerId, u.JobId })
                    .IgnoreColumns(u => new { u.Id })
                    .ExecuteCommandAsync();
            }
            triggerBuilder.LoadFrom(codeTrigger).Updated();
        }

        // Delete database triggers of this job that are no longer defined in code
        var staleTriggerIds = dbTriggers
            .Where(u => !codeTriggerIds.Contains(u.TriggerId))
            .Select(u => u.TriggerId)
            .ToList();
        if (staleTriggerIds.Count > 0)
        {
            await db.Deleteable()
                .Where(u => u.JobId == jobBuilder.JobId)
                .In(u => u.TriggerId, staleTriggerIds)
                .ExecuteCommandAsync();
        }

        schedulerBuilder.Updated();
    }

    // Load non-built-in jobs from the database and synchronize the database values into the builders
    foreach (var schedulerBuilder in allJobs)
    {
        // Get the job builder
        var jobBuilder = schedulerBuilder.GetJobBuilder();

        // Load the database row
        var dbDetail = await db.Queryable().FirstAsync(u => u.JobId == jobBuilder.JobId, stoppingToken);
        if (dbDetail == null || dbDetail.CreateType == JobCreateTypeEnum.BuiltIn) continue;

        // Apply the database values
        jobBuilder.LoadFrom(dbDetail);

        // Get all database triggers of the job
        var dbTriggers = await db.Queryable().Where(u => u.JobId == jobBuilder.JobId).ToListAsync(stoppingToken);

        // Walk every trigger builder of the job
        foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
        {
            // Load the database row
            var dbTrigger = dbTriggers.FirstOrDefault(u => u.JobId == jobBuilder.JobId && u.TriggerId == triggerBuilder.TriggerId);
            if (dbTrigger == null) continue;
            triggerBuilder.LoadFrom(dbTrigger).Updated(); // mark as updated
        }

        // Add every trigger that is not defined at compile time to the job
        foreach (var dbTrigger in dbTriggers)
        {
            if (schedulerBuilder.GetTriggerBuilder(dbTrigger.TriggerId)?.JobId == jobBuilder.JobId) continue;
            var triggerBuilder = TriggerBuilder.Create(dbTrigger.TriggerId).LoadFrom(dbTrigger);
            schedulerBuilder.AddTriggerBuilder(triggerBuilder); // add first
            triggerBuilder.Updated(); // then mark as updated
        }

        schedulerBuilder.Updated();
    }

    // Get every database job whose create type is not BuiltIn
    var allDbScriptJobs = await db.Queryable().Where(u => u.CreateType != JobCreateTypeEnum.BuiltIn).ToListAsync(stoppingToken);
    foreach (var dbDetail in allDbScriptJobs)
    {
        // Create the job type dynamically
        Type jobType = dbDetail.CreateType switch
        {
            JobCreateTypeEnum.Script => dynamicJobCompiler.BuildJob(dbDetail.ScriptCode),
            JobCreateTypeEnum.Http => typeof(HttpJob),
            _ => throw new NotSupportedException(),
        };

        // The assembly name of a dynamically built jobType is random, so it has to be set again
        dbDetail.AssemblyName = jobType.Assembly.FullName!.Split(',')[0];
        var jobBuilder = JobBuilder.Create(jobType).LoadFrom(dbDetail);

        // Force-disable scanning of [Trigger] attributes on the IJob implementation; otherwise
        // SchedulerBuilder.Create would scan again and add triggers with the same name twice
        jobBuilder.SetIncludeAnnotations(false);

        // Add all database triggers of the job; the stopping token is passed here as well so this
        // query is cancellable like the other reads in this method
        var dbTriggers = await db.Queryable().Where(u => u.JobId == jobBuilder.JobId).ToListAsync(stoppingToken);
        var triggerBuilders = dbTriggers.Select(u => TriggerBuilder.Create(u.TriggerId).LoadFrom(u).Updated());
        var schedulerBuilder = SchedulerBuilder.Create(jobBuilder, triggerBuilders.ToArray());

        // Mark as updated
        schedulerBuilder.Updated();

        allJobs.Add(schedulerBuilder);
    }

    return allJobs;
}
/// <summary>
/// Job schedule initialization notification. The builder is passed through unchanged.
/// </summary>
/// <param name="builder">Scheduler builder being loaded.</param>
/// <param name="stoppingToken">Cancellation token; not used by this implementation.</param>
/// <returns>An already-completed task wrapping the same <paramref name="builder"/> instance.</returns>
public Task OnLoadingAsync(SchedulerBuilder builder, CancellationToken stoppingToken)
    => Task.FromResult(builder);
/// <summary>
/// Called when a scheduler's JobDetail changes; mirrors the change into the database.
/// </summary>
/// <param name="context">Carries the changed job detail and the kind of change (<c>Behavior</c>).</param>
/// <returns>A task that completes once the database command has run.</returns>
public async Task OnChangedAsync(PersistenceContext context)
{
    using var serviceScope = _serviceScopeFactory.CreateScope();
    var client = serviceScope.ServiceProvider.GetRequiredService().CopyNew();
    var detail = context.JobDetail.Adapt();

    var behavior = context.Behavior;
    if (behavior == PersistenceBehavior.Appended)
    {
        // New job: insert the row
        await client.Insertable(detail).ExecuteCommandAsync();
    }
    else if (behavior == PersistenceBehavior.Updated)
    {
        // Existing job: update by JobId, leaving Id, CreateType and ScriptCode untouched
        await client.Updateable(detail)
            .WhereColumns(u => new { u.JobId })
            .IgnoreColumns(u => new { u.Id, u.CreateType, u.ScriptCode })
            .ExecuteCommandAsync();
    }
    else if (behavior == PersistenceBehavior.Removed)
    {
        // Removed job: delete the rows with the same JobId
        await client.Deleteable().Where(u => u.JobId == detail.JobId).ExecuteCommandAsync();
    }
    // Any other behavior value results in no database command
}
/// <summary>
/// Called when a scheduler's Trigger changes; mirrors the change into the database.
/// </summary>
/// <param name="context">Carries the changed trigger and the kind of change (<c>Behavior</c>).</param>
/// <returns>A task that completes once the database command has run.</returns>
public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
{
    using var serviceScope = _serviceScopeFactory.CreateScope();
    var client = serviceScope.ServiceProvider.GetRequiredService().CopyNew();
    var trigger = context.Trigger.Adapt();

    var behavior = context.Behavior;
    if (behavior == PersistenceBehavior.Appended)
    {
        // New trigger: insert the row
        await client.Insertable(trigger).ExecuteCommandAsync();
    }
    else if (behavior == PersistenceBehavior.Updated)
    {
        // Existing trigger: update by (TriggerId, JobId), leaving Id untouched
        await client.Updateable(trigger)
            .WhereColumns(u => new { u.TriggerId, u.JobId })
            .IgnoreColumns(u => new { u.Id })
            .ExecuteCommandAsync();
    }
    else if (behavior == PersistenceBehavior.Removed)
    {
        // Removed trigger: delete the rows matching both TriggerId and JobId
        await client.Deleteable()
            .Where(u => u.TriggerId == trigger.TriggerId && u.JobId == trigger.JobId)
            .ExecuteCommandAsync();
    }
    // Any other behavior value results in no database command
}
/// <summary>
/// Job trigger execution record: stores the record built from the context's timeline, then asks
/// the resolved service to clear expired trigger records.
/// </summary>
/// <param name="context">Carries the execution timeline to persist.</param>
/// <returns>A task that completes after the insert and the cleanup call have finished.</returns>
public async Task OnExecutionRecordAsync(PersistenceExecutionRecordContext context)
{
    using var serviceScope = _serviceScopeFactory.CreateScope();
    var provider = serviceScope.ServiceProvider;
    var client = provider.GetRequiredService().CopyNew();

    var record = context.Timeline.Adapt();
    await client.Insertable(record).ExecuteCommandAsync();

    // The cleanup service is resolved only after the insert, as in the original statement order
    var cleaner = provider.GetRequiredService();
    await cleaner.ClearExpireJobTriggerRecord(record);
}
}