JobClusterServer.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. //using Nest;
  2. using Furion.Schedule;
  3. namespace Admin.NET.Core.Service;
  /// <summary>
  /// Job cluster control: records this scheduler node's state in the
  /// <c>SysJobCluster</c> table and decides when the node may start working.
  /// </summary>
  /// <remarks>
  /// The lifecycle callbacks (<see cref="Start"/>, <see cref="Stop"/>, <see cref="Crash"/>)
  /// return <c>void</c>, so they cannot be awaited by the caller. Each one therefore
  /// catches and reports its own failures instead of letting an exception escape from an
  /// <c>async void</c> method, where it would be raised as an unhandled exception.
  /// </remarks>
  public class JobClusterServer : IJobClusterServer
  {
      /// <summary>Key of the distributed lock that serializes the "who works next" election.</summary>
      private const string ElectionLockKey = "lock:JobClusterServer:WaitingForAsync";

      /// <summary>Timeout, in milliseconds, passed to <c>AcquireLock</c>.</summary>
      private const int ElectionLockTimeoutMs = 1000;

      /// <summary>Base interval, in milliseconds, between two election attempts.</summary>
      private const int HeartbeatBaseMs = 3000;

      public JobClusterServer()
      {
      }

      /// <summary>
      /// Notification that the current job scheduler has started.
      /// </summary>
      /// <param name="context">Job cluster service context.</param>
      public async void Start(JobClusterContext context)
      {
          try
          {
              await RegisterAsWaitingAsync(context.ClusterId);
          }
          catch (Exception ex)
          {
              ReportFailure(nameof(Start), ex);
          }
      }

      /// <summary>
      /// Waits until this node is allowed to work.
      /// </summary>
      /// <remarks>
      /// Loops until no row in the cluster table has status <c>ClusterStatus.Working</c>,
      /// then marks this node as working and returns. Every unsuccessful attempt
      /// (another node is working, the lock could not be taken, or the database call
      /// failed) is followed by a jittered delay before the next attempt.
      /// </remarks>
      /// <param name="context">Job cluster service context.</param>
      /// <returns><see cref="Task"/></returns>
      public async Task WaitingForAsync(JobClusterContext context)
      {
          var clusterId = context.ClusterId;

          while (true)
          {
              try
              {
                  var cache = App.GetService<ICache>();

                  // Distributed lock: only one node at a time may run the check-then-update below.
                  using (cache.AcquireLock(ElectionLockKey, ElectionLockTimeoutMs))
                  {
                      var repository = App.GetService<SqlSugarRepository<SysJobCluster>>();

                      var someoneIsWorking = await repository.IsAnyAsync(u => u.Status == ClusterStatus.Working);
                      if (!someoneIsWorking)
                      {
                          // Await the update while the lock is still held; otherwise another
                          // node could pass the check above before this status is written.
                          await WorkNowAsync(clusterId);
                          return;
                      }
                  }
              }
              catch (Exception ex)
              {
                  // Best effort: a failed attempt must not stop the waiting loop.
                  ReportFailure(nameof(WaitingForAsync), ex);
              }

              // Controls the cluster heartbeat frequency; the random part staggers nodes
              // that were started at the same time.
              await Task.Delay(HeartbeatBaseMs + Random.Shared.Next(500, 1000));
          }
      }

      /// <summary>
      /// Notification that the current job scheduler has stopped.
      /// </summary>
      /// <param name="context">Job cluster service context.</param>
      public async void Stop(JobClusterContext context)
      {
          try
          {
              // A stopped node is recorded with the same status as a crashed one.
              await SetStatusAsync(context.ClusterId, ClusterStatus.Crashed);
          }
          catch (Exception ex)
          {
              ReportFailure(nameof(Stop), ex);
          }
      }

      /// <summary>
      /// Notification that the current job scheduler has crashed.
      /// </summary>
      /// <param name="context">Job cluster service context.</param>
      public async void Crash(JobClusterContext context)
      {
          try
          {
              await SetStatusAsync(context.ClusterId, ClusterStatus.Crashed);
          }
          catch (Exception ex)
          {
              ReportFailure(nameof(Crash), ex);
          }
      }

      /// <summary>
      /// Marks the given cluster node as working.
      /// </summary>
      /// <param name="clusterId">Cluster id.</param>
      /// <returns>A task that completes once the status has been written.</returns>
      private Task WorkNowAsync(string clusterId)
      {
          return SetStatusAsync(clusterId, ClusterStatus.Working);
      }

      /// <summary>
      /// Ensures a row exists for <paramref name="clusterId"/> with status
      /// <c>ClusterStatus.Waiting</c>: updates the row when present, inserts it otherwise.
      /// </summary>
      /// <param name="clusterId">Cluster id.</param>
      private static async Task RegisterAsWaitingAsync(string clusterId)
      {
          var repository = App.GetService<SqlSugarRepository<SysJobCluster>>();

          if (await repository.IsAnyAsync(u => u.ClusterId == clusterId))
          {
              // NOTE(review): SqlSugar is expected to read `==` inside SetColumns as an
              // assignment (Status = Waiting) - kept exactly as the original call.
              await repository.AsUpdateable()
                  .SetColumns(u => u.Status == ClusterStatus.Waiting)
                  .Where(u => u.ClusterId == clusterId)
                  .ExecuteCommandAsync();
          }
          else
          {
              await repository.AsInsertable(new SysJobCluster { ClusterId = clusterId, Status = ClusterStatus.Waiting })
                  .ExecuteCommandAsync();
          }
      }

      /// <summary>
      /// Updates the status column of the row(s) whose cluster id equals <paramref name="clusterId"/>.
      /// </summary>
      /// <param name="clusterId">Cluster id.</param>
      /// <param name="status">Status to store.</param>
      private static async Task SetStatusAsync(string clusterId, ClusterStatus status)
      {
          var repository = App.GetService<SqlSugarRepository<SysJobCluster>>();
          await repository.UpdateSetColumnsTrueAsync(u => new SysJobCluster { Status = status }, u => u.ClusterId == clusterId);
      }

      /// <summary>
      /// Reports a failure of a cluster operation without rethrowing it.
      /// </summary>
      /// <param name="operation">Name of the operation that failed.</param>
      /// <param name="ex">The exception that was caught.</param>
      private static void ReportFailure(string operation, Exception ex)
      {
          global::System.Diagnostics.Trace.TraceError($"JobClusterServer.{operation} failed: {ex}");
      }
  }