JobClusterServer.cs 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. namespace Admin.NET.Core.Service;
  2. /// <summary>
  3. /// 作业集群控制
  4. /// </summary>
  5. public class JobClusterServer : IJobClusterServer
  6. {
  7. private readonly SqlSugarRepository<SysJobCluster> _sysJobClusterRep;
  8. public JobClusterServer(SqlSugarRepository<SysJobCluster> sysJobClusterRep)
  9. {
  10. _sysJobClusterRep = sysJobClusterRep;
  11. }
  12. /// <summary>
  13. /// 当前作业调度器启动通知
  14. /// </summary>
  15. /// <param name="context">作业集群服务上下文</param>
  16. public async void Start(JobClusterContext context)
  17. {
  18. // 在作业集群表中,如果 clusterId 不存在,则新增一条(否则更新一条),并设置 status 为 ClusterStatus.Waiting
  19. if (await _sysJobClusterRep.IsAnyAsync(u => u.ClusterId == context.ClusterId))
  20. {
  21. await _sysJobClusterRep.UpdateSetColumnsTrueAsync(u => new SysJobCluster { Status = ClusterStatus.Waiting }, u => u.ClusterId == context.ClusterId);
  22. }
  23. else
  24. {
  25. await _sysJobClusterRep.InsertAsync(new SysJobCluster { ClusterId = context.ClusterId, Status = ClusterStatus.Waiting });
  26. }
  27. }
  28. /// <summary>
  29. /// 等待被唤醒
  30. /// </summary>
  31. /// <param name="context">作业集群服务上下文</param>
  32. /// <returns><see cref="Task"/></returns>
  33. public async Task WaitingForAsync(JobClusterContext context)
  34. {
  35. var clusterId = context.ClusterId;
  36. while (true)
  37. {
  38. try
  39. {
  40. // 在这里查询数据库,根据以下两种情况处理
  41. // 1) 如果作业集群表已有 status 为 ClusterStatus.Working 则继续循环
  42. // 2) 如果作业集群表中还没有其他服务或只有自己,则插入一条集群服务或调用 await WorkNowAsync(clusterId); 之后 return;
  43. // 3) 如果作业集群表中没有 status 为 ClusterStatus.Working 的,调用 await WorkNowAsync(clusterId); 之后 return;
  44. if (await _sysJobClusterRep.IsAnyAsync(u => u.Status == ClusterStatus.Working))
  45. continue;
  46. WorkNowAsync(clusterId);
  47. return;
  48. }
  49. catch { }
  50. // 控制集群心跳频率
  51. await Task.Delay(3000);
  52. }
  53. }
  54. /// <summary>
  55. /// 当前作业调度器停止通知
  56. /// </summary>
  57. /// <param name="context">作业集群服务上下文</param>
  58. public async void Stop(JobClusterContext context)
  59. {
  60. // 在作业集群表中,更新 clusterId 的 status 为 ClusterStatus.Crashed
  61. await _sysJobClusterRep.UpdateSetColumnsTrueAsync(u => new SysJobCluster { Status = ClusterStatus.Crashed }, u => u.ClusterId == context.ClusterId);
  62. }
  63. /// <summary>
  64. /// 当前作业调度器宕机
  65. /// </summary>
  66. /// <param name="context">作业集群服务上下文</param>
  67. public async void Crash(JobClusterContext context)
  68. {
  69. // 在作业集群表中,更新 clusterId 的 status 为 ClusterStatus.Crashed
  70. await _sysJobClusterRep.UpdateSetColumnsTrueAsync(u => new SysJobCluster { Status = ClusterStatus.Crashed }, u => u.ClusterId == context.ClusterId);
  71. }
  72. /// <summary>
  73. /// 指示集群可以工作
  74. /// </summary>
  75. /// <param name="clusterId">集群 Id</param>
  76. /// <returns></returns>
  77. private async void WorkNowAsync(string clusterId)
  78. {
  79. // 在作业集群表中,更新 clusterId 的 status 为 ClusterStatus.Working
  80. await _sysJobClusterRep.UpdateSetColumnsTrueAsync(u => new SysJobCluster { Status = ClusterStatus.Working }, u => u.ClusterId == clusterId);
  81. }
  82. }