修改了开启Mqtt和关闭Mqtt时的事件

This commit is contained in:
2025-07-15 17:13:27 +08:00
parent e3730eccb4
commit cd4b249bf9
4 changed files with 228 additions and 103 deletions

View File

@@ -8,6 +8,8 @@ namespace PMSWPF.Models;
/// </summary> /// </summary>
public partial class Mqtt : ObservableObject public partial class Mqtt : ObservableObject
{ {
public event Action<Mqtt> OnMqttIsActiveChanged;
/// <summary> /// <summary>
/// MQTT客户端ID。 /// MQTT客户端ID。
/// </summary> /// </summary>
@@ -34,6 +36,17 @@ public partial class Mqtt : ObservableObject
[ObservableProperty] [ObservableProperty]
private bool _isActive; private bool _isActive;
partial void OnIsActiveChanged(bool value)
{
OnMqttIsActiveChanged?.Invoke(this);
}
/// <summary>
/// 显示连接的消息:
/// </summary>
[ObservableProperty]
private string connectMessage;
/// <summary> /// <summary>
/// 是否设置为默认MQTT客户端。 /// 是否设置为默认MQTT客户端。
/// </summary> /// </summary>

View File

@@ -6,6 +6,8 @@ using System.Threading.Tasks;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using PMSWPF.Models; using PMSWPF.Models;
using PMSWPF.Services; using PMSWPF.Services;
@@ -28,6 +30,9 @@ namespace PMSWPF.Services
// 存储MQTT配置的字典键为MQTT配置ID值为Mqtt模型对象。 // 存储MQTT配置的字典键为MQTT配置ID值为Mqtt模型对象。
private readonly Dictionary<int, Mqtt> _mqttConfigDic; private readonly Dictionary<int, Mqtt> _mqttConfigDic;
// 存储每个客户端重连尝试次数的字典
private readonly Dictionary<int, int> _reconnectAttempts;
// 定时器,用于周期性地执行数据发布任务。 // 定时器,用于周期性地执行数据发布任务。
private Timer _timer; private Timer _timer;
@@ -45,6 +50,7 @@ namespace PMSWPF.Services
_dataServices = dataServices; _dataServices = dataServices;
_mqttClients = new Dictionary<int, IMqttClient>(); _mqttClients = new Dictionary<int, IMqttClient>();
_mqttConfigDic = new Dictionary<int, Mqtt>(); _mqttConfigDic = new Dictionary<int, Mqtt>();
_reconnectAttempts = new Dictionary<int, int>();
} }
/// <summary> /// <summary>
@@ -61,23 +67,23 @@ namespace PMSWPF.Services
_serviceMainThread.IsBackground = true; _serviceMainThread.IsBackground = true;
_serviceMainThread.Name = "MqttServiceMainThread"; _serviceMainThread.Name = "MqttServiceMainThread";
_serviceMainThread.Start(); _serviceMainThread.Start();
} }
private void Execute() private void Execute()
{ {
while (!_stopEvent.WaitOne(0)) while (!_stopEvent.WaitOne(0))
{ {
if (_dataServices.Mqtts==null || _dataServices.Mqtts.Count==0) if (_dataServices.Mqtts == null || _dataServices.Mqtts.Count == 0)
{ {
_reloadEvent.Reset(); _reloadEvent.Reset();
continue; continue;
} }
_reloadEvent.WaitOne(); _reloadEvent.WaitOne();
// 初始加载MQTT配置和变量数据。 // 初始加载MQTT配置和变量数据。
LoadMqttConfigurations(); LoadMqttConfigurations();
ConnectMqttClient(); ConnectMqttList();
_reloadEvent.Reset(); _reloadEvent.Reset();
} }
@@ -87,7 +93,7 @@ namespace PMSWPF.Services
/// <summary> /// <summary>
/// 停止MQTT后台服务。 /// 停止MQTT后台服务。
/// </summary> /// </summary>
public void StopService() public void StopService()
{ {
NlogHelper.Info("Mqtt后台服务开始停止...."); // 记录服务停止信息 NlogHelper.Info("Mqtt后台服务开始停止...."); // 记录服务停止信息
@@ -95,24 +101,32 @@ namespace PMSWPF.Services
// 取消订阅事件。 // 取消订阅事件。
_dataServices.OnMqttListChanged -= HandleMqttListChanged; _dataServices.OnMqttListChanged -= HandleMqttListChanged;
// 断开所有已连接的MQTT客户端。 DisconnecetAll();
foreach (var mqttId in _mqttClients.Keys.ToList())
{
var client=_mqttClients[mqttId];
var mqtt=_mqttConfigDic[mqttId];
mqtt.IsConnected = false;
if (client.IsConnected)
{
client.DisconnectAsync().GetAwaiter().GetResult();
}
}
// 清空所有字典。 // 清空所有字典。
_mqttClients.Clear(); _mqttClients.Clear();
_mqttConfigDic.Clear(); _mqttConfigDic.Clear();
_reconnectAttempts.Clear();
NlogHelper.Info("Mqtt后台服务已停止。"); // 记录服务停止信息 NlogHelper.Info("Mqtt后台服务已停止。"); // 记录服务停止信息
} }
private void DisconnecetAll()
{
// 断开所有已连接的MQTT客户端。
foreach (var mqttId in _mqttClients.Keys.ToList())
{
var client = _mqttClients[mqttId];
var mqtt = _mqttConfigDic[mqttId];
mqtt.IsConnected = false;
if (client.IsConnected)
{
client.DisconnectAsync()
.GetAwaiter()
.GetResult();
}
}
}
/// <summary> /// <summary>
/// 加载并连接MQTT配置。 /// 加载并连接MQTT配置。
@@ -121,90 +135,76 @@ namespace PMSWPF.Services
private void LoadMqttConfigurations() private void LoadMqttConfigurations()
{ {
NlogHelper.Info("开始加载Mqtt配置文件..."); NlogHelper.Info("开始加载Mqtt配置文件...");
_mqttConfigDic.Clear();
// 从数据服务获取所有MQTT配置。 // 从数据服务获取所有MQTT配置。
var _mqttConfigList = _dataServices.Mqtts.Where(m => m.IsActive) var _mqttConfigList = _dataServices.Mqtts.Where(m => m.IsActive)
.ToList(); .ToList();
foreach (var mqtt in _mqttConfigList) foreach (var mqtt in _mqttConfigList)
{ {
mqtt.OnMqttIsActiveChanged += OnMqttIsActiveChangedHandler;
_mqttConfigDic[mqtt.Id] = mqtt; _mqttConfigDic[mqtt.Id] = mqtt;
mqtt.ConnectMessage = "配置加载成功.";
} }
NlogHelper.Info($"Mqtt配置文件加载成功开启的Mqtt客户端{_mqttConfigList.Count}个。"); NlogHelper.Info($"Mqtt配置文件加载成功开启的Mqtt客户端{_mqttConfigList.Count}个。");
} }
private async void OnMqttIsActiveChangedHandler(Mqtt mqtt)
{
try
{
if (mqtt.IsActive)
{
await ConnectMqtt(mqtt);
}
else
{
if (!_mqttClients.TryGetValue(mqtt.Id,out var client))
{
NlogHelper.Warn($"没有在Mqtt连接字典中找到名字为{mqtt.Name}的连接。");
return;
}
if (client.IsConnected)
{
await client.DisconnectAsync();
}
mqtt.IsConnected = false;
mqtt.ConnectMessage = "断开连接.";
_mqttClients.Remove(mqtt.Id);
NlogHelper.Info($"{mqtt.Name}的客户端,与服务器断开连接.");
}
}
catch (Exception e)
{
NotificationHelper.ShowError($"{mqtt.Name}客户端,开启或关闭的过程中发生了错误:{e.Message}",e);
}
}
/// <summary> /// <summary>
/// 连接到指定的MQTT代理。 /// 连接到指定的MQTT代理。
/// </summary> /// </summary>
/// <param name="mqtt">MQTT配置对象。</param> /// <param name="mqtt">MQTT配置对象。</param>
/// <returns>表示异步操作的任务。</returns> /// <returns>表示异步操作的任务。</returns>
private void ConnectMqttClient() private async Task ConnectMqttList()
{ {
foreach (Mqtt mqtt in _mqttConfigDic.Values.ToList()) foreach (Mqtt mqtt in _mqttConfigDic.Values.ToList())
{ {
try try
{ {
NlogHelper.Info($"开始连接:{mqtt.Name}的服务器..."); if (_mqttClients.TryGetValue(mqtt.Id, out var mclient) && mclient.IsConnected)
// 创建MQTT客户端工厂和客户端实例。
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
// 构建MQTT客户端连接选项。
var options = new MqttClientOptionsBuilder()
.WithClientId(mqtt.ClientID)
.WithTcpServer(mqtt.Host, mqtt.Port)
.WithCredentials(mqtt.UserName, mqtt.PassWord)
.WithCleanSession() // 清理会话,每次连接都是新会话
.Build();
// 设置连接成功事件处理程序。
client.UseConnectedHandler(async e =>
{ {
NotificationHelper.ShowSuccess($"已连接到MQTT服务器: {mqtt.Name}"); NlogHelper.Info($"{mqtt.Name}的Mqtt服务器连接已存在。");
mqtt.ConnectMessage = "连接成功.";
mqtt.IsConnected = true; mqtt.IsConnected = true;
continue;
}
// 订阅主题 await ConnectMqtt(mqtt);
await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(mqtt.SubTopic).Build());
NlogHelper.Info($"MQTT客户端 {mqtt.Name} 已订阅主题: {mqtt.SubTopic}");
});
// 设置接收消息处理程序
client.UseApplicationMessageReceivedHandler(e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
NlogHelper.Info($"MQTT客户端 {mqtt.Name} 收到消息: 主题={topic}, 消息={payload}");
// 在这里添加处理消息的逻辑
});
// 设置断开连接事件处理程序。
client.UseDisconnectedHandler(async e =>
{
NotificationHelper.ShowWarn($"与MQTT服务器断开连接: {mqtt.Name}");
mqtt.IsConnected = false;
// 服务停止
if (_stopEvent.WaitOne(0))
return;
if (!mqtt.IsActive)
return;
NlogHelper.Info($"5秒后重新连接Mqtt服务器{mqtt.Name}");
// 尝试重新连接。
await Task.Delay(TimeSpan.FromSeconds(5)); // 等待5秒后重连
try
{
await client.ConnectAsync(options, CancellationToken.None);
}
catch (Exception ex)
{
NlogHelper.Error($"重新与Mqtt服务器连接失败: {mqtt.Name}", ex);
}
});
// 尝试连接到MQTT代理。
client.ConnectAsync(options, CancellationToken.None)
.GetAwaiter()
.GetResult();
// 将连接成功的客户端添加到字典。
_mqttClients[mqtt.Id] = client;
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -213,6 +213,110 @@ namespace PMSWPF.Services
} }
} }
private async Task ConnectMqtt(Mqtt mqtt)
{
NlogHelper.Info($"开始连接:{mqtt.Name}的服务器...");
mqtt.ConnectMessage = "开始连接服务器...";
// 创建MQTT客户端工厂和客户端实例。
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
// 构建MQTT客户端连接选项。
var options = new MqttClientOptionsBuilder()
.WithClientId(mqtt.ClientID)
.WithTcpServer(mqtt.Host, mqtt.Port)
.WithCredentials(mqtt.UserName, mqtt.PassWord)
.WithCleanSession() // 清理会话,每次连接都是新会话
.Build();
// 设置连接成功事件处理程序。
client.UseConnectedHandler(async (e) => { await HandleConnected(e, client, mqtt); });
// 设置接收消息处理程序
client.UseApplicationMessageReceivedHandler(e => { HandleMessageReceived(e, mqtt); });
// 设置断开连接事件处理程序。
client.UseDisconnectedHandler(async (e) => await HandleDisconnected(e,options, client, mqtt));
// 尝试连接到MQTT代理。
await client.ConnectAsync(options, CancellationToken.None);
// 将连接成功的客户端添加到字典。
_mqttClients[mqtt.Id] = client;
}
private static void HandleMessageReceived(MqttApplicationMessageReceivedEventArgs e, Mqtt mqtt)
{
var topic = e.ApplicationMessage.Topic;
var payload = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
NlogHelper.Info($"MQTT客户端 {mqtt.Name} 收到消息: 主题={topic}, 消息={payload}");
// 在这里添加处理消息的逻辑
}
private async Task HandleDisconnected(MqttClientDisconnectedEventArgs args, IMqttClientOptions options,
IMqttClient client,
Mqtt mqtt)
{
NotificationHelper.ShowWarn($"与MQTT服务器断开连接: {mqtt.Name}");
mqtt.ConnectMessage = "断开连接.";
mqtt.IsConnected = false;
// 服务停止
if (_stopEvent.WaitOne(0) || !mqtt.IsActive)
return;
// 增加重连尝试次数
if (!_reconnectAttempts.ContainsKey(mqtt.Id))
{
_reconnectAttempts[mqtt.Id] = 0;
}
_reconnectAttempts[mqtt.Id]++;
// 指数退避策略
var maxDelay = TimeSpan.FromMinutes(5); // 最大延迟5分钟
var baseDelay = TimeSpan.FromSeconds(5); // 基础延迟5秒
var delay = TimeSpan.FromSeconds(baseDelay.TotalSeconds *
Math.Pow(2, _reconnectAttempts[mqtt.Id] - 1));
if (delay > maxDelay)
{
delay = maxDelay;
}
NlogHelper.Info(
$"与MQTT服务器{mqtt.Name} 的连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接...");
mqtt.ConnectMessage = $"连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接...";
await Task.Delay(delay);
try
{
mqtt.ConnectMessage = $"开始重新连接服务器...";
await client.ConnectAsync(options, CancellationToken.None);
}
catch (Exception ex)
{
mqtt.ConnectMessage = $"重新与Mqtt服务器连接失败.";
NlogHelper.Error($"重新与Mqtt服务器连接失败: {mqtt.Name}", ex);
}
}
private async Task HandleConnected(MqttClientConnectedEventArgs args, IMqttClient client, Mqtt mqtt)
{
// 重置重连尝试次数
if (_reconnectAttempts.ContainsKey(mqtt.Id))
{
_reconnectAttempts[mqtt.Id] = 0;
}
NotificationHelper.ShowSuccess($"已连接到MQTT服务器: {mqtt.Name}");
mqtt.IsConnected = true;
mqtt.ConnectMessage = "连接成功.";
// 订阅主题
await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(mqtt.SubTopic)
.Build());
NlogHelper.Info($"MQTT客户端 {mqtt.Name} 已订阅主题: {mqtt.SubTopic}");
}
/// <summary> /// <summary>
/// 处理MQTT列表变化事件的回调方法。 /// 处理MQTT列表变化事件的回调方法。

View File

@@ -19,33 +19,33 @@ public partial class MqttsViewModel : ViewModelBase
private readonly MqttRepository _mqttRepository; private readonly MqttRepository _mqttRepository;
private readonly ILogger<MqttsViewModel> _logger; private readonly ILogger<MqttsViewModel> _logger;
private readonly NavgatorServices _navgatorServices; private readonly NavgatorServices _navgatorServices;
[ObservableProperty]
private ObservableCollection<Mqtt> _mqtts; private ObservableCollection<Mqtt> _mqtts;
public ObservableCollection<Mqtt> Mqtts // public ObservableCollection<Mqtt> Mqtts
{ // {
get => _mqtts; // get => _mqtts;
set // set
{ // {
if (_mqtts != null) // if (_mqtts != null)
{ // {
foreach (var mqtt in _mqtts) // foreach (var mqtt in _mqtts)
{ // {
mqtt.PropertyChanged -= Mqtt_PropertyChanged; // mqtt.PropertyChanged -= Mqtt_PropertyChanged;
} // }
} // }
//
SetProperty(ref _mqtts, value); // SetProperty(ref _mqtts, value);
//
if (_mqtts != null) // if (_mqtts != null)
{ // {
foreach (var mqtt in _mqtts) // foreach (var mqtt in _mqtts)
{ // {
mqtt.PropertyChanged += Mqtt_PropertyChanged; // mqtt.PropertyChanged += Mqtt_PropertyChanged;
} // }
} // }
} // }
} // }
[ObservableProperty] [ObservableProperty]
private Mqtt _selectedMqtt; private Mqtt _selectedMqtt;

View File

@@ -136,6 +136,14 @@
FontSize="14" /> FontSize="14" />
<TextBlock Text="{Binding ConnTime}" /> <TextBlock Text="{Binding ConnTime}" />
</StackPanel> </StackPanel>
<StackPanel Orientation="Horizontal"
Margin="0,2">
<ui:FontIcon Icon="{x:Static ui:SegoeFluentIcons.Message}"
VerticalAlignment="Center"
Margin="0,0,8,0"
FontSize="14" />
<TextBlock Text="{Binding ConnectMessage}" />
</StackPanel>
<StackPanel Orientation="Horizontal" <StackPanel Orientation="Horizontal"
Margin="0,2"> Margin="0,2">
<ui:FontIcon Icon="{x:Static ui:SegoeFluentIcons.FavoriteStar}" <ui:FontIcon Icon="{x:Static ui:SegoeFluentIcons.FavoriteStar}"