将所有的VariableData改为Variable,将DataVariables改为Variables,修复了变量添加MQTT服务器后,服务器列表不更新的问题
This commit is contained in:
@@ -17,10 +17,10 @@ namespace PMSWPF.Services;
|
||||
public class DataProcessingService : BackgroundService, IDataProcessingService
|
||||
{
|
||||
// 使用 Channel 作为高性能的生产者/消费者队列
|
||||
private readonly Channel<VariableDataContext> _queue;
|
||||
private readonly Channel<VariableContext> _queue;
|
||||
|
||||
// 存储数据处理器的链表
|
||||
private readonly List<IVariableDataProcessor> _processors;
|
||||
private readonly List<IVariableProcessor> _processors;
|
||||
|
||||
/// <summary>
|
||||
/// 构造函数,注入日志记录器。
|
||||
@@ -29,8 +29,8 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
|
||||
public DataProcessingService()
|
||||
{
|
||||
// 创建一个无边界的 Channel,允许生产者快速写入而不会被阻塞。
|
||||
_queue = Channel.CreateUnbounded<VariableDataContext>();
|
||||
_processors = new List<IVariableDataProcessor>();
|
||||
_queue = Channel.CreateUnbounded<VariableContext>();
|
||||
_processors = new List<IVariableProcessor>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -38,7 +38,7 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
|
||||
/// 处理器将按照添加的顺序执行。
|
||||
/// </summary>
|
||||
/// <param name="processor">要添加的数据处理器实例。</param>
|
||||
public void AddProcessor(IVariableDataProcessor processor)
|
||||
public void AddProcessor(IVariableProcessor processor)
|
||||
{
|
||||
_processors.Add(processor);
|
||||
}
|
||||
@@ -47,14 +47,14 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
|
||||
/// 将一个变量数据项异步推入处理队列。
|
||||
/// </summary>
|
||||
/// <param name="data">要入队的变量数据。</param>
|
||||
public async ValueTask EnqueueAsync(VariableData data)
|
||||
public async ValueTask EnqueueAsync(Variable data)
|
||||
{
|
||||
if (data == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var context = new VariableDataContext(data);
|
||||
var context = new VariableContext(data);
|
||||
// 将数据项写入 Channel,供后台服务处理。
|
||||
await _queue.Writer.WriteAsync(context);
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ public partial class DataServices : ObservableRecipient, IRecipient<LoadMessage>
|
||||
|
||||
// 变量数据列表。
|
||||
[ObservableProperty]
|
||||
private List<VariableData> _variableDatas;
|
||||
private List<Variable> _variables;
|
||||
|
||||
// 菜单树列表。
|
||||
[ObservableProperty]
|
||||
@@ -42,7 +42,7 @@ public partial class DataServices : ObservableRecipient, IRecipient<LoadMessage>
|
||||
private List<Mqtt> _mqtts;
|
||||
|
||||
|
||||
public ConcurrentDictionary<int, VariableData> AllVariables;
|
||||
public ConcurrentDictionary<int, Variable> AllVariables;
|
||||
|
||||
// 设备数据仓库,用于设备数据的CRUD操作。
|
||||
private readonly DeviceRepository _deviceRepository;
|
||||
@@ -97,8 +97,8 @@ public partial class DataServices : ObservableRecipient, IRecipient<LoadMessage>
|
||||
_menuRepository = menuRepository;
|
||||
_mqttRepository = mqttRepository;
|
||||
_varDataRepository = varDataRepository;
|
||||
_variableDatas = new List<VariableData>();
|
||||
AllVariables = new ConcurrentDictionary<int, VariableData>();
|
||||
_variables = new List<Variable>();
|
||||
AllVariables = new ConcurrentDictionary<int, Variable>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -161,9 +161,9 @@ public partial class DataServices : ObservableRecipient, IRecipient<LoadMessage>
|
||||
}
|
||||
|
||||
var allVar = await _varDataRepository.GetAllAsync();
|
||||
foreach (var variableData in allVar)
|
||||
foreach (var variable in allVar)
|
||||
{
|
||||
AllVariables.AddOrUpdate(variableData.Id, variableData, (key, old) => variableData);
|
||||
AllVariables.AddOrUpdate(variable.Id, variable, (key, old) => variable);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -234,19 +234,19 @@ public partial class DataServices : ObservableRecipient, IRecipient<LoadMessage>
|
||||
/// 异步加载变量数据。
|
||||
/// </summary>
|
||||
/// <returns>表示异步操作的任务。</returns>
|
||||
private async Task LoadVariableDatas()
|
||||
private async Task LoadVariables()
|
||||
{
|
||||
VariableDatas = await _varDataRepository.GetAllAsync();
|
||||
Variables = await _varDataRepository.GetAllAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 异步更新变量数据。
|
||||
/// </summary>
|
||||
/// <param name="variableData">要更新的变量数据。</param>
|
||||
/// <param name="variable">要更新的变量数据。</param>
|
||||
/// <returns>表示异步操作的任务。</returns>
|
||||
public async Task UpdateVariableDataAsync(VariableData variableData)
|
||||
public async Task UpdateVariableAsync(Variable variable)
|
||||
{
|
||||
await _varDataRepository.UpdateAsync(variableData);
|
||||
await _varDataRepository.UpdateAsync(variable);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -11,11 +11,11 @@ namespace PMSWPF.Services;
|
||||
|
||||
public class DialogService :IDialogService
|
||||
{
|
||||
private readonly MqttRepository _mqttRepository;
|
||||
private readonly DataServices _dataServices;
|
||||
|
||||
public DialogService(MqttRepository mqttRepository)
|
||||
public DialogService(DataServices dataServices)
|
||||
{
|
||||
_mqttRepository = mqttRepository;
|
||||
_dataServices = dataServices;
|
||||
}
|
||||
|
||||
public async Task<Device> ShowAddDeviceDialog()
|
||||
@@ -120,17 +120,17 @@ public class DialogService :IDialogService
|
||||
return null;
|
||||
}
|
||||
|
||||
public async Task<VariableData> ShowAddVarDataDialog()
|
||||
public async Task<Variable> ShowAddVarDataDialog()
|
||||
{
|
||||
VarDataDialogViewModel vm = new();
|
||||
vm.Title = "添加变量";
|
||||
vm.PrimaryButtonText = "添加变量";
|
||||
vm.VariableData = new VariableData();
|
||||
vm.Variable = new Variable();
|
||||
var dialog = new VarDataDialog(vm);
|
||||
var res = await dialog.ShowAsync();
|
||||
if (res == ContentDialogResult.Primary)
|
||||
{
|
||||
return vm.VariableData;
|
||||
return vm.Variable;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@@ -141,17 +141,17 @@ public class DialogService :IDialogService
|
||||
MessageBox.Show(message);
|
||||
}
|
||||
|
||||
public async Task<VariableData> ShowEditVarDataDialog(VariableData variableData)
|
||||
public async Task<Variable> ShowEditVarDataDialog(Variable variable)
|
||||
{
|
||||
VarDataDialogViewModel vm = new();
|
||||
vm.Title = "编辑变量";
|
||||
vm.PrimaryButtonText = "编辑变量";
|
||||
vm.VariableData = variableData;
|
||||
vm.Variable = variable;
|
||||
var dialog = new VarDataDialog(vm);
|
||||
var res = await dialog.ShowAsync();
|
||||
if (res == ContentDialogResult.Primary)
|
||||
{
|
||||
return vm.VariableData;
|
||||
return vm.Variable;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@@ -192,13 +192,13 @@ public class DialogService :IDialogService
|
||||
|
||||
public async Task<Mqtt?> ShowMqttSelectionDialog()
|
||||
{
|
||||
var vm = new MqttSelectionDialogViewModel(_mqttRepository);
|
||||
var vm = new MqttSelectionDialogViewModel(_dataServices);
|
||||
var dialog = new MqttSelectionDialog(vm);
|
||||
var result = await dialog.ShowAsync();
|
||||
return result == ContentDialogResult.Primary ? vm.SelectedMqtt : null;
|
||||
}
|
||||
|
||||
public async Task<List<VariableData>> ShowOpcUaImportDialog(string endpointUrl)
|
||||
public async Task<List<Variable>> ShowOpcUaImportDialog(string endpointUrl)
|
||||
{
|
||||
var vm= new OpcUaImportDialogViewModel();
|
||||
vm.EndpointUrl = endpointUrl;
|
||||
@@ -250,7 +250,7 @@ public class DialogService :IDialogService
|
||||
return null;
|
||||
}
|
||||
|
||||
public async Task<List<VariableMqtt>> ShowMqttAliasBatchEditDialog(List<VariableData> selectedVariables, Mqtt selectedMqtt)
|
||||
public async Task<List<VariableMqtt>> ShowMqttAliasBatchEditDialog(List<Variable> selectedVariables, Mqtt selectedMqtt)
|
||||
{
|
||||
var vm = new MqttAliasBatchEditDialogViewModel(selectedVariables, selectedMqtt);
|
||||
var dialog = new MqttAliasBatchEditDialog(vm);
|
||||
|
||||
@@ -13,12 +13,12 @@ public interface IDataProcessingService
|
||||
/// 向处理链中添加一个数据处理器。
|
||||
/// </summary>
|
||||
/// <param name="processor">要添加的数据处理器实例。</param>
|
||||
void AddProcessor(IVariableDataProcessor processor);
|
||||
void AddProcessor(IVariableProcessor processor);
|
||||
|
||||
/// <summary>
|
||||
/// 将一个变量数据项异步推入处理队列。
|
||||
/// </summary>
|
||||
/// <param name="data">要入队的变量数据。</param>
|
||||
/// <returns>一个表示入队操作的 ValueTask。</returns>
|
||||
ValueTask EnqueueAsync(VariableData data);
|
||||
ValueTask EnqueueAsync(Variable data);
|
||||
}
|
||||
|
||||
@@ -15,18 +15,18 @@ public interface IDialogService
|
||||
Task<VariableTable> ShowAddVarTableDialog();
|
||||
Task<VariableTable> ShowEditVarTableDialog(VariableTable variableTable);
|
||||
|
||||
Task<VariableData> ShowAddVarDataDialog();
|
||||
Task<Variable> ShowAddVarDataDialog();
|
||||
|
||||
void ShowMessageDialog(string title, string message);
|
||||
Task<VariableData> ShowEditVarDataDialog(VariableData variableData);
|
||||
Task<Variable> ShowEditVarDataDialog(Variable variable);
|
||||
Task<string> ShowImportExcelDialog();
|
||||
ContentDialog ShowProcessingDialog(string title, string message);
|
||||
Task<PollLevelType?> ShowPollLevelDialog(PollLevelType pollLevelType);
|
||||
Task<Mqtt?> ShowMqttSelectionDialog();
|
||||
Task<List<VariableData>> ShowOpcUaImportDialog(string endpointUrl);
|
||||
Task<List<Variable>> ShowOpcUaImportDialog(string endpointUrl);
|
||||
Task<OpcUaUpdateType?> ShowOpcUaUpdateTypeDialog();
|
||||
Task<bool?> ShowIsActiveDialog(bool currentIsActive);
|
||||
Task ShowImportResultDialog(List<string> importedVariables, List<string> existingVariables);
|
||||
Task<string?> ShowMqttAliasDialog(string variableName, string mqttServerName);
|
||||
Task<List<VariableMqtt>> ShowMqttAliasBatchEditDialog(List<VariableData> selectedVariables, Mqtt selectedMqtt);
|
||||
Task<List<VariableMqtt>> ShowMqttAliasBatchEditDialog(List<Variable> selectedVariables, Mqtt selectedMqtt);
|
||||
}
|
||||
@@ -7,12 +7,12 @@ namespace PMSWPF.Services;
|
||||
/// 定义了变量数据处理器的通用接口。
|
||||
/// 任何需要加入数据处理链的类都必须实现此接口。
|
||||
/// </summary>
|
||||
public interface IVariableDataProcessor
|
||||
public interface IVariableProcessor
|
||||
{
|
||||
/// <summary>
|
||||
/// 异步处理单个变量数据。
|
||||
/// </summary>
|
||||
/// <param name="data">要处理的变量数据。</param>
|
||||
/// <returns>一个表示异步操作的任务。</returns>
|
||||
Task ProcessAsync(VariableDataContext context);
|
||||
Task ProcessAsync(VariableContext context);
|
||||
}
|
||||
@@ -23,13 +23,13 @@ namespace PMSWPF.Services
|
||||
private readonly ConcurrentDictionary<string, Subscription> _opcUaSubscriptions;
|
||||
|
||||
// 存储活动的 OPC UA 变量,键为变量的OpcNodeId
|
||||
private readonly ConcurrentDictionary<string, VariableData> _opcUaPollVariablesByNodeId;
|
||||
private readonly ConcurrentDictionary<string, Variable> _opcUaPollVariablesByNodeId;
|
||||
|
||||
// 储存所有要轮询更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
|
||||
private readonly ConcurrentDictionary<int, List<VariableData>> _opcUaPollVariablesByDeviceId;
|
||||
private readonly ConcurrentDictionary<int, List<Variable>> _opcUaPollVariablesByDeviceId;
|
||||
|
||||
// 储存所有要订阅更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
|
||||
private readonly ConcurrentDictionary<int, List<VariableData>> _opcUaSubVariablesByDeviceId;
|
||||
private readonly ConcurrentDictionary<int, List<Variable>> _opcUaSubVariablesByDeviceId;
|
||||
|
||||
private readonly SemaphoreSlim _reloadSemaphore = new SemaphoreSlim(0);
|
||||
|
||||
@@ -49,9 +49,9 @@ namespace PMSWPF.Services
|
||||
_opcUaDevices = new ConcurrentDictionary<int, Device>();
|
||||
_opcUaSessions = new ConcurrentDictionary<string, Session>();
|
||||
_opcUaSubscriptions = new ConcurrentDictionary<string, Subscription>();
|
||||
_opcUaPollVariablesByNodeId = new ConcurrentDictionary<string, VariableData>();
|
||||
_opcUaPollVariablesByDeviceId = new ConcurrentDictionary<int, List<VariableData>>();
|
||||
_opcUaSubVariablesByDeviceId = new ConcurrentDictionary<int, List<VariableData>>();
|
||||
_opcUaPollVariablesByNodeId = new ConcurrentDictionary<string, Variable>();
|
||||
_opcUaPollVariablesByDeviceId = new ConcurrentDictionary<int, List<Variable>>();
|
||||
_opcUaSubVariablesByDeviceId = new ConcurrentDictionary<int, List<Variable>>();
|
||||
|
||||
_dataServices.OnDeviceListChanged += HandleDeviceListChanged;
|
||||
_dataServices.OnDeviceIsActiveChanged += HandleDeviceIsActiveChanged;
|
||||
@@ -189,23 +189,23 @@ namespace PMSWPF.Services
|
||||
_opcUaDevices.AddOrUpdate(opcUaDevice.Id, opcUaDevice, (key, oldValue) => opcUaDevice);
|
||||
|
||||
//查找设备中所有要轮询的变量
|
||||
var dPollList = opcUaDevice.VariableTables?.SelectMany(vt => vt.DataVariables)
|
||||
var dPollList = opcUaDevice.VariableTables?.SelectMany(vt => vt.Variables)
|
||||
.Where(vd => vd.IsActive == true &&
|
||||
vd.ProtocolType == ProtocolType.OpcUA &&
|
||||
vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll)
|
||||
.ToList();
|
||||
// 将变量保存到字典中,方便Read后还原
|
||||
foreach (var variableData in dPollList)
|
||||
foreach (var variable in dPollList)
|
||||
{
|
||||
_opcUaPollVariablesByNodeId.AddOrUpdate(variableData.OpcUaNodeId, variableData,
|
||||
(key, oldValue) => variableData);
|
||||
_opcUaPollVariablesByNodeId.AddOrUpdate(variable.OpcUaNodeId, variable,
|
||||
(key, oldValue) => variable);
|
||||
}
|
||||
|
||||
totalPollVariableCount += dPollList.Count;
|
||||
_opcUaPollVariablesByDeviceId.AddOrUpdate(opcUaDevice.Id, dPollList, (key, oldValue) => dPollList);
|
||||
|
||||
//查找设备中所有要订阅的变量
|
||||
var dSubList = opcUaDevice.VariableTables?.SelectMany(vt => vt.DataVariables)
|
||||
var dSubList = opcUaDevice.VariableTables?.SelectMany(vt => vt.Variables)
|
||||
.Where(vd => vd.IsActive == true &&
|
||||
vd.ProtocolType == ProtocolType.OpcUA &&
|
||||
vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription)
|
||||
@@ -364,7 +364,7 @@ namespace PMSWPF.Services
|
||||
/// <param name="session">OPC UA 会话。</param>
|
||||
/// <param name="variable">要读取的变量。</param>
|
||||
/// <param name="stoppingToken">取消令牌。</param>
|
||||
private async Task ReadAndProcessOpcUaVariableAsync(Session session, VariableData variable, CancellationToken stoppingToken)
|
||||
private async Task ReadAndProcessOpcUaVariableAsync(Session session, Variable variable, CancellationToken stoppingToken)
|
||||
{
|
||||
var nodesToRead = new ReadValueIdCollection
|
||||
{
|
||||
@@ -387,7 +387,7 @@ namespace PMSWPF.Services
|
||||
return;
|
||||
}
|
||||
|
||||
await UpdateAndEnqueueVariableData(variable, result.Value);
|
||||
await UpdateAndEnqueueVariable(variable, result.Value);
|
||||
}
|
||||
catch (ServiceResultException ex) when (ex.StatusCode == StatusCodes.BadSessionIdInvalid)
|
||||
{
|
||||
@@ -409,7 +409,7 @@ namespace PMSWPF.Services
|
||||
/// </summary>
|
||||
/// <param name="variable">要更新的变量。</param>
|
||||
/// <param name="value">读取到的数据值。</param>
|
||||
private async Task UpdateAndEnqueueVariableData(VariableData variable, object value)
|
||||
private async Task UpdateAndEnqueueVariable(Variable variable, object value)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -482,7 +482,7 @@ namespace PMSWPF.Services
|
||||
// 将变量添加到订阅
|
||||
if (_opcUaSubVariablesByDeviceId.TryGetValue(deviceId, out var variablesToSubscribe))
|
||||
{
|
||||
foreach (VariableData variable in variablesToSubscribe)
|
||||
foreach (Variable variable in variablesToSubscribe)
|
||||
{
|
||||
// 7. 创建监控项并添加到订阅中。
|
||||
MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem);
|
||||
@@ -513,7 +513,7 @@ namespace PMSWPF.Services
|
||||
/// <param name="variable">发生变化的变量。</param>
|
||||
/// <param name="monitoredItem"></param>
|
||||
/// <param name="e"></param>
|
||||
private async void OnSubNotification(VariableData variable, MonitoredItem monitoredItem,
|
||||
private async void OnSubNotification(Variable variable, MonitoredItem monitoredItem,
|
||||
MonitoredItemNotificationEventArgs e)
|
||||
{
|
||||
|
||||
@@ -523,7 +523,7 @@ namespace PMSWPF.Services
|
||||
$"[OPC UA 通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}");
|
||||
if (StatusCode.IsGood(value.StatusCode))
|
||||
{
|
||||
await UpdateAndEnqueueVariableData(variable, value.Value);
|
||||
await UpdateAndEnqueueVariable(variable, value.Value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ using PMSWPF.Models;
|
||||
|
||||
namespace PMSWPF.Services.Processors;
|
||||
|
||||
public class CheckValueChangedProcessor : IVariableDataProcessor
|
||||
public class CheckValueChangedProcessor : IVariableProcessor
|
||||
{
|
||||
private readonly DataServices _dataServices;
|
||||
|
||||
@@ -11,10 +11,10 @@ public class CheckValueChangedProcessor : IVariableDataProcessor
|
||||
{
|
||||
_dataServices = dataServices;
|
||||
}
|
||||
public Task ProcessAsync(VariableDataContext context)
|
||||
public Task ProcessAsync(VariableContext context)
|
||||
{
|
||||
VariableData newVariable = context.Data;
|
||||
if (!_dataServices.AllVariables.TryGetValue(newVariable.Id, out VariableData oldVariable))
|
||||
Variable newVariable = context.Data;
|
||||
if (!_dataServices.AllVariables.TryGetValue(newVariable.Id, out Variable oldVariable))
|
||||
{
|
||||
NlogHelper.Warn($"检查变量值是否改变时在_dataServices.AllVariables中找不到Id:{newVariable.Id},Name:{newVariable.Name}的变量。");
|
||||
context.IsHandled = true;
|
||||
|
||||
@@ -13,22 +13,22 @@ using PMSWPF.Services;
|
||||
|
||||
namespace PMSWPF.Services.Processors;
|
||||
|
||||
public class HistoryDataProcessor : IVariableDataProcessor, IDisposable
|
||||
public class HistoryProcessor : IVariableProcessor, IDisposable
|
||||
{
|
||||
private const int BATCH_SIZE = 50; // 批量写入的阈值
|
||||
private const int TIMER_INTERVAL_MS = 30 * 1000; // 30秒
|
||||
|
||||
private readonly ConcurrentQueue<DbVariableDataHistory> _queue = new();
|
||||
private readonly ConcurrentQueue<DbVariableHistory> _queue = new();
|
||||
private readonly Timer _timer;
|
||||
|
||||
public HistoryDataProcessor()
|
||||
public HistoryProcessor()
|
||||
{
|
||||
|
||||
_timer = new Timer(async _ => await FlushQueueToDatabase(), null, Timeout.Infinite, Timeout.Infinite);
|
||||
_timer.Change(TIMER_INTERVAL_MS, TIMER_INTERVAL_MS); // 启动定时器
|
||||
}
|
||||
|
||||
public async Task ProcessAsync(VariableDataContext context)
|
||||
public async Task ProcessAsync(VariableContext context)
|
||||
{
|
||||
// 只有当数据发生变化时才记录历史
|
||||
if (!context.Data.IsSave) // 如果数据已经被其他处理器处理过或者不需要保存,则跳过
|
||||
@@ -36,13 +36,13 @@ public class HistoryDataProcessor : IVariableDataProcessor, IDisposable
|
||||
return;
|
||||
}
|
||||
|
||||
// 将 VariableData 转换为 DbVariableDataHistory
|
||||
var historyData = new DbVariableDataHistory
|
||||
// 将 Variable 转换为 DbVariableHistory
|
||||
var historyData = new DbVariableHistory
|
||||
{
|
||||
Name = context.Data.Name,
|
||||
NodeId = context.Data.NodeId,
|
||||
DataValue = context.Data.DataValue,
|
||||
VariableDataId = context.Data.Id,
|
||||
VariableId = context.Data.Id,
|
||||
Timestamp = DateTime.Now // 记录当前时间
|
||||
};
|
||||
|
||||
@@ -59,7 +59,7 @@ public class HistoryDataProcessor : IVariableDataProcessor, IDisposable
|
||||
// 停止定时器,防止在写入过程中再次触发
|
||||
_timer.Change(Timeout.Infinite, Timeout.Infinite);
|
||||
|
||||
var itemsToProcess = new List<DbVariableDataHistory>();
|
||||
var itemsToProcess = new List<DbVariableHistory>();
|
||||
while (_queue.TryDequeue(out var item))
|
||||
{
|
||||
itemsToProcess.Add(item);
|
||||
@@ -9,15 +9,15 @@ namespace PMSWPF.Services.Processors;
|
||||
/// 一个简单的数据处理器实现,用于演示。
|
||||
/// 其主要功能是记录接收到的变量数据的名称和值。
|
||||
/// </summary>
|
||||
public class LoggingDataProcessor : IVariableDataProcessor
|
||||
public class LoggingProcessor : IVariableProcessor
|
||||
{
|
||||
public LoggingDataProcessor()
|
||||
public LoggingProcessor()
|
||||
{
|
||||
}
|
||||
|
||||
public Task ProcessAsync(VariableDataContext context)
|
||||
public Task ProcessAsync(VariableContext context)
|
||||
{
|
||||
NlogHelper.Info($"处理数据: {context.Data.Name}, 值: {context.Data.DataValue}");
|
||||
// NlogHelper.Info($"处理数据: {context.Data.Name}, 值: {context.Data.DataValue}");
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,7 @@ using PMSWPF.Helper;
|
||||
|
||||
namespace PMSWPF.Services.Processors
|
||||
{
|
||||
public class UpdateDbVariableProcessor : IVariableDataProcessor
|
||||
public class UpdateDbVariableProcessor : IVariableProcessor
|
||||
{
|
||||
private readonly DataServices _dataServices;
|
||||
|
||||
@@ -13,15 +13,15 @@ namespace PMSWPF.Services.Processors
|
||||
_dataServices = dataServices;
|
||||
}
|
||||
|
||||
public async Task ProcessAsync(VariableDataContext context)
|
||||
public async Task ProcessAsync(VariableContext context)
|
||||
{
|
||||
try
|
||||
{
|
||||
// 假设 DataServices 有一个方法来更新 VariableData
|
||||
await _dataServices.UpdateVariableDataAsync(context.Data);
|
||||
NlogHelper.Info($"数据库变量 {context.Data.Name} 更新成功,值为: {context.Data.DataValue}");
|
||||
// 假设 DataServices 有一个方法来更新 Variable
|
||||
await _dataServices.UpdateVariableAsync(context.Data);
|
||||
// NlogHelper.Info($"数据库变量 {context.Data.Name} 更新成功,值为: {context.Data.DataValue}");
|
||||
|
||||
if (!_dataServices.AllVariables.TryGetValue(context.Data.Id, out VariableData oldVariable))
|
||||
if (!_dataServices.AllVariables.TryGetValue(context.Data.Id, out Variable oldVariable))
|
||||
{
|
||||
NlogHelper.Warn($"数据库更新完成修改变量值是否改变时在_dataServices.AllVariables中找不到Id:{context.Data.Id},Name:{context.Data.Name}的变量。");
|
||||
context.IsHandled = true;
|
||||
|
||||
@@ -24,13 +24,13 @@ namespace PMSWPF.Services
|
||||
private readonly ConcurrentDictionary<int, Device> _s7Devices;
|
||||
|
||||
// 储存所有要轮询更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
|
||||
private readonly ConcurrentDictionary<int, List<VariableData>> _s7PollVariablesByDeviceId; // Key: VariableData.Id
|
||||
private readonly ConcurrentDictionary<int, List<Variable>> _s7PollVariablesByDeviceId; // Key: Variable.Id
|
||||
|
||||
// 存储S7 PLC客户端实例的字典,键为设备ID,值为Plc对象。
|
||||
private readonly ConcurrentDictionary<string, Plc> _s7PlcClientsByIp;
|
||||
|
||||
// 储存所有变量的字典,方便通过id获取变量对象
|
||||
private readonly Dictionary<int, VariableData> _s7VariablesById;
|
||||
private readonly Dictionary<int, Variable> _s7VariablesById;
|
||||
|
||||
// S7轮询一次读取的变量数,不得大于15
|
||||
private readonly int _s7PollOnceReadMultipleVars = 9;
|
||||
@@ -53,7 +53,7 @@ namespace PMSWPF.Services
|
||||
_dataServices = dataServices;
|
||||
_dataProcessingService = dataProcessingService;
|
||||
_s7Devices = new ConcurrentDictionary<int, Device>();
|
||||
_s7PollVariablesByDeviceId = new ConcurrentDictionary<int, List<VariableData>>();
|
||||
_s7PollVariablesByDeviceId = new ConcurrentDictionary<int, List<Variable>>();
|
||||
_s7PlcClientsByIp = new ConcurrentDictionary<string, Plc>();
|
||||
_s7VariablesById = new();
|
||||
|
||||
@@ -203,8 +203,8 @@ namespace PMSWPF.Services
|
||||
}
|
||||
|
||||
// 轮询当前设备的所有变量
|
||||
var dataItemsToRead = new Dictionary<int, DataItem>(); // Key: VariableData.Id, Value: DataItem
|
||||
var variablesToProcess = new List<VariableData>(); // List of variables to process in this batch
|
||||
var dataItemsToRead = new Dictionary<int, DataItem>(); // Key: Variable.Id, Value: DataItem
|
||||
var variablesToProcess = new List<Variable>(); // List of variables to process in this batch
|
||||
|
||||
foreach (var variable in variableList)
|
||||
{
|
||||
@@ -242,7 +242,7 @@ namespace PMSWPF.Services
|
||||
if (dataItemsToRead.TryGetValue(varData.Id, out var dataItem))
|
||||
{
|
||||
// Now dataItem has the updated value from the PLC
|
||||
await UpdateAndEnqueueVariableData(varData, dataItem);
|
||||
await UpdateAndEnqueueVariable(varData, dataItem);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -277,7 +277,7 @@ namespace PMSWPF.Services
|
||||
/// </summary>
|
||||
/// <param name="variable">要更新的变量。</param>
|
||||
/// <param name="dataItem">包含读取到的数据项。</param>
|
||||
private async Task UpdateAndEnqueueVariableData(VariableData variable, DataItem dataItem)
|
||||
private async Task UpdateAndEnqueueVariable(Variable variable, DataItem dataItem)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -380,8 +380,8 @@ namespace PMSWPF.Services
|
||||
|
||||
// 过滤出当前设备和S7协议相关的变量。
|
||||
var deviceS7Variables = device.VariableTables
|
||||
.Where(vt => vt.ProtocolType == ProtocolType.S7 && vt.IsActive)
|
||||
.SelectMany(vt => vt.DataVariables)
|
||||
.Where(vt => vt.ProtocolType == ProtocolType.S7 && vt.IsActive && vt.Variables != null)
|
||||
.SelectMany(vt => vt.Variables)
|
||||
.Where(vd => vd.IsActive == true)
|
||||
.ToList(); // 转换为列表,避免多次枚举
|
||||
|
||||
@@ -393,6 +393,11 @@ namespace PMSWPF.Services
|
||||
_s7PollVariablesByDeviceId.AddOrUpdate(device.Id, deviceS7Variables, (key, oldValue) => deviceS7Variables);
|
||||
}
|
||||
|
||||
if (totalVariableCount==0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
NlogHelper.Info($"S7变量加载成功,共加载S7设备:{s7Devices.Count}个,变量数:{totalVariableCount}");
|
||||
return true;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user