DbJobPersistence.cs 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. // 大名科技(天津)有限公司版权所有 电话:18020030720 QQ:515096995
  2. //
  3. // 此源代码遵循位于源代码树根目录中的 LICENSE 文件的许可证
  4. namespace Admin.NET.Core.Service;
  5. /// <summary>
  6. /// 作业持久化(数据库)
  7. /// </summary>
  8. public class DbJobPersistence : IJobPersistence
  9. {
  10. private readonly IServiceScopeFactory _serviceScopeFactory;
  11. public DbJobPersistence(IServiceScopeFactory serviceScopeFactory)
  12. {
  13. _serviceScopeFactory = serviceScopeFactory;
  14. }
  15. /// <summary>
  16. /// 作业调度服务启动时
  17. /// </summary>
  18. /// <param name="stoppingToken"></param>
  19. /// <returns></returns>
  20. /// <exception cref="NotSupportedException"></exception>
  21. public async Task<IEnumerable<SchedulerBuilder>> PreloadAsync(CancellationToken stoppingToken)
  22. {
  23. using var scope = _serviceScopeFactory.CreateScope();
  24. var jobDetailRep = scope.ServiceProvider.GetRequiredService<SqlSugarRepository<SysJobDetail>>();
  25. var jobTriggerRep = scope.ServiceProvider.GetRequiredService<SqlSugarRepository<SysJobTrigger>>();
  26. var dynamicJobCompiler = scope.ServiceProvider.GetRequiredService<DynamicJobCompiler>();
  27. // 获取所有定义的作业
  28. var allJobs = App.EffectiveTypes.ScanToBuilders().ToList();
  29. // 若数据库不存在任何作业,则直接返回
  30. if (!jobDetailRep.IsAny(u => true)) return allJobs;
  31. // 遍历所有定义的作业
  32. foreach (var schedulerBuilder in allJobs)
  33. {
  34. // 获取作业信息构建器
  35. var jobBuilder = schedulerBuilder.GetJobBuilder();
  36. // 加载数据库数据
  37. var dbDetail = await jobDetailRep.GetFirstAsync(u => u.JobId == jobBuilder.JobId);
  38. if (dbDetail == null) continue;
  39. // 同步数据库数据
  40. jobBuilder.LoadFrom(dbDetail);
  41. // 获取作业的所有数据库的触发器
  42. var dbTriggers = await jobTriggerRep.GetListAsync(u => u.JobId == jobBuilder.JobId);
  43. // 遍历所有作业触发器
  44. foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
  45. {
  46. // 加载数据库数据
  47. var dbTrigger = dbTriggers.FirstOrDefault(u => u.JobId == jobBuilder.JobId && u.TriggerId == triggerBuilder.TriggerId);
  48. if (dbTrigger == null) continue;
  49. triggerBuilder.LoadFrom(dbTrigger).Updated(); // 标记更新
  50. }
  51. // 遍历所有非编译时定义的触发器加入到作业中
  52. foreach (var dbTrigger in dbTriggers)
  53. {
  54. if (schedulerBuilder.GetTriggerBuilder(dbTrigger.TriggerId)?.JobId == jobBuilder.JobId) continue;
  55. var triggerBuilder = TriggerBuilder.Create(dbTrigger.TriggerId).LoadFrom(dbTrigger);
  56. schedulerBuilder.AddTriggerBuilder(triggerBuilder); // 先添加
  57. triggerBuilder.Updated(); // 再标记更新
  58. }
  59. // 标记更新
  60. schedulerBuilder.Updated();
  61. }
  62. // 获取数据库所有通过脚本创建的作业
  63. var allDbScriptJobs = await jobDetailRep.GetListAsync(u => u.CreateType != JobCreateTypeEnum.BuiltIn);
  64. foreach (var dbDetail in allDbScriptJobs)
  65. {
  66. // 动态创建作业
  67. Type jobType;
  68. switch (dbDetail.CreateType)
  69. {
  70. case JobCreateTypeEnum.Script:
  71. jobType = dynamicJobCompiler.BuildJob(dbDetail.ScriptCode);
  72. break;
  73. case JobCreateTypeEnum.Http:
  74. jobType = typeof(HttpJob);
  75. break;
  76. default:
  77. throw new NotSupportedException();
  78. }
  79. // 动态构建的 jobType 的程序集名称为随机名称,需重新设置
  80. dbDetail.AssemblyName = jobType.Assembly.FullName!.Split(',')[0];
  81. var jobBuilder = JobBuilder.Create(jobType).LoadFrom(dbDetail);
  82. // 强行设置为不扫描 IJob 实现类 [Trigger] 特性触发器,否则 SchedulerBuilder.Create 会再次扫描,导致重复添加同名触发器
  83. jobBuilder.SetIncludeAnnotations(false);
  84. // 获取作业的所有数据库的触发器加入到作业中
  85. var dbTriggers = await jobTriggerRep.GetListAsync(u => u.JobId == jobBuilder.JobId);
  86. var triggerBuilders = dbTriggers.Select(u => TriggerBuilder.Create(u.TriggerId).LoadFrom(u).Updated());
  87. var schedulerBuilder = SchedulerBuilder.Create(jobBuilder, triggerBuilders.ToArray());
  88. // 标记更新
  89. schedulerBuilder.Updated();
  90. allJobs.Add(schedulerBuilder);
  91. }
  92. return allJobs;
  93. }
  94. /// <summary>
  95. /// 作业计划初始化通知
  96. /// </summary>
  97. /// <param name="builder"></param>
  98. /// <param name="stoppingToken"></param>
  99. /// <returns></returns>
  100. public Task<SchedulerBuilder> OnLoadingAsync(SchedulerBuilder builder, CancellationToken stoppingToken)
  101. {
  102. return Task.FromResult(builder);
  103. }
  104. /// <summary>
  105. /// 作业计划Scheduler的JobDetail变化时
  106. /// </summary>
  107. /// <param name="context"></param>
  108. public async Task OnChangedAsync(PersistenceContext context)
  109. {
  110. using (var scope = _serviceScopeFactory.CreateScope())
  111. {
  112. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>();
  113. var jobDetail = context.JobDetail.Adapt<SysJobDetail>();
  114. switch (context.Behavior)
  115. {
  116. case PersistenceBehavior.Appended:
  117. await db.Insertable(jobDetail).ExecuteCommandAsync();
  118. break;
  119. case PersistenceBehavior.Updated:
  120. await db.Updateable(jobDetail).WhereColumns(u => new { u.JobId }).IgnoreColumns(u => new { u.Id, u.CreateType, u.ScriptCode }).ExecuteCommandAsync();
  121. break;
  122. case PersistenceBehavior.Removed:
  123. await db.Deleteable<SysJobDetail>().Where(u => u.JobId == jobDetail.JobId).ExecuteCommandAsync();
  124. break;
  125. }
  126. }
  127. }
  128. /// <summary>
  129. /// 作业计划Scheduler的触发器Trigger变化时
  130. /// </summary>
  131. /// <param name="context"></param>
  132. public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
  133. {
  134. using (var scope = _serviceScopeFactory.CreateScope())
  135. {
  136. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>();
  137. var jobTrigger = context.Trigger.Adapt<SysJobTrigger>();
  138. switch (context.Behavior)
  139. {
  140. case PersistenceBehavior.Appended:
  141. await db.Insertable(jobTrigger).ExecuteCommandAsync();
  142. break;
  143. case PersistenceBehavior.Updated:
  144. await db.Updateable(jobTrigger).WhereColumns(u => new { u.TriggerId, u.JobId }).IgnoreColumns(u => new { u.Id }).ExecuteCommandAsync();
  145. break;
  146. case PersistenceBehavior.Removed:
  147. await db.Deleteable<SysJobTrigger>().Where(u => u.TriggerId == jobTrigger.TriggerId && u.JobId == jobTrigger.JobId).ExecuteCommandAsync();
  148. break;
  149. }
  150. }
  151. }
  152. /// <summary>
  153. /// 作业触发器运行记录
  154. /// </summary>
  155. /// <param name="timeline"></param>
  156. public async Task OnExecutionRecordAsync(TriggerTimeline timeline)
  157. {
  158. using (var scope = _serviceScopeFactory.CreateScope())
  159. {
  160. var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>();
  161. var jobTriggerRecord = timeline.Adapt<SysJobTriggerRecord>();
  162. await db.Insertable(jobTriggerRecord).ExecuteCommandAsync();
  163. }
  164. }
  165. }