DbJobPersistence.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. // Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
  2. //
  3. // 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
  4. //
  5. // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
  6. namespace Admin.NET.Core.Service;
  /// <summary>
  /// Job persistence backed by the database: preloads scheduler builders when the scheduler starts and
  /// writes job-detail, trigger and execution-record changes to their tables.
  /// </summary>
  public class DbJobPersistence : IJobPersistence
  {
      private readonly IServiceScopeFactory _serviceScopeFactory;

      /// <summary>
      /// Creates the persistence service.
      /// </summary>
      /// <param name="serviceScopeFactory">Factory used to open a fresh DI scope for each persistence call.</param>
      public DbJobPersistence(IServiceScopeFactory serviceScopeFactory)
      {
          _serviceScopeFactory = serviceScopeFactory;
      }

      /// <summary>
      /// Called when the job scheduling service starts. Reconciles compile-time (built-in) jobs with the
      /// database, overlays database configuration on jobs that are not built-in, and builds schedulers for
      /// the remaining database-defined (Script / Http) jobs.
      /// </summary>
      /// <param name="stoppingToken">Cancellation token passed to every database query issued here.</param>
      /// <returns>The scanned compile-time scheduler builders followed by the database-defined ones.</returns>
      /// <exception cref="NotSupportedException">
      /// A non-built-in job row has a <c>CreateType</c> other than Script or Http.
      /// </exception>
      public async Task<IEnumerable<SchedulerBuilder>> PreloadAsync(CancellationToken stoppingToken)
      {
          using var scope = _serviceScopeFactory.CreateScope();
          var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();
          var dynamicJobCompiler = scope.ServiceProvider.GetRequiredService<DynamicJobCompiler>();

          // All jobs defined at compile time.
          var allJobs = App.EffectiveTypes.ScanToBuilders().ToList();

          // Drop built-in jobs that no longer exist in code.
          await RemoveStaleBuiltInJobsAsync(db, allJobs, stoppingToken);

          // Pass 1: push built-in jobs/triggers from code into the database. Jobs whose row is not
          // built-in are left untouched and remembered, together with the row already fetched, so that
          // pass 2 does not have to query the same row again.
          var dbManagedJobs = new List<(SchedulerBuilder Scheduler, SysJobDetail Detail)>();
          foreach (var schedulerBuilder in allJobs)
          {
              var dbOwnedDetail = await SyncBuiltInJobAsync(db, schedulerBuilder, stoppingToken);
              if (dbOwnedDetail != null) dbManagedJobs.Add((schedulerBuilder, dbOwnedDetail));
          }

          // Pass 2: for compile-time jobs whose row is not built-in, the database values win.
          foreach (var (schedulerBuilder, dbDetail) in dbManagedJobs)
          {
              await ApplyDbConfigurationAsync(db, schedulerBuilder, dbDetail, stoppingToken);
          }

          // Pass 3: jobs that exist only as database rows (Script / Http).
          // NOTE(review): this pass selects every row with CreateType != BuiltIn, which also includes the
          // rows handled in pass 2, so such a JobId can appear twice in the result - confirm the scheduler
          // factory tolerates that or whether those rows should be skipped here.
          allJobs.AddRange(await BuildDynamicJobsAsync(db, dynamicJobCompiler, stoppingToken));

          return allJobs;
      }

      /// <summary>
      /// Deletes built-in job rows (and their triggers) whose JobId is not declared by any compile-time job.
      /// </summary>
      private static async Task RemoveStaleBuiltInJobsAsync(ISqlSugarClient db, IEnumerable<SchedulerBuilder> codeJobs, CancellationToken stoppingToken)
      {
          var builtInJobIds = codeJobs
              .Select(u => u.GetJobBuilder().JobId)
              .Where(u => !string.IsNullOrWhiteSpace(u))
              .ToHashSet();

          var persistedBuiltInJobIds = await db.Queryable<SysJobDetail>()
              .Where(u => u.CreateType == JobCreateTypeEnum.BuiltIn)
              .Select(u => u.JobId)
              .ToListAsync(stoppingToken);

          // The comparison is done in memory against the set of ids found in code.
          var staleBuiltInJobIds = persistedBuiltInJobIds
              .Where(u => !builtInJobIds.Contains(u))
              .ToList();
          if (staleBuiltInJobIds.Count == 0) return;

          // Triggers first, then the job rows themselves.
          await db.Deleteable<SysJobTrigger>().In(u => u.JobId, staleBuiltInJobIds).ExecuteCommandAsync();
          await db.Deleteable<SysJobDetail>().In(u => u.JobId, staleBuiltInJobIds).ExecuteCommandAsync();
      }

      /// <summary>
      /// Synchronizes one compile-time job and its triggers into the database as a built-in job.
      /// </summary>
      /// <returns>
      /// The existing database row when that row is not built-in (nothing is written in that case);
      /// otherwise <c>null</c>, including when the job has no JobId.
      /// </returns>
      private static async Task<SysJobDetail> SyncBuiltInJobAsync(ISqlSugarClient db, SchedulerBuilder schedulerBuilder, CancellationToken stoppingToken)
      {
          var jobBuilder = schedulerBuilder.GetJobBuilder();
          if (string.IsNullOrWhiteSpace(jobBuilder.JobId)) return null;

          var codeJobDetail = jobBuilder.Adapt<SysJobDetail>();
          codeJobDetail.JobId = jobBuilder.JobId;
          codeJobDetail.CreateType = JobCreateTypeEnum.BuiltIn;

          var dbDetail = await db.Queryable<SysJobDetail>().FirstAsync(u => u.JobId == jobBuilder.JobId, stoppingToken);
          if (dbDetail == null)
          {
              await db.Insertable(codeJobDetail).ExecuteCommandAsync();
          }
          else if (dbDetail.CreateType == JobCreateTypeEnum.BuiltIn)
          {
              // Keep the row identity, script and update time; everything else comes from code.
              codeJobDetail.Id = dbDetail.Id;
              codeJobDetail.ScriptCode = dbDetail.ScriptCode;
              codeJobDetail.UpdatedTime = dbDetail.UpdatedTime;
              await db.Updateable(codeJobDetail)
                  .WhereColumns(u => new { u.JobId })
                  .IgnoreColumns(u => new { u.Id, u.ScriptCode })
                  .ExecuteCommandAsync();
          }
          else
          {
              // Not a built-in row: the database configuration stays in charge of this job.
              return dbDetail;
          }

          jobBuilder.LoadFrom(codeJobDetail);

          var dbTriggers = await db.Queryable<SysJobTrigger>().Where(u => u.JobId == jobBuilder.JobId).ToListAsync(stoppingToken);
          var codeTriggerIds = new HashSet<string>();
          foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
          {
              var codeTrigger = triggerBuilder.Adapt<SysJobTrigger>();
              codeTrigger.JobId = codeJobDetail.JobId;
              if (string.IsNullOrWhiteSpace(codeTrigger.TriggerId)) continue;

              // A trigger without its own assembly name takes the job's.
              if (string.IsNullOrWhiteSpace(codeTrigger.AssemblyName)) codeTrigger.AssemblyName = codeJobDetail.AssemblyName;
              codeTriggerIds.Add(codeTrigger.TriggerId);

              var dbTrigger = dbTriggers.FirstOrDefault(u => u.JobId == jobBuilder.JobId && u.TriggerId == codeTrigger.TriggerId);
              if (dbTrigger == null)
              {
                  await db.Insertable(codeTrigger).ExecuteCommandAsync();
              }
              else
              {
                  // Carry over the row identity and the status/run counters stored in the database.
                  codeTrigger.Id = dbTrigger.Id;
                  codeTrigger.Status = dbTrigger.Status;
                  codeTrigger.LastRunTime = dbTrigger.LastRunTime;
                  codeTrigger.NumberOfRuns = dbTrigger.NumberOfRuns;
                  codeTrigger.NumberOfErrors = dbTrigger.NumberOfErrors;
                  codeTrigger.UpdatedTime = dbTrigger.UpdatedTime;
                  await db.Updateable(codeTrigger)
                      .WhereColumns(u => new { u.TriggerId, u.JobId })
                      .IgnoreColumns(u => new { u.Id })
                      .ExecuteCommandAsync();
              }

              triggerBuilder.LoadFrom(codeTrigger).Updated();
          }

          // Remove triggers stored for this job that code no longer declares.
          var staleTriggerIds = dbTriggers
              .Where(u => !codeTriggerIds.Contains(u.TriggerId))
              .Select(u => u.TriggerId)
              .ToList();
          if (staleTriggerIds.Count > 0)
          {
              await db.Deleteable<SysJobTrigger>()
                  .Where(u => u.JobId == jobBuilder.JobId)
                  .In(u => u.TriggerId, staleTriggerIds)
                  .ExecuteCommandAsync();
          }

          schedulerBuilder.Updated();
          return null;
      }

      /// <summary>
      /// Loads the database values of a non-built-in job row and its triggers into a compile-time
      /// scheduler builder, adding builders for triggers that exist only in the database.
      /// </summary>
      private static async Task ApplyDbConfigurationAsync(ISqlSugarClient db, SchedulerBuilder schedulerBuilder, SysJobDetail dbDetail, CancellationToken stoppingToken)
      {
          var jobBuilder = schedulerBuilder.GetJobBuilder();

          // Load the database values first; the trigger query below uses the builder's JobId afterwards.
          jobBuilder.LoadFrom(dbDetail);

          // All triggers stored for this job.
          var dbTriggers = await db.Queryable<SysJobTrigger>().Where(u => u.JobId == jobBuilder.JobId).ToListAsync(stoppingToken);

          // Triggers declared in code: overlay the stored values when a matching row exists.
          foreach (var (_, triggerBuilder) in schedulerBuilder.GetEnumerable())
          {
              var dbTrigger = dbTriggers.FirstOrDefault(u => u.JobId == jobBuilder.JobId && u.TriggerId == triggerBuilder.TriggerId);
              if (dbTrigger == null) continue;

              triggerBuilder.LoadFrom(dbTrigger).Updated(); // mark as updated
          }

          // Triggers that exist only in the database: add them to the job.
          foreach (var dbTrigger in dbTriggers)
          {
              if (schedulerBuilder.GetTriggerBuilder(dbTrigger.TriggerId)?.JobId == jobBuilder.JobId) continue;

              var triggerBuilder = TriggerBuilder.Create(dbTrigger.TriggerId).LoadFrom(dbTrigger);
              schedulerBuilder.AddTriggerBuilder(triggerBuilder); // add first
              triggerBuilder.Updated(); // then mark as updated
          }

          schedulerBuilder.Updated();
      }

      /// <summary>
      /// Builds a scheduler builder for every job row whose <c>CreateType</c> is not BuiltIn.
      /// </summary>
      /// <exception cref="NotSupportedException">A row has a <c>CreateType</c> other than Script or Http.</exception>
      private static async Task<List<SchedulerBuilder>> BuildDynamicJobsAsync(ISqlSugarClient db, DynamicJobCompiler dynamicJobCompiler, CancellationToken stoppingToken)
      {
          var schedulerBuilders = new List<SchedulerBuilder>();

          var allDbScriptJobs = await db.Queryable<SysJobDetail>().Where(u => u.CreateType != JobCreateTypeEnum.BuiltIn).ToListAsync(stoppingToken);
          foreach (var dbDetail in allDbScriptJobs)
          {
              // Resolve the job type: compiled from the stored script, or the HTTP job type.
              Type jobType = dbDetail.CreateType switch
              {
                  JobCreateTypeEnum.Script => dynamicJobCompiler.BuildJob(dbDetail.ScriptCode),
                  JobCreateTypeEnum.Http => typeof(HttpJob),
                  _ => throw new NotSupportedException(),
              };

              // A dynamically built job type lives in a randomly named assembly, so the name is reset
              // from the resolved type (the part of the full name before the first comma).
              dbDetail.AssemblyName = jobType.Assembly.FullName!.Split(',')[0];
              var jobBuilder = JobBuilder.Create(jobType).LoadFrom(dbDetail);

              // Force-disable scanning of [Trigger] attributes on the IJob implementation; otherwise
              // SchedulerBuilder.Create would scan again and add triggers with the same id twice.
              jobBuilder.SetIncludeAnnotations(false);

              // Attach every trigger stored for this job. The token is passed here as well so that this
              // query is cancellable like all the others.
              var dbTriggers = await db.Queryable<SysJobTrigger>().Where(u => u.JobId == jobBuilder.JobId).ToListAsync(stoppingToken);
              var triggerBuilders = dbTriggers.Select(u => TriggerBuilder.Create(u.TriggerId).LoadFrom(u).Updated());
              var schedulerBuilder = SchedulerBuilder.Create(jobBuilder, triggerBuilders.ToArray());

              // Mark as updated.
              schedulerBuilder.Updated();
              schedulerBuilders.Add(schedulerBuilder);
          }

          return schedulerBuilders;
      }

      /// <summary>
      /// Notification raised while a job scheduler is being initialized; returns the builder unchanged.
      /// </summary>
      /// <param name="builder">The scheduler builder being loaded.</param>
      /// <param name="stoppingToken">Cancellation token (not used).</param>
      /// <returns>The same <paramref name="builder"/> instance.</returns>
      public Task<SchedulerBuilder> OnLoadingAsync(SchedulerBuilder builder, CancellationToken stoppingToken)
      {
          return Task.FromResult(builder);
      }

      /// <summary>
      /// Called when a scheduler's JobDetail changes; inserts, updates or deletes the matching
      /// <c>SysJobDetail</c> row according to <c>context.Behavior</c>.
      /// </summary>
      /// <param name="context">Persistence context carrying the job detail and the change behavior.</param>
      /// <returns>A task that completes when the database command has run.</returns>
      public async Task OnChangedAsync(PersistenceContext context)
      {
          using var scope = _serviceScopeFactory.CreateScope();
          var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();

          var jobDetail = context.JobDetail.Adapt<SysJobDetail>();
          switch (context.Behavior)
          {
              case PersistenceBehavior.Appended:
                  await db.Insertable(jobDetail).ExecuteCommandAsync();
                  break;

              case PersistenceBehavior.Updated:
                  // Matched by JobId; Id, CreateType and ScriptCode are left as stored.
                  await db.Updateable(jobDetail)
                      .WhereColumns(u => new { u.JobId })
                      .IgnoreColumns(u => new { u.Id, u.CreateType, u.ScriptCode })
                      .ExecuteCommandAsync();
                  break;

              case PersistenceBehavior.Removed:
                  await db.Deleteable<SysJobDetail>().Where(u => u.JobId == jobDetail.JobId).ExecuteCommandAsync();
                  break;
          }
      }

      /// <summary>
      /// Called when a scheduler's trigger changes; inserts, updates or deletes the matching
      /// <c>SysJobTrigger</c> row according to <c>context.Behavior</c>.
      /// </summary>
      /// <param name="context">Persistence context carrying the trigger and the change behavior.</param>
      /// <returns>A task that completes when the database command has run.</returns>
      public async Task OnTriggerChangedAsync(PersistenceTriggerContext context)
      {
          using var scope = _serviceScopeFactory.CreateScope();
          var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();

          var jobTrigger = context.Trigger.Adapt<SysJobTrigger>();
          switch (context.Behavior)
          {
              case PersistenceBehavior.Appended:
                  await db.Insertable(jobTrigger).ExecuteCommandAsync();
                  break;

              case PersistenceBehavior.Updated:
                  // Matched by TriggerId + JobId; the row Id is left as stored.
                  await db.Updateable(jobTrigger)
                      .WhereColumns(u => new { u.TriggerId, u.JobId })
                      .IgnoreColumns(u => new { u.Id })
                      .ExecuteCommandAsync();
                  break;

              case PersistenceBehavior.Removed:
                  await db.Deleteable<SysJobTrigger>().Where(u => u.TriggerId == jobTrigger.TriggerId && u.JobId == jobTrigger.JobId).ExecuteCommandAsync();
                  break;
          }
      }

      /// <summary>
      /// Stores one trigger execution record.
      /// </summary>
      /// <param name="context">Persistence context carrying the execution timeline.</param>
      /// <returns>A task that completes after the record is inserted and the follow-up call has run.</returns>
      public async Task OnExecutionRecordAsync(PersistenceExecutionRecordContext context)
      {
          using var scope = _serviceScopeFactory.CreateScope();
          var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>().CopyNew();

          var jobTriggerRecord = context.Timeline.Adapt<SysJobTriggerRecord>();
          await db.Insertable(jobTriggerRecord).ExecuteCommandAsync();

          // Presumably prunes expired execution records - confirm against SysJobService.
          await scope.ServiceProvider.GetRequiredService<SysJobService>().ClearExpireJobTriggerRecord(jobTriggerRecord);
      }
  }