DbJobPersistence.cs 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. // 大名科技(天津)有限公司 版权所有
  2. //
  3. // 此源代码遵循位于源代码树根目录中的 LICENSE 文件的许可证
  4. //
  5. // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动
  6. //
  7. // 任何基于本项目二次开发而产生的一切法律纠纷和责任,均与作者无关
  8. namespace Admin.NET.Core.Service;
  9. /// <summary>
  10. /// 作业持久化(数据库)
  11. /// </summary>
  12. public class DbJobPersistence : IJobPersistence
  13. {
  14. private readonly IServiceScopeFactory _serviceScopeFactory;
  15. public DbJobPersistence(IServiceScopeFactory serviceScopeFactory)
  16. {
  17. _serviceScopeFactory = serviceScopeFactory;
  18. }
  19. /// <summary>
  20. /// 作业调度服务启动时
  21. /// </summary>
  22. /// <param name="stoppingToken"></param>
  23. /// <returns></returns>
  24. /// <exception cref="NotSupportedException"></exception>
  25. public async Task<IEnumerable<SchedulerBuilder>> PreloadAsync(CancellationToken stoppingToken)
  26. {
  27. using var scope = _serviceScopeFactory.CreateScope();
  28. var jobDetailRep = scope.ServiceProvider.GetRequiredService<SqlSugarRepository<SysJobDetail>>();
  29. var jobTriggerRep = scope.ServiceProvider.GetRequiredService<SqlSugarRepository<SysJobTrigger>>();
  30. var dynamicJobCompiler = scope.ServiceProvider.GetRequiredService<DynamicJobCompiler>();
  31. // 获取所有定义的作业
  32. var allJobs = App.EffectiveTypes.ScanToBuilders().ToList();
  33. // 若数据库不存在任何作业,则直接返回
  34. if (!jobDetailRep.IsAny(u => true)) return allJobs;
  35. // 遍历所有定义的作业
  36. foreach (var schedulerBuilder in allJobs)
  37. {
  38. // 获取作业信息构建器
  39. var jobBuilder = schedulerBuilder.GetJobBuilder();
  40. // 加载数据库数据
  41. var dbDetail = await jobDetailRep.GetFirstAsync(u => u.JobId == jobBuilder.JobId);
  42. if (dbDetail == null) continue;
  43. // 同步数据库数据
  44. jobBuilder.LoadFrom(dbDetail);
  45. // 获取作业的所有数据库的触发器
  46. var dbTriggers = await jobTriggerRep.GetListAsync(u => u.JobId == jobBuilder.JobId);
  47. // 遍历所有作业触发器
  48. foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
  49. {
  50. // 加载数据库数据
  51. var dbTrigger = dbTriggers.FirstOrDefault(u => u.JobId == jobBuilder.JobId && u.TriggerId == triggerBuilder.TriggerId);
  52. if (dbTrigger == null) continue;
  53. triggerBuilder.LoadFrom(dbTrigger).Updated(); // 标记更新
  54. }
  55. // 遍历所有非编译时定义的触发器加入到作业中
  56. foreach (var dbTrigger in dbTriggers)
  57. {
  58. if (schedulerBuilder.GetTriggerBuilder(dbTrigger.TriggerId)?.JobId == jobBuilder.JobId) continue;
  59. var triggerBuilder = TriggerBuilder.Create(dbTrigger.TriggerId).LoadFrom(dbTrigger);
  60. schedulerBuilder.AddTriggerBuilder(triggerBuilder); // 先添加
  61. triggerBuilder.Updated(); // 再标记更新
  62. }
  63. // 标记更新
  64. schedulerBuilder.Updated();
  65. }
  66. // 获取数据库所有通过脚本创建的作业
  67. var allDbScriptJobs = await jobDetailRep.GetListAsync(u => u.CreateType != JobCreateTypeEnum.BuiltIn);
  68. foreach (var dbDetail in allDbScriptJobs)
  69. {
  70. // 动态创建作业
  71. Type jobType;
  72. switch (dbDetail.CreateType)
  73. {
  74. case JobCreateTypeEnum.Script:
  75. jobType = dynamicJobCompiler.BuildJob(dbDetail.ScriptCode);
  76. break;
  77. case JobCreateTypeEnum.Http:
  78. jobType = typeof(HttpJob);
  79. break;
  80. default:
  81. throw new NotSupportedException();
  82. }
  83. // 动态构建的 jobType 的程序集名称为随机名称,需重新设置
  84. dbDetail.AssemblyName = jobType.Assembly.FullName!.Split(',')[0];
  85. var jobBuilder = JobBuilder.Create(jobType).LoadFrom(dbDetail);
  86. // 强行设置为不扫描 IJob 实现类 [Trigger] 特性触发器,否则 SchedulerBuilder.Create 会再次扫描,导致重复添加同名触发器
  87. jobBuilder.SetIncludeAnnotations(false);
  88. // 获取作业的所有数据库的触发器加入到作业中
  89. var dbTriggers = await jobTriggerRep.GetListAsync(u => u.JobId == jobBuilder.JobId);
  90. var triggerBuilders = dbTriggers.Select(u => TriggerBuilder.Create(u.TriggerId).LoadFrom(u).Updated());
  91. var schedulerBuilder = SchedulerBuilder.Create(jobBuilder, triggerBuilders.ToArray());
  92. // 标记更新
  93. schedulerBuilder.Updated();
  94. allJobs.Add(schedulerBuilder);
  95. }
  96. return allJobs;
  97. }
  98. /// <summary>
  99. /// 作业计划初始化通知
  100. /// </summary>
  101. /// <param name="builder"></param>
  102. /// <param name="stoppingToken"></param>
  103. /// <returns></returns>
  104. public Task<SchedulerBuilder> OnLoadingAsync(SchedulerBuilder builder, CancellationToken stoppingToken)
  105. {
  106. return Task.FromResult(builder);
  107. }
  108. /// <summary>
  109. /// 作业计划Scheduler的JobDetail变化时
  110. /// </summary>
  111. /// <param name="context"></param>
  112. public async Task OnChangedAsync(PersistenceContext context)
  113. {
  114. using (var scope = _serviceScopeFactory.CreateScope())
  115. {
  116. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>();
  117. var jobDetail = context.JobDetail.Adapt<SysJobDetail>();
  118. switch (context.Behavior)
  119. {
  120. case PersistenceBehavior.Appended:
  121. await db.Insertable(jobDetail).ExecuteCommandAsync();
  122. break;
  123. case PersistenceBehavior.Updated:
  124. await db.Updateable(jobDetail).WhereColumns(u => new { u.JobId }).IgnoreColumns(u => new { u.Id, u.CreateType, u.ScriptCode }).ExecuteCommandAsync();
  125. break;
  126. case PersistenceBehavior.Removed:
  127. await db.Deleteable<SysJobDetail>().Where(u => u.JobId == jobDetail.JobId).ExecuteCommandAsync();
  128. break;
  129. }
  130. }
  131. }
  132. /// <summary>
  133. /// 作业计划Scheduler的触发器Trigger变化时
  134. /// </summary>
  135. /// <param name="context"></param>
  136. public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
  137. {
  138. using (var scope = _serviceScopeFactory.CreateScope())
  139. {
  140. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>();
  141. var jobTrigger = context.Trigger.Adapt<SysJobTrigger>();
  142. switch (context.Behavior)
  143. {
  144. case PersistenceBehavior.Appended:
  145. await db.Insertable(jobTrigger).ExecuteCommandAsync();
  146. break;
  147. case PersistenceBehavior.Updated:
  148. await db.Updateable(jobTrigger).WhereColumns(u => new { u.TriggerId, u.JobId }).IgnoreColumns(u => new { u.Id }).ExecuteCommandAsync();
  149. break;
  150. case PersistenceBehavior.Removed:
  151. await db.Deleteable<SysJobTrigger>().Where(u => u.TriggerId == jobTrigger.TriggerId && u.JobId == jobTrigger.JobId).ExecuteCommandAsync();
  152. break;
  153. }
  154. }
  155. }
  156. /// <summary>
  157. /// 作业触发器运行记录
  158. /// </summary>
  159. /// <param name="timeline"></param>
  160. public async Task OnExecutionRecordAsync(TriggerTimeline timeline)
  161. {
  162. using (var scope = _serviceScopeFactory.CreateScope())
  163. {
  164. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>();
  165. var jobTriggerRecord = timeline.Adapt<SysJobTriggerRecord>();
  166. await db.Insertable(jobTriggerRecord).ExecuteCommandAsync();
  167. }
  168. }
  169. }