diff --git a/DMS.Application/DMS.Application.csproj b/DMS.Application/DMS.Application.csproj
index c41d6a0..bc490e4 100644
--- a/DMS.Application/DMS.Application.csproj
+++ b/DMS.Application/DMS.Application.csproj
@@ -15,6 +15,8 @@
+
+
diff --git a/DMS.Application/Interfaces/IDataProcessingService.cs b/DMS.Application/Interfaces/IDataProcessingService.cs
index a0fbfbb..06d21ff 100644
--- a/DMS.Application/Interfaces/IDataProcessingService.cs
+++ b/DMS.Application/Interfaces/IDataProcessingService.cs
@@ -1,9 +1,25 @@
using DMS.Core.Models;
using System.Threading.Tasks;
+using DMS.Application.DTOs;
namespace DMS.Application.Interfaces;
+//
+/// 定义了数据处理服务的接口。
+/// 该服务负责管理数据处理队列和处理器链。
+///
public interface IDataProcessingService
{
- Task EnqueueAsync(Variable variable);
+ ///
+ /// 向处理链中添加一个数据处理器。
+ ///
+ /// 要添加的数据处理器实例。
+ void AddProcessor(IVariableProcessor processor);
+
+ ///
+ /// 将一个变量数据项异步推入处理队列。
+ ///
+ /// 要入队的变量数据。
+ /// 一个表示入队操作的 ValueTask。
+ ValueTask EnqueueAsync(VariableDto data);
}
\ No newline at end of file
diff --git a/DMS.WPF/Interfaces/IVariableProcessor.cs b/DMS.Application/Interfaces/IVariableProcessor.cs
similarity index 87%
rename from DMS.WPF/Interfaces/IVariableProcessor.cs
rename to DMS.Application/Interfaces/IVariableProcessor.cs
index 25f2150..940ba15 100644
--- a/DMS.WPF/Interfaces/IVariableProcessor.cs
+++ b/DMS.Application/Interfaces/IVariableProcessor.cs
@@ -1,6 +1,7 @@
+using DMS.Application.Models;
using DMS.Core.Models;
-namespace DMS.WPF.Interfaces;
+namespace DMS.Application.Interfaces;
///
/// 定义了变量数据处理器的通用接口。
diff --git a/DMS.Core/Models/VariableContext.cs b/DMS.Application/Models/VariableContext.cs
similarity index 57%
rename from DMS.Core/Models/VariableContext.cs
rename to DMS.Application/Models/VariableContext.cs
index 4ffa76f..1efe679 100644
--- a/DMS.Core/Models/VariableContext.cs
+++ b/DMS.Application/Models/VariableContext.cs
@@ -1,13 +1,14 @@
+using DMS.Application.DTOs;
using DMS.Core.Models;
-namespace DMS.Core.Models
+namespace DMS.Application.Models
{
public class VariableContext
{
- public Variable Data { get; set; }
+ public VariableDto Data { get; set; }
public bool IsHandled { get; set; }
- public VariableContext(Variable data)
+ public VariableContext(VariableDto data)
{
Data = data;
IsHandled = false; // 默认未处理
diff --git a/DMS.WPF/Services/Processors/CheckValueChangedProcessor.cs b/DMS.Application/Services/Processors/CheckValueChangedProcessor.cs
similarity index 73%
rename from DMS.WPF/Services/Processors/CheckValueChangedProcessor.cs
rename to DMS.Application/Services/Processors/CheckValueChangedProcessor.cs
index 246e204..7eb8111 100644
--- a/DMS.WPF/Services/Processors/CheckValueChangedProcessor.cs
+++ b/DMS.Application/Services/Processors/CheckValueChangedProcessor.cs
@@ -1,22 +1,19 @@
using DMS.Core.Helper;
using DMS.Core.Models;
-using DMS.Helper;
-using DMS.WPF.Interfaces;
-using DMS.WPF.Services;
+using DMS.Application.Interfaces;
+using DMS.Application.Models;
-namespace DMS.Services.Processors;
+namespace DMS.Application.Services.Processors;
public class CheckValueChangedProcessor : IVariableProcessor
{
- private readonly DataServices _dataServices;
- public CheckValueChangedProcessor(DataServices dataServices)
+ public CheckValueChangedProcessor()
{
- _dataServices = dataServices;
}
public Task ProcessAsync(VariableContext context)
{
- Variable newVariable = context.Data;
+ // Variable newVariable = context.Data;
// if (!_dataServices.AllVariables.TryGetValue(newVariable.Id, out Variable oldVariable))
// {
// NlogHelper.Warn($"检查变量值是否改变时在_dataServices.AllVariables中找不到Id:{newVariable.Id},Name:{newVariable.Name}的变量。");
diff --git a/DMS.WPF/Services/DataProcessingService.cs b/DMS.Application/Services/Processors/DataProcessingService.cs
similarity index 92%
rename from DMS.WPF/Services/DataProcessingService.cs
rename to DMS.Application/Services/Processors/DataProcessingService.cs
index bf64f7b..7039ec4 100644
--- a/DMS.WPF/Services/DataProcessingService.cs
+++ b/DMS.Application/Services/Processors/DataProcessingService.cs
@@ -1,10 +1,14 @@
using System.Threading.Channels;
+using DMS.Application.DTOs;
+using DMS.Application.Interfaces;
+using DMS.Application.Models;
using DMS.Core.Helper;
+using DMS.Core.Interfaces;
using DMS.Core.Models;
-using DMS.WPF.Interfaces;
using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
-namespace DMS.Services;
+namespace DMS.Application.Services.Processors;
///
/// 核心数据处理服务,作为后台服务运行。
@@ -43,7 +47,7 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
/// 将一个变量数据项异步推入处理队列。
///
/// 要入队的变量数据。
- public async ValueTask EnqueueAsync(Variable data)
+ public async ValueTask EnqueueAsync(VariableDto data)
{
if (data == null)
{
@@ -96,4 +100,4 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
NlogHelper.Info("数据处理服务已停止。");
}
-}
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/DMS.WPF/Services/Processors/HistoryProcessor.cs b/DMS.Application/Services/Processors/HistoryProcessor.cs
similarity index 95%
rename from DMS.WPF/Services/Processors/HistoryProcessor.cs
rename to DMS.Application/Services/Processors/HistoryProcessor.cs
index d811867..fdf4c13 100644
--- a/DMS.WPF/Services/Processors/HistoryProcessor.cs
+++ b/DMS.Application/Services/Processors/HistoryProcessor.cs
@@ -1,10 +1,9 @@
+using DMS.Application.Interfaces;
+using DMS.Application.Models;
using DMS.Core.Helper;
using DMS.Core.Models;
-using DMS.Services;
-using DMS.WPF.Interfaces;
-using Microsoft.Extensions.Logging;
-namespace DMS.WPF.Services.Processors;
+namespace DMS.Application.Services.Processors;
public class HistoryProcessor : IVariableProcessor, IDisposable
{
diff --git a/DMS.WPF/Services/Processors/LoggingProcessor.cs b/DMS.Application/Services/Processors/LoggingProcessor.cs
similarity index 73%
rename from DMS.WPF/Services/Processors/LoggingProcessor.cs
rename to DMS.Application/Services/Processors/LoggingProcessor.cs
index 8327246..c2c7725 100644
--- a/DMS.WPF/Services/Processors/LoggingProcessor.cs
+++ b/DMS.Application/Services/Processors/LoggingProcessor.cs
@@ -1,10 +1,8 @@
-using System.Threading.Tasks;
-using DMS.Core.Models;
-using Microsoft.Extensions.Logging;
-using DMS.Helper;
-using DMS.WPF.Interfaces;
+using DMS.Application.Interfaces;
+using DMS.Application.Models;
-namespace DMS.Services.Processors;
+
+namespace DMS.Application.Services.Processors;
///
/// 一个简单的数据处理器实现,用于演示。
diff --git a/DMS.Application/Services/Processors/MqttPublishProcessor.cs b/DMS.Application/Services/Processors/MqttPublishProcessor.cs
new file mode 100644
index 0000000..0516302
--- /dev/null
+++ b/DMS.Application/Services/Processors/MqttPublishProcessor.cs
@@ -0,0 +1,40 @@
+using System.Threading.Tasks;
+using DMS.Application.Interfaces;
+using DMS.Application.Models;
+using DMS.Core.Models;
+
+namespace DMS.Application.Services.Processors;
+
+///
+/// 负责将变量数据发布到MQTT的处理器。
+///
+public class MqttPublishProcessor : IVariableProcessor
+{
+ // private readonly MqttBackgroundService _mqttBackgroundService;
+ //
+ // public MqttPublishProcessor(MqttBackgroundService mqttBackgroundService)
+ // {
+ // _mqttBackgroundService = mqttBackgroundService;
+ // }
+
+ ///
+ /// 处理单个变量上下文,如果有关联的MQTT配置,则将其推送到发送队列。
+ ///
+ /// 包含变量及其元数据的上下文对象。
+ public async Task ProcessAsync(VariableContext context)
+ {
+ // var variable = context.Data;
+ // if (variable?.VariableMqtts == null || variable.VariableMqtts.Count == 0)
+ // {
+ // return; // 没有关联的MQTT配置,直接返回
+ // }
+ //
+ // // 遍历所有关联的MQTT配置,并将其推入发送队列
+ // foreach (var variableMqtt in variable.VariableMqtts)
+ // {
+ // // 确保VariableMqtt对象中包含了最新的Variable数据
+ // variableMqtt.Variable = variable;
+ // await _mqttBackgroundService.SendVariableAsync(variableMqtt);
+ // }
+ }
+}
\ No newline at end of file
diff --git a/DMS.Application/Services/Processors/UpdateDbVariableProcessor.cs b/DMS.Application/Services/Processors/UpdateDbVariableProcessor.cs
new file mode 100644
index 0000000..c79b566
--- /dev/null
+++ b/DMS.Application/Services/Processors/UpdateDbVariableProcessor.cs
@@ -0,0 +1,36 @@
+using System.Threading.Tasks;
+using DMS.Application.Interfaces;
+using DMS.Application.Models;
+using DMS.Core.Helper;
+using DMS.Core.Models;
+
+namespace DMS.Application.Services.Processors;
+
+public class UpdateDbVariableProcessor : IVariableProcessor
+{
+
+ public UpdateDbVariableProcessor()
+ {
+ }
+
+ public async Task ProcessAsync(VariableContext context)
+ {
+ try
+ {
+ // 假设 DataServices 有一个方法来更新 Variable
+ // await _dataServices.UpdateVariableAsync(context.Data);
+ // NlogHelper.Info($"数据库变量 {context.Data.Name} 更新成功,值为: {context.Data.DataValue}");
+
+ // if (!_dataServices.AllVariables.TryGetValue(context.Data.Id, out Variable oldVariable))
+ // {
+ // NlogHelper.Warn($"数据库更新完成修改变量值是否改变时在_dataServices.AllVariables中找不到Id:{context.Data.Id},Name:{context.Data.Name}的变量。");
+ // context.IsHandled = true;
+ // }
+ // oldVariable.DataValue = context.Data.DataValue;
+ }
+ catch (Exception ex)
+ {
+ NlogHelper.Error($"更新数据库变量 {context.Data.Name} 失败: {ex.Message}", ex);
+ }
+ }
+}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Services/DataProcessingService.cs b/DMS.Infrastructure/Services/DataProcessingService.cs
deleted file mode 100644
index b0c26d8..0000000
--- a/DMS.Infrastructure/Services/DataProcessingService.cs
+++ /dev/null
@@ -1,34 +0,0 @@
-using DMS.Application.Interfaces;
-using DMS.Core.Interfaces;
-using DMS.Core.Models;
-using Microsoft.Extensions.Logging;
-using System.Threading.Tasks;
-
-namespace DMS.Infrastructure.Services;
-
-public class DataProcessingService : IDataProcessingService
-{
- private readonly ILogger _logger;
- private readonly IRepositoryManager _repositoryManager;
-
- public DataProcessingService(ILogger logger, IRepositoryManager repositoryManager)
- {
- _logger = logger;
- _repositoryManager = repositoryManager;
- }
-
- public async Task EnqueueAsync(Variable variable)
- {
- _logger.LogInformation($"Processing variable: {variable.Name}, Value: {variable.DataValue}");
-
- // 这里可以添加将变量数据保存到数据库的逻辑
- // 例如:保存到 VariableHistory 表
- var history = new VariableHistory
- {
- VariableId = variable.Id,
- Value = variable.DataValue.ToString(),
- Timestamp = System.DateTime.Now
- };
- await _repositoryManager.VariableHistories.AddAsync(history);
- }
-}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Services/OpcUaBackgroundService.cs b/DMS.Infrastructure/Services/OpcUaBackgroundService.cs
index 3a7b173..1126b01 100644
--- a/DMS.Infrastructure/Services/OpcUaBackgroundService.cs
+++ b/DMS.Infrastructure/Services/OpcUaBackgroundService.cs
@@ -17,6 +17,7 @@ namespace DMS.Infrastructure.Services;
public class OpcUaBackgroundService : BackgroundService
{
private readonly IDataCenterService _dataCenterService;
+ private readonly IDataProcessingService _dataProcessingService;
// private readonly IDataProcessingService _dataProcessingService;
private readonly ILogger _logger;
@@ -31,7 +32,7 @@ public class OpcUaBackgroundService : BackgroundService
private readonly ConcurrentDictionary _opcUaSubscriptions;
// 存储活动的 OPC UA 变量,键为变量的OpcNodeId
- private readonly ConcurrentDictionary _opcUaPollVariablesByNodeId;
+ private readonly ConcurrentDictionary _opcUaVariables;
// 储存所有要轮询更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
private readonly ConcurrentDictionary> _opcUaPollVariablesByDeviceId;
@@ -72,15 +73,14 @@ public class OpcUaBackgroundService : BackgroundService
{ PollLevelType.ThirtyMinutes, TimeSpan.FromMilliseconds((int)PollLevelType.ThirtyMinutes) }
};
- public OpcUaBackgroundService(IDataCenterService dataCenterService,
- ILogger logger)
+ public OpcUaBackgroundService(IDataCenterService dataCenterService,IDataProcessingService dataProcessingService, ILogger logger)
{
_dataCenterService = dataCenterService;
- // _dataProcessingService = dataProcessingService;
+ _dataProcessingService = dataProcessingService;
_logger = logger;
_opcUaServices = new ConcurrentDictionary();
_opcUaSubscriptions = new ConcurrentDictionary();
- _opcUaPollVariablesByNodeId = new ConcurrentDictionary();
+ _opcUaVariables = new ConcurrentDictionary();
_opcUaPollVariablesByDeviceId = new ConcurrentDictionary>();
_opcUaVariablesByDeviceId = new ConcurrentDictionary>();
@@ -123,12 +123,6 @@ public class OpcUaBackgroundService : BackgroundService
await SetupOpcUaSubscriptionAsync(stoppingToken);
_logger.LogInformation("OPC UA 后台服务已启动。");
- // // 持续轮询,直到取消请求或需要重新加载
- // while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0)
- // {
- // await PollOpcUaVariableOnceAsync(stoppingToken);
- // await Task.Delay(_opcUaPollIntervalMs, stoppingToken);
- // }
}
}
catch (OperationCanceledException)
@@ -146,44 +140,6 @@ public class OpcUaBackgroundService : BackgroundService
}
- // private async void HandleDeviceIsActiveChanged(Device device, bool isActive)
- // {
- // if (device.Protocol != ProtocolType.OpcUa)
- // return;
- //
- // _logger.LogInformation($"设备 {device.Name} (ID: {device.Id}) 的IsActive状态改变为 {isActive}。");
- //
- // if (!isActive)
- // {
- // // 设备变为非活动状态,断开连接
- // if (_opcUaServices.TryRemove(device.OpcUaServerUrl, out var session))
- // {
- // try
- // {
- // if (_opcUaSubscriptions.TryRemove(device.OpcUaServerUrl, out var subscription))
- // {
- // // 删除订阅。
- // await subscription.DeleteAsync(true);
- // _logger.LogInformation($"已删除设备 {device.Name} ({device.OpcUaServerUrl}) 的订阅。");
- // }
- //
- // if (session.Connected)
- // {
- // await session.CloseAsync();
- // _logger.LogInformation($"已断开设备 {device.Name} ({device.OpcUaServerUrl}) 的连接。");
- // }
- // }
- // catch (Exception ex)
- // {
- // _logger.LogError(ex, $"断开设备 {device.Name} ({device.OpcUaServerUrl}) 连接时发生错误:{ex.Message}");
- // }
- // }
- // }
- //
- // // 触发重新加载,让LoadVariables和ConnectOpcUaServiceAsync处理设备列表的更新
- // _reloadSemaphore.Release();
- // }
-
///
/// 从数据库加载所有活动的 OPC UA 变量,并进行相应的连接和订阅管理。
///
@@ -194,7 +150,7 @@ public class OpcUaBackgroundService : BackgroundService
_opcUaDevices.Clear();
_opcUaPollVariablesByDeviceId.Clear();
_opcUaVariablesByDeviceId.Clear();
- _opcUaPollVariablesByNodeId.Clear();
+ _opcUaVariables.Clear();
_logger.LogInformation("开始加载OPC UA变量....");
var opcUaDevices = _dataCenterService
@@ -210,6 +166,11 @@ public class OpcUaBackgroundService : BackgroundService
.Where(vd => vd.IsActive == true &&
vd.Protocol == ProtocolType.OpcUa)
.ToList();
+ foreach (var variableDto in variableDtos)
+ {
+ _opcUaVariables.TryAdd(variableDto.OpcUaNodeId,variableDto);
+ }
+
totalVariableCount += variableDtos.Count;
_opcUaVariablesByDeviceId.AddOrUpdate(opcUaDevice.Id, variableDtos, (key, oldValue) => variableDtos);
}
@@ -283,128 +244,55 @@ public class OpcUaBackgroundService : BackgroundService
}
}
- // private async Task PollOpcUaVariableOnceAsync(CancellationToken stoppingToken)
+ // ///
+ // /// 读取单个 OPC UA 变量并处理其数据。
+ // ///
+ // /// OPC UA 会话。
+ // /// 要读取的变量。
+ // /// 取消令牌。
+ // private async Task ReadAndProcessOpcUaVariableAsync(Session session, Variable variable,
+ // CancellationToken stoppingToken)
// {
+ // var nodesToRead = new ReadValueIdCollection
+ // {
+ // new ReadValueId
+ // {
+ // NodeId = new NodeId(variable.OpcUaNodeId),
+ // AttributeId = Attributes.Value
+ // }
+ // };
+ //
// try
// {
- // var deviceIdsToPoll = _opcUaPollVariablesByDeviceId.Keys.ToList();
+ // var readResponse = await session.ReadAsync(null, 0, TimestampsToReturn.Both, nodesToRead, stoppingToken);
+ // var result = readResponse.Results?.FirstOrDefault();
+ // if (result == null) return;
//
- // var pollingTasks = deviceIdsToPoll
- // .Select(deviceId => PollSingleDeviceVariablesAsync(deviceId, stoppingToken))
- // .ToList();
+ // if (!StatusCode.IsGood(result.StatusCode))
+ // {
+ // _logger.LogWarning($"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {result.StatusCode}");
+ // return;
+ // }
//
- // await Task.WhenAll(pollingTasks);
+ // await UpdateAndEnqueueVariable(variable, result.Value);
// }
- // catch (OperationCanceledException)
+ // catch (ServiceResultException ex) when (ex.StatusCode == StatusCodes.BadSessionIdInvalid)
// {
- // _logger.LogInformation("OPC UA 后台服务轮询变量被取消。");
+ // _logger.LogError(ex, $"OPC UA会话ID无效,变量: {variable.Name} ({variable.OpcUaNodeId})。正在尝试重新连接...");
+ //
// }
// catch (Exception ex)
// {
- // _logger.LogError(ex, $"OPC UA 后台服务在轮询变量过程中发生错误:{ex.Message}");
+ // _logger.LogError(ex, $"轮询OPC UA变量 {variable.Name} ({variable.OpcUaNodeId}) 时发生未知错误: {ex.Message}");
// }
// }
- // ///
- // /// 轮询单个设备的所有 OPC UA 变量。
- // ///
- // /// 设备的 ID。
- // /// 取消令牌。
- // private async Task PollSingleDeviceVariablesAsync(int deviceId, CancellationToken stoppingToken)
- // {
- // if (stoppingToken.IsCancellationRequested) return;
- //
- // if (!_opcUaDevices.TryGetValue(deviceId, out var device) || device.OpcUaServerUrl == null)
- // {
- // _logger.LogWarning($"OpcUa轮询变量时,在deviceDic中未找到ID为 {deviceId} 的设备,或其服务器地址为空,请检查!");
- // return;
- // }
- //
- // if (!device.IsActive) return;
- //
- // if (!_opcUaServices.TryGetValue(device.OpcUaServerUrl, out var session) || !session.Connected)
- // {
- // if (device.IsActive)
- // {
- // _logger.LogWarning($"用于 {device.OpcUaServerUrl} 的 OPC UA 会话未连接。正在尝试重新连接...");
- // // await ConnectSingleOpcUaDeviceAsync(device, stoppingToken);
- // }
- //
- // return;
- // }
- //
- // if (!_opcUaPollVariablesByDeviceId.TryGetValue(deviceId, out var variableList) || variableList.Count == 0)
- // {
- // return;
- // }
- //
- // foreach (var variable in variableList)
- // {
- // if (stoppingToken.IsCancellationRequested) return;
- //
- // if (!PollingIntervals.TryGetValue(variable.PollLevel, out var interval) ||
- // (DateTime.Now - variable.UpdatedAt) < interval)
- // {
- // continue;
- // }
- //
- // await ReadAndProcessOpcUaVariableAsync(session, variable, stoppingToken);
- // }
- // }
-
- ///
- /// 读取单个 OPC UA 变量并处理其数据。
- ///
- /// OPC UA 会话。
- /// 要读取的变量。
- /// 取消令牌。
- private async Task ReadAndProcessOpcUaVariableAsync(Session session, Variable variable,
- CancellationToken stoppingToken)
- {
- var nodesToRead = new ReadValueIdCollection
- {
- new ReadValueId
- {
- NodeId = new NodeId(variable.OpcUaNodeId),
- AttributeId = Attributes.Value
- }
- };
-
- try
- {
- var readResponse = await session.ReadAsync(null, 0, TimestampsToReturn.Both, nodesToRead, stoppingToken);
- var result = readResponse.Results?.FirstOrDefault();
- if (result == null) return;
-
- if (!StatusCode.IsGood(result.StatusCode))
- {
- _logger.LogWarning($"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {result.StatusCode}");
- return;
- }
-
- await UpdateAndEnqueueVariable(variable, result.Value);
- }
- catch (ServiceResultException ex) when (ex.StatusCode == StatusCodes.BadSessionIdInvalid)
- {
- _logger.LogError(ex, $"OPC UA会话ID无效,变量: {variable.Name} ({variable.OpcUaNodeId})。正在尝试重新连接...");
- // Assuming device can be retrieved from variable or passed as parameter if needed for ConnectSingleOpcUaDeviceAsync
- // For now, I'll just log and let the outer loop handle reconnection if the session is truly invalid for the device.
- // If a full device object is needed here, it would need to be passed down from PollSingleDeviceVariablesAsync.
- // For simplicity, I'll remove the direct reconnection attempt here and rely on the outer loop.
- // await ConnectSingleOpcUaDeviceAsync(variable.VariableTable.Device, stoppingToken);
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, $"轮询OPC UA变量 {variable.Name} ({variable.OpcUaNodeId}) 时发生未知错误: {ex.Message}");
- }
- }
-
///
/// 更新变量数据,并将其推送到数据处理队列。
///
/// 要更新的变量。
/// 读取到的数据值。
- private async Task UpdateAndEnqueueVariable(Variable variable, object value)
+ private async Task UpdateAndEnqueueVariable(VariableDto variable, object value)
{
try
{
@@ -414,7 +302,7 @@ public class OpcUaBackgroundService : BackgroundService
variable.UpdatedAt = DateTime.Now;
// Console.WriteLine($"OpcUa后台服务轮询变量:{variable.Name},值:{variable.DataValue}");
// 将更新后的数据推入处理队列。
- // await _dataProcessingService.EnqueueAsync(variable);
+ await _dataProcessingService.EnqueueAsync(variable);
}
catch (Exception ex)
{
@@ -442,37 +330,27 @@ public class OpcUaBackgroundService : BackgroundService
var opcUaNodes
= vGroup.Select(variableDto => new OpcUaNode() { NodeId = variableDto.OpcUaNodeId })
.ToList();
- opcUaService.SubscribeToNode(opcUaNodes,HandleDataChanged);
+
+ PollingIntervals.TryGetValue(pollLevelType, out var pollLevel);
+ opcUaService.SubscribeToNode(opcUaNodes,HandleDataChanged,10000,1000);
}
}
}
}
- private void HandleDataChanged(OpcUaNode opcUaNode)
+ private async void HandleDataChanged(OpcUaNode opcUaNode)
{
-
- }
-
- ///
- /// 订阅变量变化的通知
- ///
- /// 发生变化的变量。
- ///
- ///
- private async void OnSubNotification(Variable variable, MonitoredItem monitoredItem,
- MonitoredItemNotificationEventArgs e)
- {
- foreach (var value in monitoredItem.DequeueValues())
+ if (_opcUaVariables.TryGetValue(opcUaNode.NodeId.ToString(), out var variabelDto))
{
- _logger.LogInformation(
- $"[OPC UA 通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}");
- if (StatusCode.IsGood(value.StatusCode))
+ if (opcUaNode.Value == null)
{
- await UpdateAndEnqueueVariable(variable, value.Value);
+ return;
}
+ await UpdateAndEnqueueVariable(variabelDto, opcUaNode.Value);
}
}
+
///
/// 断开所有 OPC UA 会话。
///
diff --git a/DMS.Infrastructure/Services/S7BackgroundService.cs b/DMS.Infrastructure/Services/S7BackgroundService.cs
index f193557..f779b45 100644
--- a/DMS.Infrastructure/Services/S7BackgroundService.cs
+++ b/DMS.Infrastructure/Services/S7BackgroundService.cs
@@ -295,7 +295,7 @@ public class S7BackgroundService : BackgroundService
variable.UpdatedAt = DateTime.Now;
// Console.WriteLine($"S7后台服务轮询变量:{variable.Name},值:{variable.DataValue}");
// 将更新后的数据推入处理队列。
- await _dataProcessingService.EnqueueAsync(variable);
+ // await _dataProcessingService.EnqueueAsync(variable);
}
catch (Exception ex)
{
diff --git a/DMS.WPF/App.xaml.cs b/DMS.WPF/App.xaml.cs
index ffee7b8..1dcd44a 100644
--- a/DMS.WPF/App.xaml.cs
+++ b/DMS.WPF/App.xaml.cs
@@ -2,11 +2,11 @@
using AutoMapper.Internal;
using DMS.Application.Interfaces;
using DMS.Application.Services;
+using DMS.Application.Services.Processors;
using DMS.Core.Enums;
using DMS.Core.Interfaces;
using DMS.Core.Interfaces.Repositories;
using DMS.Helper;
-using DMS.Services.Processors;
using DMS.WPF.ViewModels;
using DMS.WPF.Views;
using iNKORE.UI.WPF.Modern.Common.IconKeys;
@@ -21,10 +21,7 @@ using DMS.Infrastructure.Services;
using Microsoft.Extensions.Hosting;
using DMS.WPF.Helper;
using DMS.WPF.Services;
-using DMS.WPF.Services.Processors;
using DMS.WPF.ViewModels.Dialogs;
-using DataProcessingService = DMS.Services.DataProcessingService;
-using IDataProcessingService = DMS.WPF.Interfaces.IDataProcessingService;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;
using DMS.Core.Interfaces.Services;
using DMS.Infrastructure.Interfaces.Services;
diff --git a/DMS.WPF/DMS.WPF.csproj b/DMS.WPF/DMS.WPF.csproj
index 3833ec2..5f00591 100644
--- a/DMS.WPF/DMS.WPF.csproj
+++ b/DMS.WPF/DMS.WPF.csproj
@@ -154,7 +154,6 @@
-
diff --git a/DMS.WPF/Interfaces/IDataProcessingService.cs b/DMS.WPF/Interfaces/IDataProcessingService.cs
deleted file mode 100644
index 8e72677..0000000
--- a/DMS.WPF/Interfaces/IDataProcessingService.cs
+++ /dev/null
@@ -1,24 +0,0 @@
-using DMS.Core.Models;
-using DMS.Services;
-
-namespace DMS.WPF.Interfaces;
-
-///
-/// 定义了数据处理服务的接口。
-/// 该服务负责管理数据处理队列和处理器链。
-///
-public interface IDataProcessingService
-{
- ///
- /// 向处理链中添加一个数据处理器。
- ///
- /// 要添加的数据处理器实例。
- void AddProcessor(IVariableProcessor processor);
-
- ///
- /// 将一个变量数据项异步推入处理队列。
- ///
- /// 要入队的变量数据。
- /// 一个表示入队操作的 ValueTask。
- ValueTask EnqueueAsync(Variable data);
-}
diff --git a/DMS.WPF/Services/Processors/MqttPublishProcessor.cs b/DMS.WPF/Services/Processors/MqttPublishProcessor.cs
deleted file mode 100644
index 66c8dee..0000000
--- a/DMS.WPF/Services/Processors/MqttPublishProcessor.cs
+++ /dev/null
@@ -1,40 +0,0 @@
-using System.Threading.Tasks;
-using DMS.Core.Models;
-using DMS.WPF.Interfaces;
-
-namespace DMS.Services.Processors
-{
- ///
- /// 负责将变量数据发布到MQTT的处理器。
- ///
- public class MqttPublishProcessor : IVariableProcessor
- {
- // private readonly MqttBackgroundService _mqttBackgroundService;
- //
- // public MqttPublishProcessor(MqttBackgroundService mqttBackgroundService)
- // {
- // _mqttBackgroundService = mqttBackgroundService;
- // }
-
- ///
- /// 处理单个变量上下文,如果有关联的MQTT配置,则将其推送到发送队列。
- ///
- /// 包含变量及其元数据的上下文对象。
- public async Task ProcessAsync(VariableContext context)
- {
- // var variable = context.Data;
- // if (variable?.VariableMqtts == null || variable.VariableMqtts.Count == 0)
- // {
- // return; // 没有关联的MQTT配置,直接返回
- // }
- //
- // // 遍历所有关联的MQTT配置,并将其推入发送队列
- // foreach (var variableMqtt in variable.VariableMqtts)
- // {
- // // 确保VariableMqtt对象中包含了最新的Variable数据
- // variableMqtt.Variable = variable;
- // await _mqttBackgroundService.SendVariableAsync(variableMqtt);
- // }
- }
- }
-}
diff --git a/DMS.WPF/Services/Processors/UpdateDbVariableProcessor.cs b/DMS.WPF/Services/Processors/UpdateDbVariableProcessor.cs
deleted file mode 100644
index 18484c5..0000000
--- a/DMS.WPF/Services/Processors/UpdateDbVariableProcessor.cs
+++ /dev/null
@@ -1,40 +0,0 @@
-using System.Threading.Tasks;
-using DMS.Core.Helper;
-using DMS.Core.Models;
-using DMS.Helper;
-using DMS.WPF.Interfaces;
-using DMS.WPF.Services;
-
-namespace DMS.Services.Processors
-{
- public class UpdateDbVariableProcessor : IVariableProcessor
- {
- private readonly DataServices _dataServices;
-
- public UpdateDbVariableProcessor(DataServices dataServices)
- {
- _dataServices = dataServices;
- }
-
- public async Task ProcessAsync(VariableContext context)
- {
- try
- {
- // 假设 DataServices 有一个方法来更新 Variable
- // await _dataServices.UpdateVariableAsync(context.Data);
- // NlogHelper.Info($"数据库变量 {context.Data.Name} 更新成功,值为: {context.Data.DataValue}");
-
- // if (!_dataServices.AllVariables.TryGetValue(context.Data.Id, out Variable oldVariable))
- // {
- // NlogHelper.Warn($"数据库更新完成修改变量值是否改变时在_dataServices.AllVariables中找不到Id:{context.Data.Id},Name:{context.Data.Name}的变量。");
- // context.IsHandled = true;
- // }
- // oldVariable.DataValue = context.Data.DataValue;
- }
- catch (Exception ex)
- {
- NlogHelper.Error($"更新数据库变量 {context.Data.Name} 失败: {ex.Message}", ex);
- }
- }
- }
-}
\ No newline at end of file