JobClusterServer.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. // 麻省理工学院许可证
  2. //
  3. // 版权所有 (c) 2021-2023 zuohuaijun,大名科技(天津)有限公司 联系电话/微信:18020030720 QQ:515096995
  4. //
  5. // 特此免费授予获得本软件的任何人以处理本软件的权利,但须遵守以下条件:在所有副本或重要部分的软件中必须包括上述版权声明和本许可声明。
  6. //
  7. // 软件按“原样”提供,不提供任何形式的明示或暗示的保证,包括但不限于对适销性、适用性和非侵权的保证。
  8. // 在任何情况下,作者或版权持有人均不对任何索赔、损害或其他责任负责,无论是因合同、侵权或其他方式引起的,与软件或其使用或其他交易有关。
  9. namespace Admin.NET.Core.Service;
  10. /// <summary>
  11. /// 作业集群控制
  12. /// </summary>
  13. public class JobClusterServer : IJobClusterServer
  14. {
  15. private readonly Random rd = new(DateTime.Now.Millisecond);
  16. public JobClusterServer()
  17. {
  18. }
  19. /// <summary>
  20. /// 当前作业调度器启动通知
  21. /// </summary>
  22. /// <param name="context">作业集群服务上下文</param>
  23. public async void Start(JobClusterContext context)
  24. {
  25. var _sysJobClusterRep = App.GetService<SqlSugarRepository<SysJobCluster>>();
  26. // 在作业集群表中,如果 clusterId 不存在,则新增一条(否则更新一条),并设置 status 为 ClusterStatus.Waiting
  27. if (await _sysJobClusterRep.IsAnyAsync(u => u.ClusterId == context.ClusterId))
  28. {
  29. await _sysJobClusterRep.AsUpdateable().SetColumns(u => u.Status == ClusterStatus.Waiting).Where(u => u.ClusterId == context.ClusterId).ExecuteCommandAsync();
  30. }
  31. else
  32. {
  33. await _sysJobClusterRep.AsInsertable(new SysJobCluster { ClusterId = context.ClusterId, Status = ClusterStatus.Waiting }).ExecuteCommandAsync();
  34. }
  35. }
  36. /// <summary>
  37. /// 等待被唤醒
  38. /// </summary>
  39. /// <param name="context">作业集群服务上下文</param>
  40. /// <returns><see cref="Task"/></returns>
  41. public async Task WaitingForAsync(JobClusterContext context)
  42. {
  43. var clusterId = context.ClusterId;
  44. while (true)
  45. {
  46. // 控制集群心跳频率(放在头部为了防止 IsAnyAsync continue 没sleep占用大量IO和CPU)
  47. await Task.Delay(3000 + rd.Next(500, 1000)); // 错开集群同时启动
  48. try
  49. {
  50. ICache _cache = App.GetService<ICache>();
  51. //使用分布式锁
  52. using (_cache.AcquireLock("lock:JobClusterServer:WaitingForAsync", 1000))
  53. {
  54. var _sysJobClusterRep = App.GetService<SqlSugarRepository<SysJobCluster>>();
  55. // 在这里查询数据库,根据以下两种情况处理
  56. // 1) 如果作业集群表已有 status 为 ClusterStatus.Working 则继续循环
  57. // 2) 如果作业集群表中还没有其他服务或只有自己,则插入一条集群服务或调用 await WorkNowAsync(clusterId); 之后 return;
  58. // 3) 如果作业集群表中没有 status 为 ClusterStatus.Working 的,调用 await WorkNowAsync(clusterId); 之后 return;
  59. if (await _sysJobClusterRep.IsAnyAsync(u => u.Status == ClusterStatus.Working))
  60. continue;
  61. await WorkNowAsync(clusterId);
  62. return;
  63. }
  64. }
  65. catch { }
  66. }
  67. }
  68. /// <summary>
  69. /// 当前作业调度器停止通知
  70. /// </summary>
  71. /// <param name="context">作业集群服务上下文</param>
  72. public async void Stop(JobClusterContext context)
  73. {
  74. var _sysJobClusterRep = App.GetService<SqlSugarRepository<SysJobCluster>>();
  75. // 在作业集群表中,更新 clusterId 的 status 为 ClusterStatus.Crashed
  76. await _sysJobClusterRep.UpdateSetColumnsTrueAsync(u => new SysJobCluster { Status = ClusterStatus.Crashed }, u => u.ClusterId == context.ClusterId);
  77. }
  78. /// <summary>
  79. /// 当前作业调度器宕机
  80. /// </summary>
  81. /// <param name="context">作业集群服务上下文</param>
  82. public async void Crash(JobClusterContext context)
  83. {
  84. var _sysJobClusterRep = App.GetService<SqlSugarRepository<SysJobCluster>>();
  85. // 在作业集群表中,更新 clusterId 的 status 为 ClusterStatus.Crashed
  86. await _sysJobClusterRep.UpdateSetColumnsTrueAsync(u => new SysJobCluster { Status = ClusterStatus.Crashed }, u => u.ClusterId == context.ClusterId);
  87. }
  88. /// <summary>
  89. /// 指示集群可以工作
  90. /// </summary>
  91. /// <param name="clusterId">集群 Id</param>
  92. /// <returns></returns>
  93. private async Task WorkNowAsync(string clusterId)
  94. {
  95. var _sysJobClusterRep = App.GetService<SqlSugarRepository<SysJobCluster>>();
  96. // 在作业集群表中,更新 clusterId 的 status 为 ClusterStatus.Working
  97. await _sysJobClusterRep.UpdateSetColumnsTrueAsync(u => new SysJobCluster { Status = ClusterStatus.Working }, u => u.ClusterId == clusterId);
  98. }
  99. }