// MqttManager.cs
  1. using Host.Common;
  2. using Host.Common.Enums;
  3. using Host.Entity;
  4. using MQTTnet;
  5. using MQTTnet.Client;
  6. using MQTTnet.Client.Options;
  7. using MQTTnet.Client.Publishing;
  8. using MQTTnet.Extensions.ManagedClient;
  9. using MQTTnet.Protocol;
  10. using Newtonsoft.Json;
  11. using Serilog;
  12. using System;
  13. using System.Threading.Tasks;
  14. namespace Host.Managers
  15. {
  /// <summary>
  /// Singleton that owns the managed MQTT client: it (re)starts the client from
  /// the stored MQTT settings and publishes messages through it.
  /// </summary>
  public class MqttManager
  {
    /// <summary>The single shared instance, created by the static constructor.</summary>
    public static readonly MqttManager Instance;

    static MqttManager()
    {
      Instance = new MqttManager();
    }

    /// <summary>
    /// The managed client created by the most recent <see cref="RestartAsync"/> call.
    /// It is <c>null</c> before the first restart and while a restart is replacing it.
    /// </summary>
    public IManagedMqttClient MqttClient { get; private set; }

    /// <summary>
    /// Stops the current client (if any), reloads the MQTT settings and starts a new
    /// managed client over WebSocket with a 29-second keep-alive and a 5-second
    /// auto-reconnect delay. TLS is enabled when the configured connection method is WSS.
    /// </summary>
    /// <remarks>
    /// Failures are logged and not rethrown, so a caller cannot tell from the returned
    /// task whether the client actually started.
    /// </remarks>
    /// <returns>A task that completes when the restart attempt has finished.</returns>
    public async Task RestartAsync()
    {
      try
      {
        await StopAsync();

        var model = await FileConfig.GetMqttSetAsync();
        MqttClient = new MqttFactory().CreateManagedMqttClient();

        var mqttClientOptions = new MqttClientOptionsBuilder()
          .WithKeepAlivePeriod(TimeSpan.FromSeconds(29))
          .WithClientId(model.ClientId)
          .WithWebSocketServer($"{model.Host}:{model.Port}/mqtt")
          .WithCredentials(model.UserName, model.Password);

        if (model.ConnectionMethod == ConnectionMethod.WSS)
        {
          mqttClientOptions = mqttClientOptions.WithTls();
        }

        var options = new ManagedMqttClientOptionsBuilder()
          .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
          .WithClientOptions(mqttClientOptions.Build())
          .Build();

        await MqttClient.StartAsync(options);
      }
      catch (Exception ex)
      {
        // Pass the exception object so the sink records type and stack trace; the
        // ":l" format keeps the rendered text identical to the previous message.
        Log.Logger.Error(ex, "MQTT启动异常,{ErrorMessage:l}", ex.Message);
      }
    }

    /// <summary>
    /// Serializes <paramref name="payloadData"/> to JSON using the application's
    /// serializer settings and publishes it.
    /// </summary>
    /// <typeparam name="T">Type of the object to serialize.</typeparam>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="payloadData">Object that becomes the JSON payload.</param>
    /// <param name="retain">Whether the message is published with the retain flag.</param>
    /// <param name="serviceLevel">Quality-of-service level for the message.</param>
    /// <returns>The publish result reported by the client.</returns>
    /// <exception cref="InvalidOperationException">No client has been created yet.</exception>
    public async Task<MqttClientPublishResult> PublishAsync<T>(string topic, T payloadData, bool retain = false, MqttQualityOfServiceLevel serviceLevel = MqttQualityOfServiceLevel.AtMostOnce) //where T : class, new()
    {
      var payload = JsonConvert.SerializeObject(payloadData, Formatting.None, AppSetting.SerializerSettings);

      // Forward the requested QoS; it used to be dropped here, so every typed
      // publish silently went out with the default level.
      return await PublishAsync(topic, payload, retain, serviceLevel);
    }

    /// <summary>
    /// Publishes a string payload through the current managed client.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="payload">Message payload.</param>
    /// <param name="retain">Whether the message is published with the retain flag.</param>
    /// <param name="serviceLevel">Quality-of-service level for the message.</param>
    /// <returns>The publish result reported by the client.</returns>
    /// <exception cref="InvalidOperationException">No client has been created yet.</exception>
    public async Task<MqttClientPublishResult> PublishAsync(string topic, string payload, bool retain = false, MqttQualityOfServiceLevel serviceLevel = MqttQualityOfServiceLevel.AtMostOnce)
    {
      // Read the property once so a concurrent restart cannot swap it mid-call.
      var client = MqttClient;
      if (client == null)
      {
        throw new InvalidOperationException("The MQTT client has not been started; call RestartAsync first.");
      }

      return await client.PublishAsync(topic, payload, serviceLevel, retain);
    }

    /// <summary>
    /// Stops the current client if it is running, then disposes it and clears
    /// <see cref="MqttClient"/>. Does nothing when no client exists.
    /// </summary>
    /// <returns>A task that completes when the client has been stopped and disposed.</returns>
    private async Task StopAsync()
    {
      var client = MqttClient;
      if (client == null)
      {
        return;
      }

      // Detach first so nothing publishes through a client that is being torn down.
      MqttClient = null;

      try
      {
        if (client.IsStarted)
        {
          await client.StopAsync();
        }
      }
      finally
      {
        // Each restart creates a new client; dispose the old one so it is not leaked.
        client.Dispose();
      }
    }
  }
  93. }