diff --git a/DMS.Application/Interfaces/IEventService.cs b/DMS.Application/Interfaces/IEventService.cs index e0b6425..ca3ab8f 100644 --- a/DMS.Application/Interfaces/IEventService.cs +++ b/DMS.Application/Interfaces/IEventService.cs @@ -20,7 +20,7 @@ public interface IEventService /// /// 事件发送者 /// 设备状态改变事件参数 - void RaiseDeviceStatusChanged(object sender, DeviceActiveChangedEventArgs e); + void RaiseDeviceActiveChanged(object sender, DeviceActiveChangedEventArgs e); #endregion diff --git a/DMS.Application/Services/EventService.cs b/DMS.Application/Services/EventService.cs index d750cb6..8d54951 100644 --- a/DMS.Application/Services/EventService.cs +++ b/DMS.Application/Services/EventService.cs @@ -9,6 +9,13 @@ namespace DMS.Application.Services; /// public class EventService : IEventService { + private readonly IAppDataStorageService _appDataStorageService; + + public EventService(IAppDataStorageService appDataStorageService) + { + _appDataStorageService = appDataStorageService; + } + #region 设备事件 /// @@ -21,9 +28,16 @@ public class EventService : IEventService /// /// 事件发送者 /// 设备状态改变事件参数 - public void RaiseDeviceStatusChanged(object sender, DeviceActiveChangedEventArgs e) + public void RaiseDeviceActiveChanged(object sender, DeviceActiveChangedEventArgs e) { - OnDeviceActiveChanged?.Invoke(sender, e); + if (_appDataStorageService.Devices.TryGetValue(e.DeviceId, out var device)) + { + if (device.IsActive != e.NewStatus) + { + device.IsActive = e.NewStatus; + OnDeviceActiveChanged?.Invoke(sender, e); + } + } } #endregion diff --git a/DMS.Application/Services/Monitoring/DeviceMonitoringService.cs b/DMS.Application/Services/Monitoring/DeviceMonitoringService.cs index 1f6629c..868e0d6 100644 --- a/DMS.Application/Services/Monitoring/DeviceMonitoringService.cs +++ b/DMS.Application/Services/Monitoring/DeviceMonitoringService.cs @@ -40,11 +40,7 @@ public class DeviceMonitoringService : IDeviceMonitoringService, IDisposable { if (_appDataStorageService.Devices.TryGetValue(e.DeviceId, out var device)) { - if (device.IsActive != e.NewStatus) - { - device.IsActive = e.NewStatus; - _appDataCenterService.DeviceManagementService.UpdateDeviceAsync(device); - } + _appDataCenterService.DeviceManagementService.UpdateDeviceAsync(device); } } diff --git a/DMS.Infrastructure/Services/OpcUaBackgroundService.cs b/DMS.Infrastructure/Services/OpcUaBackgroundService.cs deleted file mode 100644 index 7e89599..0000000 --- a/DMS.Infrastructure/Services/OpcUaBackgroundService.cs +++ /dev/null @@ -1,339 +0,0 @@ -using System.Collections.Concurrent; -using DMS.Application.DTOs; -using DMS.Application.DTOs.Events; -using DMS.Core.Enums; -using DMS.Core.Models; -using Microsoft.Extensions.Hosting; -using Opc.Ua; -using Opc.Ua.Client; -using Microsoft.Extensions.Logging; -using DMS.Application.Interfaces; -using DMS.Core.Interfaces; -using DMS.Infrastructure.Helper; -using DMS.Infrastructure.Models; - -namespace DMS.Infrastructure.Services; - -public class OpcUaBackgroundService : BackgroundService -{ - private readonly IAppDataCenterService _appDataCenterService; - private readonly IDataProcessingService _dataProcessingService; - private readonly IAppDataStorageService _appDataStorageService; - - // private readonly IDataProcessingService _dataProcessingService; - private readonly ILogger _logger; - - // 存储 OPC UA 设备,键为设备Id,值为会话对象。 - private readonly ConcurrentDictionary _opcUaDevices = new(); - - // 存储 OPC UA 会话,键为终结点 URL,值为会话对象。 - private readonly ConcurrentDictionary _opcUaServices; - - // 存储 OPC UA 订阅,键为终结点 URL,值为订阅对象。 - private readonly ConcurrentDictionary _opcUaSubscriptions; - - // 存储活动的 OPC UA 变量,键为变量的OpcNodeId - private readonly ConcurrentDictionary _opcUaVariables; - - // 储存所有要轮询更新的变量,键是Device.Id,值是这个设备所有要轮询的变量 - private readonly ConcurrentDictionary> _opcUaPollVariablesByDeviceId; - - // 储存所有要订阅更新的变量,键是Device.Id,值是这个设备所有要轮询的变量 - private readonly ConcurrentDictionary> _opcUaVariablesByDeviceId; - - private readonly SemaphoreSlim _reloadSemaphore = new SemaphoreSlim(0); - - // OPC UA 轮询间隔(毫秒) - private readonly int _opcUaPollIntervalMs = 100; - - // OPC UA 订阅发布间隔(毫秒) - private readonly int _opcUaSubscriptionPublishingIntervalMs = 1000; - - // OPC UA 订阅采样间隔(毫秒) - private readonly int _opcUaSubscriptionSamplingIntervalMs = 1000; - - // 模拟 PollingIntervals,实际应用中可能从配置或数据库加载 - private static readonly Dictionary PollingIntervals - = new Dictionary - { - { 10, TimeSpan.FromMilliseconds(10) }, // TenMilliseconds - { 100, TimeSpan.FromMilliseconds(100) }, // HundredMilliseconds - { 500, TimeSpan.FromMilliseconds(500) }, // FiveHundredMilliseconds - { 1000, TimeSpan.FromMilliseconds(1000) }, // OneSecond - { 5000, TimeSpan.FromMilliseconds(5000) }, // FiveSeconds - { 10000, TimeSpan.FromMilliseconds(10000) }, // TenSeconds - { 20000, TimeSpan.FromMilliseconds(20000) }, // TwentySeconds - { 30000, TimeSpan.FromMilliseconds(30000) }, // ThirtySeconds - { 60000, TimeSpan.FromMilliseconds(60000) }, // OneMinute - { 180000, TimeSpan.FromMilliseconds(180000) }, // ThreeMinutes - { 300000, TimeSpan.FromMilliseconds(300000) }, // FiveMinutes - { 600000, TimeSpan.FromMilliseconds(600000) }, // TenMinutes - { 1800000, TimeSpan.FromMilliseconds(1800000) } // ThirtyMinutes - }; - - public OpcUaBackgroundService(IAppDataCenterService appDataCenterService,IDataProcessingService dataProcessingService,IAppDataStorageService appDataStorageService, ILogger logger) - { - _appDataCenterService = appDataCenterService; - _dataProcessingService = dataProcessingService; - _appDataStorageService = appDataStorageService; - _logger = logger; - _opcUaServices = new ConcurrentDictionary(); - _opcUaSubscriptions = new ConcurrentDictionary(); - _opcUaVariables = new ConcurrentDictionary(); - _opcUaPollVariablesByDeviceId = new ConcurrentDictionary>(); - _opcUaVariablesByDeviceId = new ConcurrentDictionary>(); - - _appDataCenterService.DataLoaderService.OnLoadDataCompleted += OnLoadDataCompleted; - } - - private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e) - { - _reloadSemaphore.Release(); - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - _logger.LogInformation("OPC UA 后台服务正在启动。"); - try - { - while (!stoppingToken.IsCancellationRequested) - { - await _reloadSemaphore.WaitAsync(stoppingToken); // Wait for a reload signal - - if (stoppingToken.IsCancellationRequested) - { - break; - } - - if (_appDataStorageService.Devices.IsEmpty) - { - _logger.LogInformation("没有可用的OPC UA设备,等待设备列表更新..."); - continue; - } - - var isLoaded = LoadVariables(); - if (!isLoaded) - { - _logger.LogInformation("加载变量过程中发生了错误,停止后面的操作。"); - continue; - } - - await ConnectOpcUaServiceAsync(stoppingToken); - await SetupOpcUaSubscriptionAsync(stoppingToken); - _logger.LogInformation("OPC UA 后台服务已启动。"); - - } - } - catch (OperationCanceledException) - { - _logger.LogInformation("OPC UA 后台服务已停止。"); - } - catch (Exception e) - { - _logger.LogError(e, $"OPC UA 后台服务运行中发生了错误:{e.Message}"); - } - finally - { - await DisconnectAllOpcUaSessionsAsync(); - } - } - - - /// - /// 从数据库加载所有活动的 OPC UA 变量,并进行相应的连接和订阅管理。 - /// - private bool LoadVariables() - { - try - { - _opcUaDevices.Clear(); - _opcUaPollVariablesByDeviceId.Clear(); - _opcUaVariablesByDeviceId.Clear(); - _opcUaVariables.Clear(); - - _logger.LogInformation("开始加载OPC UA变量...."); - var opcUaDevices = _appDataStorageService - .Devices.Values.Where(d => d.Protocol == ProtocolType.OpcUa && d.IsActive == true) - .ToList(); - int totalVariableCount = 0; - foreach (var opcUaDevice in opcUaDevices) - { - _opcUaDevices.AddOrUpdate(opcUaDevice.Id, opcUaDevice, (key, oldValue) => opcUaDevice); - - //查找设备中所有要订阅的变量 - var variableDtos = opcUaDevice.VariableTables?.SelectMany(vt => vt.Variables) - .Where(vd => vd.IsActive == true && - vd.Protocol == ProtocolType.OpcUa) - .ToList(); - foreach (var variableDto in variableDtos) - { - _opcUaVariables.TryAdd(variableDto.OpcUaNodeId,variableDto); - } - - totalVariableCount += variableDtos.Count; - _opcUaVariablesByDeviceId.AddOrUpdate(opcUaDevice.Id, variableDtos, (key, oldValue) => variableDtos); - } - - _logger.LogInformation( - $"OPC UA 变量加载成功,共加载OPC UA设备:{opcUaDevices.Count}个,变量数:{totalVariableCount}"); - return true; - } - catch (Exception e) - { - _logger.LogError(e, $"加载OPC UA变量的过程中发生了错误:{e.Message}"); - return false; - } - } - - /// - /// 连接到 OPC UA 服务器并订阅或轮询指定的变量。 - /// - private async Task ConnectOpcUaServiceAsync(CancellationToken stoppingToken) - { - if (stoppingToken.IsCancellationRequested) - { - return; - } - - var connectTasks = new List(); - - // 遍历_opcUaDevices中的所有设备,尝试连接 - foreach (var device in _opcUaDevices.Values.ToList()) - { - if (device.IsActive) - { - connectTasks.Add(ConnectSingleOpcUaDeviceAsync(device, stoppingToken)); - } - - } - - await Task.WhenAll(connectTasks); - } - - /// - /// 连接单个OPC UA设备。 - /// - /// 要连接的设备。 - /// 取消令牌。 - private async Task ConnectSingleOpcUaDeviceAsync(DeviceDto device, CancellationToken stoppingToken = default) - { - // Check if already connected - if (_opcUaServices.TryGetValue(device, out var existOpcUaService)) - { - if (existOpcUaService.IsConnected) - { - _logger.LogInformation($"已连接到 OPC UA 服务器: {device.OpcUaServerUrl}"); - return; - } - } - - _logger.LogInformation($"开始连接OPC UA服务器: {device.Name} ({device.OpcUaServerUrl})"); - try - { - OpcUaService opcUaService = new OpcUaService(); - await opcUaService.ConnectAsync(device.OpcUaServerUrl); - if (!opcUaService.IsConnected) - { - _logger.LogWarning($"创建OPC UA会话失败: {device.OpcUaServerUrl}"); - return; // 连接失败,直接返回 - } - - _opcUaServices.AddOrUpdate(device, opcUaService, (key, oldValue) => opcUaService); - _logger.LogInformation($"已连接到OPC UA服务器: {device.Name} ({device.OpcUaServerUrl})"); - } - catch (Exception e) - { - _logger.LogError(e, $"OPC UA服务连接 {device.Name} ({device.OpcUaServerUrl}) 过程中发生错误:{e.Message}"); - } - } - - /// - /// 更新变量数据,并将其推送到数据处理队列。 - /// - /// 要更新的变量。 - /// 读取到的数据值。 - private async Task UpdateAndEnqueueVariable(VariableDto variable, object value) - { - try - { - // 更新变量的原始数据值和显示值。 - variable.DataValue = value.ToString(); - variable.DisplayValue = value.ToString(); // 或者根据需要进行格式化 - variable.UpdatedAt = DateTime.Now; - // Console.WriteLine($"OpcUa后台服务轮询变量:{variable.Name},值:{variable.DataValue}"); - // 将更新后的数据推入处理队列。 - await _dataProcessingService.EnqueueAsync(variable); - } - catch (Exception ex) - { - _logger.LogError(ex, $"更新变量 {variable.Name} 并入队失败:{ex.Message}"); - } - } - - /// - /// 设置 OPC UA 订阅并添加监控项。 - /// - /// 取消令牌。 - private async Task SetupOpcUaSubscriptionAsync(CancellationToken stoppingToken) - { - foreach (var opcUaServiceKayValuePair in _opcUaServices) - { - var device = opcUaServiceKayValuePair.Key; - var opcUaService = opcUaServiceKayValuePair.Value; - - if (_opcUaVariablesByDeviceId.TryGetValue(device.Id, out var opcUaVariables)) - { - var variableGroup = opcUaVariables.GroupBy(variable => variable.PollingInterval); - foreach (var vGroup in variableGroup) - { - var pollingInterval = vGroup.Key; - var opcUaNodes - = vGroup.Select(variableDto => new OpcUaNode() { NodeId = variableDto.OpcUaNodeId }) - .ToList(); - - PollingIntervals.TryGetValue(pollingInterval, out var interval); - opcUaService.SubscribeToNode(opcUaNodes,HandleDataChanged,10000,1000); - } - } - } - } - - private async void HandleDataChanged(OpcUaNode opcUaNode) - { - if (_opcUaVariables.TryGetValue(opcUaNode.NodeId.ToString(), out var variabelDto)) - { - if (opcUaNode.Value == null) - { - return; - } - await UpdateAndEnqueueVariable(variabelDto, opcUaNode.Value); - } - } - - - /// - /// 断开所有 OPC UA 会话。 - /// - private async Task DisconnectAllOpcUaSessionsAsync() - { - if (_opcUaServices.IsEmpty) - return; - _logger.LogInformation("正在断开所有 OPC UA 会话..."); - var closeTasks = new List(); - - foreach (var device in _opcUaServices.Keys.ToList()) - { - closeTasks.Add(Task.Run(async () => - { - _logger.LogInformation($"正在断开 OPC UA 会话: {device.Name}"); - if (_opcUaServices.TryRemove(device, out var opcUaService)) - { - await opcUaService.DisconnectAsync(); - _logger.LogInformation($"已从 OPC UA 服务器断开连接: {device.Name}"); - } - })); - } - - await Task.WhenAll(closeTasks); - } -} \ No newline at end of file diff --git a/DMS.Infrastructure/Services/OpcUaServiceManager.cs b/DMS.Infrastructure/Services/OpcUaServiceManager.cs index 77e5e81..f95b211 100644 --- a/DMS.Infrastructure/Services/OpcUaServiceManager.cs +++ b/DMS.Infrastructure/Services/OpcUaServiceManager.cs @@ -1,14 +1,15 @@ using System.Collections.Concurrent; using System.Diagnostics; using DMS.Application.DTOs; +using DMS.Application.Events; using DMS.Application.Interfaces; using DMS.Core.Enums; -using DMS.Core.Models; using DMS.Infrastructure.Configuration; using DMS.Infrastructure.Interfaces.Services; using DMS.Infrastructure.Models; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using VariableValueChangedEventArgs = DMS.Core.Models.VariableValueChangedEventArgs; namespace DMS.Infrastructure.Services { @@ -20,6 +21,7 @@ namespace DMS.Infrastructure.Services private readonly ILogger _logger; private readonly IDataProcessingService _dataProcessingService; private readonly IAppDataCenterService _appDataCenterService; + private readonly IEventService _eventService; private readonly OpcUaServiceOptions _options; private readonly ConcurrentDictionary _deviceContexts; private readonly SemaphoreSlim _semaphore; @@ -28,15 +30,33 @@ namespace DMS.Infrastructure.Services public OpcUaServiceManager( ILogger logger, IDataProcessingService dataProcessingService, + IEventService eventService, IAppDataCenterService appDataCenterService, IOptions options) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _dataProcessingService = dataProcessingService ?? throw new ArgumentNullException(nameof(dataProcessingService)); - _appDataCenterService = appDataCenterService ?? throw new ArgumentNullException(nameof(appDataCenterService)); + _dataProcessingService + = dataProcessingService ?? throw new ArgumentNullException(nameof(dataProcessingService)); + _eventService = eventService; + _appDataCenterService + = appDataCenterService ?? throw new ArgumentNullException(nameof(appDataCenterService)); _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); _deviceContexts = new ConcurrentDictionary(); _semaphore = new SemaphoreSlim(_options.MaxConcurrentConnections, _options.MaxConcurrentConnections); + + _eventService.OnDeviceActiveChanged += OnDeviceActiveChanged; + } + + private async void OnDeviceActiveChanged(object? sender, DeviceActiveChangedEventArgs e) + { + if (e.NewStatus) + { + await ConnectDeviceAsync(e.DeviceId, CancellationToken.None); + } + else + { + await DisconnectDeviceAsync(e.DeviceId, CancellationToken.None); + } } /// @@ -64,12 +84,12 @@ namespace DMS.Infrastructure.Services } var context = new DeviceContext - { - Device = device, - OpcUaService = new OpcUaService(), - Variables = new ConcurrentDictionary(), - IsConnected = false - }; + { + Device = device, + OpcUaService = new OpcUaService(), + Variables = new ConcurrentDictionary(), + IsConnected = false + }; _deviceContexts.AddOrUpdate(device.Id, context, (key, oldValue) => context); _logger.LogInformation("已添加设备 {DeviceId} 到监控列表", device.Id); @@ -99,6 +119,7 @@ namespace DMS.Infrastructure.Services { context.Variables.AddOrUpdate(variable.OpcUaNodeId, variable, (key, oldValue) => variable); } + _logger.LogInformation("已更新设备 {DeviceId} 的变量列表,共 {Count} 个变量", deviceId, variables.Count); } } @@ -139,23 +160,29 @@ namespace DMS.Infrastructure.Services if (context == null || string.IsNullOrEmpty(context.Device.OpcUaServerUrl)) return; + if (!context.Device.IsActive) + { + return; + } + await _semaphore.WaitAsync(cancellationToken); try { - _logger.LogInformation("正在连接设备 {DeviceName} ({EndpointUrl})", - context.Device.Name, context.Device.OpcUaServerUrl); + _logger.LogInformation("正在连接设备 {DeviceName} ({EndpointUrl})", + context.Device.Name, context.Device.OpcUaServerUrl); var stopwatch = Stopwatch.StartNew(); - + // 设置连接超时 using var timeoutToken = new CancellationTokenSource(_options.ConnectionTimeoutMs); - using var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutToken.Token); - + using var linkedToken + = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutToken.Token); + await context.OpcUaService.ConnectAsync(context.Device.OpcUaServerUrl); - + stopwatch.Stop(); - _logger.LogInformation("设备 {DeviceName} 连接耗时 {ElapsedMs} ms", - context.Device.Name, stopwatch.ElapsedMilliseconds); + _logger.LogInformation("设备 {DeviceName} 连接耗时 {ElapsedMs} ms", + context.Device.Name, stopwatch.ElapsedMilliseconds); if (context.OpcUaService.IsConnected) { @@ -170,8 +197,8 @@ namespace DMS.Infrastructure.Services } catch (Exception ex) { - _logger.LogError(ex, "连接设备 {DeviceName} 时发生错误: {ErrorMessage}", - context.Device.Name, ex.Message); + _logger.LogError(ex, "连接设备 {DeviceName} 时发生错误: {ErrorMessage}", + context.Device.Name, ex.Message); context.IsConnected = false; } finally @@ -197,8 +224,8 @@ namespace DMS.Infrastructure.Services } catch (Exception ex) { - _logger.LogError(ex, "断开设备 {DeviceName} 连接时发生错误: {ErrorMessage}", - context.Device.Name, ex.Message); + _logger.LogError(ex, "断开设备 {DeviceName} 连接时发生错误: {ErrorMessage}", + context.Device.Name, ex.Message); } } @@ -212,13 +239,13 @@ namespace DMS.Infrastructure.Services try { - _logger.LogInformation("正在为设备 {DeviceName} 设置订阅,变量数: {VariableCount}", - context.Device.Name, context.Variables.Count); + _logger.LogInformation("正在为设备 {DeviceName} 设置订阅,变量数: {VariableCount}", + context.Device.Name, context.Variables.Count); // 按PollingInterval对变量进行分组 var variablesByPollingInterval = context.Variables.Values - .GroupBy(v => v.PollingInterval) - .ToDictionary(g => g.Key, g => g.ToList()); + .GroupBy(v => v.PollingInterval) + .ToDictionary(g => g.Key, g => g.ToList()); // 为每个PollingInterval组设置单独的订阅 foreach (var group in variablesByPollingInterval) @@ -226,7 +253,8 @@ namespace DMS.Infrastructure.Services int pollingInterval = group.Key; var variables = group.Value; - _logger.LogInformation("为设备 {DeviceName} 设置PollingInterval {PollingInterval} 的订阅,变量数: {VariableCount}", + _logger.LogInformation( + "为设备 {DeviceName} 设置PollingInterval {PollingInterval} 的订阅,变量数: {VariableCount}", context.Device.Name, pollingInterval, variables.Count); // 根据PollingInterval计算发布间隔和采样间隔(毫秒) @@ -234,19 +262,19 @@ namespace DMS.Infrastructure.Services // var samplingInterval = GetSamplingIntervalFromPollLevel(pollLevel); var opcUaNodes = variables - .Select(v => new OpcUaNode { NodeId = v.OpcUaNodeId }) - .ToList(); + .Select(v => new OpcUaNode { NodeId = v.OpcUaNodeId }) + .ToList(); - context.OpcUaService.SubscribeToNode(opcUaNodes, HandleDataChanged, - publishingInterval, publishingInterval); + context.OpcUaService.SubscribeToNode(opcUaNodes, HandleDataChanged, + publishingInterval, publishingInterval); } _logger.LogInformation("设备 {DeviceName} 订阅设置完成", context.Device.Name); } catch (Exception ex) { - _logger.LogError(ex, "为设备 {DeviceName} 设置订阅时发生错误: {ErrorMessage}", - context.Device.Name, ex.Message); + _logger.LogError(ex, "为设备 {DeviceName} 设置订阅时发生错误: {ErrorMessage}", + context.Device.Name, ex.Message); } } @@ -257,24 +285,23 @@ namespace DMS.Infrastructure.Services { // 根据轮询间隔值映射到发布间隔 return pollingInterval switch - { - 100 => 100, // HundredMilliseconds -> 100ms发布间隔 - 500 => 500, // FiveHundredMilliseconds -> 500ms发布间隔 - 1000 => 1000, // OneSecond -> 1000ms发布间隔 - 5000 => 5000, // FiveSeconds -> 5000ms发布间隔 - 10000 => 10000, // TenSeconds -> 10000ms发布间隔 - 20000 => 20000, // TwentySeconds -> 20000ms发布间隔 - 30000 => 30000, // ThirtySeconds -> 30000ms发布间隔 - 60000 => 60000, // OneMinute -> 60000ms发布间隔 - 300000 => 300000, // FiveMinutes -> 300000ms发布间隔 - 600000 => 600000, // TenMinutes -> 600000ms发布间隔 - 1800000 => 1800000, // ThirtyMinutes -> 1800000ms发布间隔 - 3600000 => 3600000, // OneHour -> 3600000ms发布间隔 - _ => _options.SubscriptionPublishingIntervalMs // 默认值 - }; + { + 100 => 100, // HundredMilliseconds -> 100ms发布间隔 + 500 => 500, // FiveHundredMilliseconds -> 500ms发布间隔 + 1000 => 1000, // OneSecond -> 1000ms发布间隔 + 5000 => 5000, // FiveSeconds -> 5000ms发布间隔 + 10000 => 10000, // TenSeconds -> 10000ms发布间隔 + 20000 => 20000, // TwentySeconds -> 20000ms发布间隔 + 30000 => 30000, // ThirtySeconds -> 30000ms发布间隔 + 60000 => 60000, // OneMinute -> 60000ms发布间隔 + 300000 => 300000, // FiveMinutes -> 300000ms发布间隔 + 600000 => 600000, // TenMinutes -> 600000ms发布间隔 + 1800000 => 1800000, // ThirtyMinutes -> 1800000ms发布间隔 + 3600000 => 3600000, // OneHour -> 3600000ms发布间隔 + _ => _options.SubscriptionPublishingIntervalMs // 默认值 + }; } - /// /// 处理数据变化 @@ -294,12 +321,12 @@ namespace DMS.Infrastructure.Services // 保存旧值 var oldValue = variable.DataValue; var newValue = opcUaNode.Value.ToString(); - + // 更新变量值 variable.DataValue = newValue; variable.DisplayValue = newValue; variable.UpdatedAt = DateTime.Now; - + _logger.LogDebug($"节点:{variable.OpcUaNodeId}值发生了变化:{newValue}"); // 触发变量值变更事件 @@ -309,8 +336,8 @@ namespace DMS.Infrastructure.Services oldValue, newValue, variable.UpdatedAt); - - _appDataCenterService.VariableManagementService.VariableValueChanged( eventArgs); + + _appDataCenterService.VariableManagementService.VariableValueChanged(eventArgs); // 推送到数据处理队列 await _dataProcessingService.EnqueueAsync(variable); @@ -341,7 +368,7 @@ namespace DMS.Infrastructure.Services public async Task ConnectDevicesAsync(IEnumerable deviceIds, CancellationToken cancellationToken = default) { var connectTasks = new List(); - + foreach (var deviceId in deviceIds) { if (_deviceContexts.TryGetValue(deviceId, out var context)) @@ -349,7 +376,7 @@ namespace DMS.Infrastructure.Services connectTasks.Add(ConnectDeviceAsync(context, cancellationToken)); } } - + await Task.WhenAll(connectTasks); } @@ -367,15 +394,16 @@ namespace DMS.Infrastructure.Services /// /// 批量断开设备连接 /// - public async Task DisconnectDevicesAsync(IEnumerable deviceIds, CancellationToken cancellationToken = default) + public async Task DisconnectDevicesAsync(IEnumerable deviceIds, + CancellationToken cancellationToken = default) { var disconnectTasks = new List(); - + foreach (var deviceId in deviceIds) { disconnectTasks.Add(DisconnectDeviceAsync(deviceId, cancellationToken)); } - + await Task.WhenAll(disconnectTasks); } @@ -399,11 +427,12 @@ namespace DMS.Infrastructure.Services // 断开所有设备连接 var deviceIds = _deviceContexts.Keys.ToList(); - DisconnectDevicesAsync(deviceIds).Wait(TimeSpan.FromSeconds(10)); + DisconnectDevicesAsync(deviceIds) + .Wait(TimeSpan.FromSeconds(10)); // 释放其他资源 _semaphore?.Dispose(); - + _disposed = true; _logger.LogInformation("OPC UA服务管理器资源已释放"); } diff --git a/DMS.Infrastructure/Services/OptimizedOpcUaBackgroundService.cs b/DMS.Infrastructure/Services/OptimizedOpcUaBackgroundService.cs index d075370..49454f5 100644 --- a/DMS.Infrastructure/Services/OptimizedOpcUaBackgroundService.cs +++ b/DMS.Infrastructure/Services/OptimizedOpcUaBackgroundService.cs @@ -5,11 +5,6 @@ using DMS.Core.Enums; using DMS.Infrastructure.Interfaces.Services; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; namespace DMS.Infrastructure.Services { @@ -20,6 +15,7 @@ namespace DMS.Infrastructure.Services { private readonly IAppDataCenterService _appDataCenterService; private readonly IAppDataStorageService _appDataStorageService; + private readonly IEventService _eventService; private readonly IOpcUaServiceManager _opcUaServiceManager; private readonly ILogger _logger; private readonly SemaphoreSlim _reloadSemaphore = new SemaphoreSlim(0); @@ -38,6 +34,8 @@ namespace DMS.Infrastructure.Services _appDataCenterService.DataLoaderService.OnLoadDataCompleted += OnLoadDataCompleted; } + + private void OnLoadDataCompleted(object sender, DataLoadCompletedEventArgs e) { _logger.LogInformation("收到数据加载完成通知,触发OPC UA服务重新加载"); @@ -97,10 +95,10 @@ namespace DMS.Infrastructure.Services { // 获取所有活动的OPC UA设备 var opcUaDevices = _appDataStorageService.Devices.Values - .Where(d => d.Protocol == ProtocolType.OpcUa && d.IsActive) + .Where(d => d.Protocol == ProtocolType.OpcUa ) .ToList(); - _logger.LogInformation("找到 {DeviceCount} 个活动的OPC UA设备", opcUaDevices.Count); + _logger.LogInformation("找到 {DeviceCount} 个OPC UA设备", opcUaDevices.Count); if (opcUaDevices.Count == 0) return; diff --git a/DMS.WPF/ViewModels/Items/DeviceItemViewModel.cs b/DMS.WPF/ViewModels/Items/DeviceItemViewModel.cs index 4818ba4..9feee62 100644 --- a/DMS.WPF/ViewModels/Items/DeviceItemViewModel.cs +++ b/DMS.WPF/ViewModels/Items/DeviceItemViewModel.cs @@ -87,7 +87,7 @@ public partial class DeviceItemViewModel : ObservableObject if (Id > 0 && EventService != null ) { // 发布设备状态改变事件 - EventService.RaiseDeviceStatusChanged(this, new DeviceActiveChangedEventArgs(Id, Name, oldValue, newValue)); + EventService.RaiseDeviceActiveChanged(this, new DeviceActiveChangedEventArgs(Id, Name, oldValue, newValue)); } }