|
@@ -42,14 +42,14 @@ public class DbJobPersistence : IJobPersistence
|
|
|
var jobBuilder = schedulerBuilder.GetJobBuilder();
|
|
var jobBuilder = schedulerBuilder.GetJobBuilder();
|
|
|
|
|
|
|
|
// 加载数据库数据
|
|
// 加载数据库数据
|
|
|
- var dbDetail = await db.Queryable<SysJobDetail>().FirstAsync(u => u.JobId == jobBuilder.JobId);
|
|
|
|
|
|
|
+ var dbDetail = await db.Queryable<SysJobDetail>().FirstAsync(u => u.JobId == jobBuilder.JobId, stoppingToken);
|
|
|
if (dbDetail == null) continue;
|
|
if (dbDetail == null) continue;
|
|
|
|
|
|
|
|
// 同步数据库数据
|
|
// 同步数据库数据
|
|
|
jobBuilder.LoadFrom(dbDetail);
|
|
jobBuilder.LoadFrom(dbDetail);
|
|
|
|
|
|
|
|
// 获取作业的所有数据库的触发器
|
|
// 获取作业的所有数据库的触发器
|
|
|
- var dbTriggers = await db.Queryable<SysJobTrigger>().Where(u => u.JobId == jobBuilder.JobId).ToListAsync();
|
|
|
|
|
|
|
+ var dbTriggers = await db.Queryable<SysJobTrigger>().Where(u => u.JobId == jobBuilder.JobId).ToListAsync(stoppingToken);
|
|
|
// 遍历所有作业触发器
|
|
// 遍历所有作业触发器
|
|
|
foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
|
|
foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
|
|
|
{
|
|
{
|
|
@@ -73,24 +73,16 @@ public class DbJobPersistence : IJobPersistence
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 获取数据库所有通过脚本创建的作业
|
|
// 获取数据库所有通过脚本创建的作业
|
|
|
- var allDbScriptJobs = await db.Queryable<SysJobDetail>().Where(u => u.CreateType != JobCreateTypeEnum.BuiltIn).ToListAsync();
|
|
|
|
|
|
|
+ var allDbScriptJobs = await db.Queryable<SysJobDetail>().Where(u => u.CreateType != JobCreateTypeEnum.BuiltIn).ToListAsync(stoppingToken);
|
|
|
foreach (var dbDetail in allDbScriptJobs)
|
|
foreach (var dbDetail in allDbScriptJobs)
|
|
|
{
|
|
{
|
|
|
// 动态创建作业
|
|
// 动态创建作业
|
|
|
- Type jobType;
|
|
|
|
|
- switch (dbDetail.CreateType)
|
|
|
|
|
|
|
+ Type jobType = dbDetail.CreateType switch
|
|
|
{
|
|
{
|
|
|
- case JobCreateTypeEnum.Script:
|
|
|
|
|
- jobType = dynamicJobCompiler.BuildJob(dbDetail.ScriptCode);
|
|
|
|
|
- break;
|
|
|
|
|
-
|
|
|
|
|
- case JobCreateTypeEnum.Http:
|
|
|
|
|
- jobType = typeof(HttpJob);
|
|
|
|
|
- break;
|
|
|
|
|
-
|
|
|
|
|
- default:
|
|
|
|
|
- throw new NotSupportedException();
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ JobCreateTypeEnum.Script => dynamicJobCompiler.BuildJob(dbDetail.ScriptCode),
|
|
|
|
|
+ JobCreateTypeEnum.Http => typeof(HttpJob),
|
|
|
|
|
+ _ => throw new NotSupportedException(),
|
|
|
|
|
+ };
|
|
|
|
|
|
|
|
// 动态构建的 jobType 的程序集名称为随机名称,需重新设置
|
|
// 动态构建的 jobType 的程序集名称为随机名称,需重新设置
|
|
|
dbDetail.AssemblyName = jobType.Assembly.FullName!.Split(',')[0];
|
|
dbDetail.AssemblyName = jobType.Assembly.FullName!.Split(',')[0];
|
|
@@ -131,25 +123,23 @@ public class DbJobPersistence : IJobPersistence
|
|
|
/// <returns></returns>
|
|
/// <returns></returns>
|
|
|
public async Task OnChangedAsync(PersistenceContext context)
|
|
public async Task OnChangedAsync(PersistenceContext context)
|
|
|
{
|
|
{
|
|
|
- using (var scope = _serviceScopeFactory.CreateScope())
|
|
|
|
|
- {
|
|
|
|
|
- var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
|
|
|
|
|
|
|
+ using var scope = _serviceScopeFactory.CreateScope();
|
|
|
|
|
+ var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
|
|
|
|
|
|
|
|
- var jobDetail = context.JobDetail.Adapt<SysJobDetail>();
|
|
|
|
|
- switch (context.Behavior)
|
|
|
|
|
- {
|
|
|
|
|
- case PersistenceBehavior.Appended:
|
|
|
|
|
- await db.Insertable(jobDetail).ExecuteCommandAsync();
|
|
|
|
|
- break;
|
|
|
|
|
|
|
+ var jobDetail = context.JobDetail.Adapt<SysJobDetail>();
|
|
|
|
|
+ switch (context.Behavior)
|
|
|
|
|
+ {
|
|
|
|
|
+ case PersistenceBehavior.Appended:
|
|
|
|
|
+ await db.Insertable(jobDetail).ExecuteCommandAsync();
|
|
|
|
|
+ break;
|
|
|
|
|
|
|
|
- case PersistenceBehavior.Updated:
|
|
|
|
|
- await db.Updateable(jobDetail).WhereColumns(u => new { u.JobId }).IgnoreColumns(u => new { u.Id, u.CreateType, u.ScriptCode }).ExecuteCommandAsync();
|
|
|
|
|
- break;
|
|
|
|
|
|
|
+ case PersistenceBehavior.Updated:
|
|
|
|
|
+ await db.Updateable(jobDetail).WhereColumns(u => new { u.JobId }).IgnoreColumns(u => new { u.Id, u.CreateType, u.ScriptCode }).ExecuteCommandAsync();
|
|
|
|
|
+ break;
|
|
|
|
|
|
|
|
- case PersistenceBehavior.Removed:
|
|
|
|
|
- await db.Deleteable<SysJobDetail>().Where(u => u.JobId == jobDetail.JobId).ExecuteCommandAsync();
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ case PersistenceBehavior.Removed:
|
|
|
|
|
+ await db.Deleteable<SysJobDetail>().Where(u => u.JobId == jobDetail.JobId).ExecuteCommandAsync();
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -160,25 +150,23 @@ public class DbJobPersistence : IJobPersistence
|
|
|
/// <returns></returns>
|
|
/// <returns></returns>
|
|
|
public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
|
|
public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
|
|
|
{
|
|
{
|
|
|
- using (var scope = _serviceScopeFactory.CreateScope())
|
|
|
|
|
- {
|
|
|
|
|
- var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
|
|
|
|
|
|
|
+ using var scope = _serviceScopeFactory.CreateScope();
|
|
|
|
|
+ var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
|
|
|
|
|
|
|
|
- var jobTrigger = context.Trigger.Adapt<SysJobTrigger>();
|
|
|
|
|
- switch (context.Behavior)
|
|
|
|
|
- {
|
|
|
|
|
- case PersistenceBehavior.Appended:
|
|
|
|
|
- await db.Insertable(jobTrigger).ExecuteCommandAsync();
|
|
|
|
|
- break;
|
|
|
|
|
|
|
+ var jobTrigger = context.Trigger.Adapt<SysJobTrigger>();
|
|
|
|
|
+ switch (context.Behavior)
|
|
|
|
|
+ {
|
|
|
|
|
+ case PersistenceBehavior.Appended:
|
|
|
|
|
+ await db.Insertable(jobTrigger).ExecuteCommandAsync();
|
|
|
|
|
+ break;
|
|
|
|
|
|
|
|
- case PersistenceBehavior.Updated:
|
|
|
|
|
- await db.Updateable(jobTrigger).WhereColumns(u => new { u.TriggerId, u.JobId }).IgnoreColumns(u => new { u.Id }).ExecuteCommandAsync();
|
|
|
|
|
- break;
|
|
|
|
|
|
|
+ case PersistenceBehavior.Updated:
|
|
|
|
|
+ await db.Updateable(jobTrigger).WhereColumns(u => new { u.TriggerId, u.JobId }).IgnoreColumns(u => new { u.Id }).ExecuteCommandAsync();
|
|
|
|
|
+ break;
|
|
|
|
|
|
|
|
- case PersistenceBehavior.Removed:
|
|
|
|
|
- await db.Deleteable<SysJobTrigger>().Where(u => u.TriggerId == jobTrigger.TriggerId && u.JobId == jobTrigger.JobId).ExecuteCommandAsync();
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ case PersistenceBehavior.Removed:
|
|
|
|
|
+ await db.Deleteable<SysJobTrigger>().Where(u => u.TriggerId == jobTrigger.TriggerId && u.JobId == jobTrigger.JobId).ExecuteCommandAsync();
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -189,12 +177,10 @@ public class DbJobPersistence : IJobPersistence
|
|
|
/// <returns></returns>
|
|
/// <returns></returns>
|
|
|
public async Task OnExecutionRecordAsync(PersistenceExecutionRecordContext context)
|
|
public async Task OnExecutionRecordAsync(PersistenceExecutionRecordContext context)
|
|
|
{
|
|
{
|
|
|
- using (var scope = _serviceScopeFactory.CreateScope())
|
|
|
|
|
- {
|
|
|
|
|
- var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
|
|
|
|
|
|
|
+ using var scope = _serviceScopeFactory.CreateScope();
|
|
|
|
|
+ var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
|
|
|
|
|
|
|
|
- var jobTriggerRecord = context.Timeline.Adapt<SysJobTriggerRecord>();
|
|
|
|
|
- await db.Insertable(jobTriggerRecord).ExecuteCommandAsync();
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ var jobTriggerRecord = context.Timeline.Adapt<SysJobTriggerRecord>();
|
|
|
|
|
+ await db.Insertable(jobTriggerRecord).ExecuteCommandAsync();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|