DbJobPersistence.cs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. // Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
  2. //
  3. // 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
  4. //
  5. // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
  6. namespace Admin.NET.Core.Service;
  7. /// <summary>
  8. /// 作业持久化(数据库)
  9. /// </summary>
  10. public class DbJobPersistence : IJobPersistence
  11. {
  12. private readonly IServiceScopeFactory _serviceScopeFactory;
  13. public DbJobPersistence(IServiceScopeFactory serviceScopeFactory)
  14. {
  15. _serviceScopeFactory = serviceScopeFactory;
  16. }
  17. /// <summary>
  18. /// 作业调度服务启动时
  19. /// </summary>
  20. /// <param name="stoppingToken"></param>
  21. /// <returns></returns>
  22. /// <exception cref="NotSupportedException"></exception>
  23. public async Task<IEnumerable<SchedulerBuilder>> PreloadAsync(CancellationToken stoppingToken)
  24. {
  25. using var scope = _serviceScopeFactory.CreateScope();
  26. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
  27. var dynamicJobCompiler = scope.ServiceProvider.GetRequiredService<DynamicJobCompiler>();
  28. // 获取所有定义的作业
  29. var allJobs = App.EffectiveTypes.ScanToBuilders().ToList();
  30. // 若数据库不存在任何作业,则直接返回
  31. if (!await db.Queryable<SysJobDetail>().AnyAsync(u => true, stoppingToken)) return allJobs;
  32. // 遍历所有定义的作业
  33. foreach (var schedulerBuilder in allJobs)
  34. {
  35. // 获取作业信息构建器
  36. var jobBuilder = schedulerBuilder.GetJobBuilder();
  37. // 加载数据库数据
  38. var dbDetail = await db.Queryable<SysJobDetail>().FirstAsync(u => u.JobId == jobBuilder.JobId, stoppingToken);
  39. if (dbDetail == null) continue;
  40. // 同步数据库数据
  41. jobBuilder.LoadFrom(dbDetail);
  42. // 获取作业的所有数据库的触发器
  43. var dbTriggers = await db.Queryable<SysJobTrigger>().Where(u => u.JobId == jobBuilder.JobId).ToListAsync(stoppingToken);
  44. // 遍历所有作业触发器
  45. foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
  46. {
  47. // 加载数据库数据
  48. var dbTrigger = dbTriggers.FirstOrDefault(u => u.JobId == jobBuilder.JobId && u.TriggerId == triggerBuilder.TriggerId);
  49. if (dbTrigger == null) continue;
  50. triggerBuilder.LoadFrom(dbTrigger).Updated(); // 标记更新
  51. }
  52. // 遍历所有非编译时定义的触发器加入到作业中
  53. foreach (var dbTrigger in dbTriggers)
  54. {
  55. if (schedulerBuilder.GetTriggerBuilder(dbTrigger.TriggerId)?.JobId == jobBuilder.JobId) continue;
  56. var triggerBuilder = TriggerBuilder.Create(dbTrigger.TriggerId).LoadFrom(dbTrigger);
  57. schedulerBuilder.AddTriggerBuilder(triggerBuilder); // 先添加
  58. triggerBuilder.Updated(); // 再标记更新
  59. }
  60. // 标记更新
  61. schedulerBuilder.Updated();
  62. }
  63. // 获取数据库所有通过脚本创建的作业
  64. var allDbScriptJobs = await db.Queryable<SysJobDetail>().Where(u => u.CreateType != JobCreateTypeEnum.BuiltIn).ToListAsync(stoppingToken);
  65. foreach (var dbDetail in allDbScriptJobs)
  66. {
  67. // 动态创建作业
  68. Type jobType = dbDetail.CreateType switch
  69. {
  70. JobCreateTypeEnum.Script => dynamicJobCompiler.BuildJob(dbDetail.ScriptCode),
  71. JobCreateTypeEnum.Http => typeof(HttpJob),
  72. _ => throw new NotSupportedException(),
  73. };
  74. // 动态构建的 jobType 的程序集名称为随机名称,需重新设置
  75. dbDetail.AssemblyName = jobType.Assembly.FullName!.Split(',')[0];
  76. var jobBuilder = JobBuilder.Create(jobType).LoadFrom(dbDetail);
  77. // 强行设置为不扫描 IJob 实现类 [Trigger] 特性触发器,否则 SchedulerBuilder.Create 会再次扫描,导致重复添加同名触发器
  78. jobBuilder.SetIncludeAnnotations(false);
  79. // 获取作业的所有数据库的触发器加入到作业中
  80. var dbTriggers = await db.Queryable<SysJobTrigger>().Where(u => u.JobId == jobBuilder.JobId).ToListAsync();
  81. var triggerBuilders = dbTriggers.Select(u => TriggerBuilder.Create(u.TriggerId).LoadFrom(u).Updated());
  82. var schedulerBuilder = SchedulerBuilder.Create(jobBuilder, triggerBuilders.ToArray());
  83. // 标记更新
  84. schedulerBuilder.Updated();
  85. allJobs.Add(schedulerBuilder);
  86. }
  87. return allJobs;
  88. }
  89. /// <summary>
  90. /// 作业计划初始化通知
  91. /// </summary>
  92. /// <param name="builder"></param>
  93. /// <param name="stoppingToken"></param>
  94. /// <returns></returns>
  95. public Task<SchedulerBuilder> OnLoadingAsync(SchedulerBuilder builder, CancellationToken stoppingToken)
  96. {
  97. return Task.FromResult(builder);
  98. }
  99. /// <summary>
  100. /// 作业计划Scheduler的JobDetail变化时
  101. /// </summary>
  102. /// <param name="context"></param>
  103. /// <returns></returns>
  104. public async Task OnChangedAsync(PersistenceContext context)
  105. {
  106. using var scope = _serviceScopeFactory.CreateScope();
  107. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
  108. var jobDetail = context.JobDetail.Adapt<SysJobDetail>();
  109. switch (context.Behavior)
  110. {
  111. case PersistenceBehavior.Appended:
  112. await db.Insertable(jobDetail).ExecuteCommandAsync();
  113. break;
  114. case PersistenceBehavior.Updated:
  115. await db.Updateable(jobDetail).WhereColumns(u => new { u.JobId }).IgnoreColumns(u => new { u.Id, u.CreateType, u.ScriptCode }).ExecuteCommandAsync();
  116. break;
  117. case PersistenceBehavior.Removed:
  118. await db.Deleteable<SysJobDetail>().Where(u => u.JobId == jobDetail.JobId).ExecuteCommandAsync();
  119. break;
  120. }
  121. }
  122. /// <summary>
  123. /// 作业计划Scheduler的触发器Trigger变化时
  124. /// </summary>
  125. /// <param name="context"></param>
  126. /// <returns></returns>
  127. public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
  128. {
  129. using var scope = _serviceScopeFactory.CreateScope();
  130. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
  131. var jobTrigger = context.Trigger.Adapt<SysJobTrigger>();
  132. switch (context.Behavior)
  133. {
  134. case PersistenceBehavior.Appended:
  135. await db.Insertable(jobTrigger).ExecuteCommandAsync();
  136. break;
  137. case PersistenceBehavior.Updated:
  138. await db.Updateable(jobTrigger).WhereColumns(u => new { u.TriggerId, u.JobId }).IgnoreColumns(u => new { u.Id }).ExecuteCommandAsync();
  139. break;
  140. case PersistenceBehavior.Removed:
  141. await db.Deleteable<SysJobTrigger>().Where(u => u.TriggerId == jobTrigger.TriggerId && u.JobId == jobTrigger.JobId).ExecuteCommandAsync();
  142. break;
  143. }
  144. }
  145. /// <summary>
  146. /// 作业触发器运行记录
  147. /// </summary>
  148. /// <param name="context"></param>
  149. /// <returns></returns>
  150. public async Task OnExecutionRecordAsync(PersistenceExecutionRecordContext context)
  151. {
  152. using var scope = _serviceScopeFactory.CreateScope();
  153. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
  154. var jobTriggerRecord = context.Timeline.Adapt<SysJobTriggerRecord>();
  155. await db.Insertable(jobTriggerRecord).ExecuteCommandAsync();
  156. }
  157. }