|
|
@@ -30,12 +30,110 @@ public class DbJobPersistence : IJobPersistence
|
|
|
var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
|
|
|
var dynamicJobCompiler = scope.ServiceProvider.GetRequiredService<DynamicJobCompiler>();
|
|
|
|
|
|
- // 获取所有定义的作业
|
|
|
+ // 获取所有编译时定义的作业
|
|
|
var allJobs = App.EffectiveTypes.ScanToBuilders().ToList();
|
|
|
- // 若数据库不存在任何作业,则直接返回
|
|
|
- if (!await db.Queryable<SysJobDetail>().AnyAsync(u => true, stoppingToken)) return allJobs;
|
|
|
+ var builtInJobIds = allJobs
|
|
|
+ .Select(u => u.GetJobBuilder().JobId)
|
|
|
+ .Where(u => !string.IsNullOrWhiteSpace(u))
|
|
|
+ .ToHashSet();
|
|
|
|
|
|
- // 遍历所有定义的作业
|
|
|
+ // 清理代码中已经不存在的内置作业
|
|
|
+ var staleBuiltInJobIds = (await db.Queryable<SysJobDetail>()
|
|
|
+ .Where(u => u.CreateType == JobCreateTypeEnum.BuiltIn)
|
|
|
+ .Select(u => u.JobId)
|
|
|
+ .ToListAsync(stoppingToken))
|
|
|
+ .Where(u => !builtInJobIds.Contains(u))
|
|
|
+ .ToList();
|
|
|
+ if (staleBuiltInJobIds.Count > 0)
|
|
|
+ {
|
|
|
+ await db.Deleteable<SysJobTrigger>().In(u => u.JobId, staleBuiltInJobIds).ExecuteCommandAsync();
|
|
|
+ await db.Deleteable<SysJobDetail>().In(u => u.JobId, staleBuiltInJobIds).ExecuteCommandAsync();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 先把内置作业/触发器按代码定义同步到数据库
|
|
|
+ foreach (var schedulerBuilder in allJobs)
|
|
|
+ {
|
|
|
+ var jobBuilder = schedulerBuilder.GetJobBuilder();
|
|
|
+ if (string.IsNullOrWhiteSpace(jobBuilder.JobId)) continue;
|
|
|
+
|
|
|
+ var codeJobDetail = jobBuilder.Adapt<SysJobDetail>();
|
|
|
+ codeJobDetail.JobId = jobBuilder.JobId;
|
|
|
+ codeJobDetail.CreateType = JobCreateTypeEnum.BuiltIn;
|
|
|
+
|
|
|
+ var dbDetail = await db.Queryable<SysJobDetail>().FirstAsync(u => u.JobId == jobBuilder.JobId, stoppingToken);
|
|
|
+ if (dbDetail == null)
|
|
|
+ {
|
|
|
+ await db.Insertable(codeJobDetail).ExecuteCommandAsync();
|
|
|
+ }
|
|
|
+ else if (dbDetail.CreateType == JobCreateTypeEnum.BuiltIn)
|
|
|
+ {
|
|
|
+ codeJobDetail.Id = dbDetail.Id;
|
|
|
+ codeJobDetail.ScriptCode = dbDetail.ScriptCode;
|
|
|
+ codeJobDetail.UpdatedTime = dbDetail.UpdatedTime;
|
|
|
+
|
|
|
+ await db.Updateable(codeJobDetail)
|
|
|
+ .WhereColumns(u => new { u.JobId })
|
|
|
+ .IgnoreColumns(u => new { u.Id, u.ScriptCode })
|
|
|
+ .ExecuteCommandAsync();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ // 非内置作业交给数据库配置继续接管
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ jobBuilder.LoadFrom(codeJobDetail);
|
|
|
+
|
|
|
+ var dbTriggers = await db.Queryable<SysJobTrigger>().Where(u => u.JobId == jobBuilder.JobId).ToListAsync(stoppingToken);
|
|
|
+ var codeTriggerIds = new HashSet<string>();
|
|
|
+
|
|
|
+ foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
|
|
|
+ {
|
|
|
+ var codeTrigger = triggerBuilder.Adapt<SysJobTrigger>();
|
|
|
+ codeTrigger.JobId = codeJobDetail.JobId;
|
|
|
+ if (string.IsNullOrWhiteSpace(codeTrigger.TriggerId)) continue;
|
|
|
+ if (string.IsNullOrWhiteSpace(codeTrigger.AssemblyName)) codeTrigger.AssemblyName = codeJobDetail.AssemblyName;
|
|
|
+ codeTriggerIds.Add(codeTrigger.TriggerId);
|
|
|
+
|
|
|
+ var dbTrigger = dbTriggers.FirstOrDefault(u => u.JobId == jobBuilder.JobId && u.TriggerId == codeTrigger.TriggerId);
|
|
|
+ if (dbTrigger == null)
|
|
|
+ {
|
|
|
+ await db.Insertable(codeTrigger).ExecuteCommandAsync();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ codeTrigger.Id = dbTrigger.Id;
|
|
|
+ codeTrigger.Status = dbTrigger.Status;
|
|
|
+ codeTrigger.LastRunTime = dbTrigger.LastRunTime;
|
|
|
+ codeTrigger.NumberOfRuns = dbTrigger.NumberOfRuns;
|
|
|
+ codeTrigger.NumberOfErrors = dbTrigger.NumberOfErrors;
|
|
|
+ codeTrigger.UpdatedTime = dbTrigger.UpdatedTime;
|
|
|
+
|
|
|
+ await db.Updateable(codeTrigger)
|
|
|
+ .WhereColumns(u => new { u.TriggerId, u.JobId })
|
|
|
+ .IgnoreColumns(u => new { u.Id })
|
|
|
+ .ExecuteCommandAsync();
|
|
|
+ }
|
|
|
+
|
|
|
+ triggerBuilder.LoadFrom(codeTrigger).Updated();
|
|
|
+ }
|
|
|
+
|
|
|
+ var staleTriggerIds = dbTriggers
|
|
|
+ .Where(u => !codeTriggerIds.Contains(u.TriggerId))
|
|
|
+ .Select(u => u.TriggerId)
|
|
|
+ .ToList();
|
|
|
+ if (staleTriggerIds.Count > 0)
|
|
|
+ {
|
|
|
+ await db.Deleteable<SysJobTrigger>()
|
|
|
+ .Where(u => u.JobId == jobBuilder.JobId)
|
|
|
+ .In(u => u.TriggerId, staleTriggerIds)
|
|
|
+ .ExecuteCommandAsync();
|
|
|
+ }
|
|
|
+
|
|
|
+ schedulerBuilder.Updated();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 加载数据库中的非内置作业,并把数据库值同步到 builder
|
|
|
foreach (var schedulerBuilder in allJobs)
|
|
|
{
|
|
|
// 获取作业信息构建器
|
|
|
@@ -43,7 +141,7 @@ public class DbJobPersistence : IJobPersistence
|
|
|
|
|
|
// 加载数据库数据
|
|
|
var dbDetail = await db.Queryable<SysJobDetail>().FirstAsync(u => u.JobId == jobBuilder.JobId, stoppingToken);
|
|
|
- if (dbDetail == null) continue;
|
|
|
+ if (dbDetail == null || dbDetail.CreateType == JobCreateTypeEnum.BuiltIn) continue;
|
|
|
|
|
|
// 同步数据库数据
|
|
|
jobBuilder.LoadFrom(dbDetail);
|
|
|
@@ -68,7 +166,6 @@ public class DbJobPersistence : IJobPersistence
|
|
|
triggerBuilder.Updated(); // 再标记更新
|
|
|
}
|
|
|
|
|
|
- // 标记更新
|
|
|
schedulerBuilder.Updated();
|
|
|
}
|
|
|
|