修复S7服务轮询问题

This commit is contained in:
2025-09-15 20:54:32 +08:00
parent 4773e87886
commit 5ab18f95f0
21 changed files with 351 additions and 260 deletions

View File

@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DMS.Application.DTOs;
using DMS.Infrastructure.Services;
namespace DMS.Infrastructure.Interfaces.Services
{
@@ -65,5 +66,10 @@ namespace DMS.Infrastructure.Interfaces.Services
/// 批量断开设备连接
/// </summary>
Task DisconnectDevicesAsync(IEnumerable<int> deviceIds, CancellationToken cancellationToken = default);
/// <summary>
/// 获取所有监控的设备ID
/// </summary>
List<S7DeviceContext> GetAllDeviceContexts();
}
}

View File

@@ -3,13 +3,14 @@ using System.Diagnostics;
using DMS.Application.DTOs;
using DMS.Application.Events;
using DMS.Application.Interfaces;
using DMS.Application.Models;
using DMS.Core.Enums;
using DMS.Infrastructure.Configuration;
using DMS.Infrastructure.Interfaces.Services;
using DMS.Infrastructure.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using VariableValueChangedEventArgs = DMS.Core.Models.VariableValueChangedEventArgs;
using VariableValueChangedEventArgs = DMS.Core.Events.VariableValueChangedEventArgs;
namespace DMS.Infrastructure.Services
{
@@ -329,30 +330,8 @@ namespace DMS.Infrastructure.Services
{
if (context.Variables.TryGetValue(opcUaNode.NodeId.ToString(), out var variable))
{
// 保存旧值
var oldValue = variable.DataValue;
var newValue = opcUaNode.Value.ToString();
// 更新变量值
variable.DataValue = newValue;
variable.DisplayValue = newValue;
variable.UpdateNumericValue(); // 更新数值属性
variable.UpdatedAt = DateTime.Now;
_logger.LogDebug($"节点:{variable.OpcUaNodeId}值发生了变化:{newValue}");
// 触发变量值变更事件
var eventArgs = new VariableValueChangedEventArgs(
variable.Id,
variable.Name,
oldValue,
newValue,
variable.UpdatedAt);
_appDataCenterService.VariableManagementService.VariableValueChanged(eventArgs);
// 推送到数据处理队列
await _dataProcessingService.EnqueueAsync(variable);
await _dataProcessingService.EnqueueAsync(new VariableContext(variable,opcUaNode.Value));
break;
}
}

View File

@@ -12,6 +12,8 @@ using Microsoft.Extensions.Logging;
using DMS.Core.Interfaces;
using DMS.Infrastructure.Interfaces.Services;
using System.Diagnostics;
using System.Globalization;
using DMS.Application.Models;
namespace DMS.Infrastructure.Services;
@@ -32,25 +34,8 @@ public class OptimizedS7BackgroundService : BackgroundService
// 存储每个设备的变量按轮询间隔分组
private readonly ConcurrentDictionary<int, Dictionary<int, List<VariableDto>>> _variablesByPollingInterval = new();
// 模拟 PollingIntervals实际应用中可能从配置或数据库加载
private static readonly Dictionary<int, TimeSpan> PollingIntervals = new Dictionary<int, TimeSpan>
{
{ 10, TimeSpan.FromMilliseconds(10) }, // TenMilliseconds
{ 100, TimeSpan.FromMilliseconds(100) }, // HundredMilliseconds
{ 500, TimeSpan.FromMilliseconds(500) }, // FiveHundredMilliseconds
{ 1000, TimeSpan.FromMilliseconds(1000) }, // OneSecond
{ 5000, TimeSpan.FromMilliseconds(5000) }, // FiveSeconds
{ 10000, TimeSpan.FromMilliseconds(10000) }, // TenSeconds
{ 20000, TimeSpan.FromMilliseconds(20000) }, // TwentySeconds
{ 30000, TimeSpan.FromMilliseconds(30000) }, // ThirtySeconds
{ 60000, TimeSpan.FromMilliseconds(60000) }, // OneMinute
{ 180000, TimeSpan.FromMilliseconds(180000) }, // ThreeMinutes
{ 300000, TimeSpan.FromMilliseconds(300000) }, // FiveMinutes
{ 600000, TimeSpan.FromMilliseconds(600000) }, // TenMinutes
{ 1800000, TimeSpan.FromMilliseconds(1800000) } // ThirtyMinutes
};
/// <summary>
/// 构造函数,注入数据服务和数据处理服务。
/// </summary>
@@ -108,7 +93,7 @@ public class OptimizedS7BackgroundService : BackgroundService
// 持续轮询,直到取消请求或需要重新加载
while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0)
{
await PollS7VariablesByPollingIntervalAsync(stoppingToken);
await PollS7VariablesAsync(stoppingToken);
await Task.Delay(_s7PollOnceSleepTimeMs, stoppingToken);
}
}
@@ -136,28 +121,27 @@ public class OptimizedS7BackgroundService : BackgroundService
{
_variablesByPollingInterval.Clear();
_logger.LogInformation("开始加载S7变量....");
var s7Devices = _appDataStorageService
.Devices.Values.Where(d => d.Protocol == ProtocolType.S7 && d.IsActive == true)
.ToList();
foreach (var s7Device in s7Devices)
{
_s7ServiceManager.AddDevice(s7Device);
// 查找设备中所有要轮询的变量
var variables = s7Device.VariableTables?.SelectMany(vt => vt.Variables)
.Where(v => v.IsActive == true && v.Protocol == ProtocolType.S7)
.ToList() ?? new List<VariableDto>();
var variables = new List<VariableDto>();
foreach (var variableTable in s7Device.VariableTables)
{
if (variableTable.IsActive && variableTable.Protocol == ProtocolType.S7)
{
variables.AddRange(variableTable.Variables.Where(v => v.IsActive));
}
}
_s7ServiceManager.UpdateVariables(s7Device.Id, variables);
// 按轮询间隔分组变量
var variablesByPollingInterval = variables
.GroupBy(v => v.PollingInterval)
.ToDictionary(g => g.Key, g => g.ToList());
_variablesByPollingInterval.AddOrUpdate(s7Device.Id, variablesByPollingInterval, (key, oldValue) => variablesByPollingInterval);
}
_logger.LogInformation($"S7 变量加载成功共加载S7设备{s7Devices.Count}个");
@@ -175,102 +159,96 @@ public class OptimizedS7BackgroundService : BackgroundService
/// </summary>
private async Task ConnectS7ServiceAsync(CancellationToken stoppingToken)
{
var s7Devices = _appDataStorageService
.Devices.Values.Where(d => d.Protocol == ProtocolType.S7 && d.IsActive == true)
.ToList();
var deviceIds = s7Devices.Select(d => d.Id).ToList();
var deviceIds = s7Devices.Select(d => d.Id)
.ToList();
await _s7ServiceManager.ConnectDevicesAsync(deviceIds, stoppingToken);
_logger.LogInformation("已连接所有S7设备");
}
/// <summary>
/// 按轮询间隔轮询S7变量
/// </summary>
private async Task PollS7VariablesByPollingIntervalAsync(CancellationToken stoppingToken)
private async Task PollS7VariablesAsync(CancellationToken stoppingToken)
{
try
{
var pollTasks = new List<Task>();
// 为每个设备创建轮询任务
foreach (var deviceEntry in _variablesByPollingInterval)
var s7DeviceContexts = _s7ServiceManager.GetAllDeviceContexts();
foreach (var context in s7DeviceContexts)
{
var deviceId = deviceEntry.Key;
var variablesByPollingInterval = deviceEntry.Value;
if (stoppingToken.IsCancellationRequested) break;
// 为每个轮询间隔创建轮询任务
foreach (var pollingIntervalEntry in variablesByPollingInterval)
// 收集该设备所有需要轮询的变量
var variablesToPoll = context.Variables.Values.ToList();
if (variablesToPoll.Any())
{
var pollingInterval = pollingIntervalEntry.Key;
var variables = pollingIntervalEntry.Value;
// 检查是否达到轮询时间
if (ShouldPollVariables(variables, pollingInterval))
{
pollTasks.Add(PollVariablesForDeviceAsync(deviceId, variables, stoppingToken));
}
await PollVariablesForDeviceAsync(context, variablesToPoll, stoppingToken);
}
}
await Task.WhenAll(pollTasks);
}
catch (Exception ex)
{
_logger.LogError(ex, $"按轮询间隔轮询S7变量时发生错误{ex.Message}");
}
}
/// <summary>
/// 检查是否应该轮询变量
/// </summary>
private bool ShouldPollVariables(List<VariableDto> variables, int pollingInterval)
{
if (!PollingIntervals.TryGetValue(pollingInterval, out var interval))
return false;
// 检查是否有任何一个变量需要轮询
return variables.Any(v => (DateTime.Now - v.UpdatedAt) >= interval);
}
/// <summary>
/// 轮询设备的变量
/// </summary>
private async Task PollVariablesForDeviceAsync(int deviceId, List<VariableDto> variables, CancellationToken stoppingToken)
private async Task PollVariablesForDeviceAsync(S7DeviceContext context, List<VariableDto> variables,
CancellationToken stoppingToken)
{
if (!_appDataStorageService.Devices.TryGetValue(deviceId, out var device))
if (!_appDataStorageService.Devices.TryGetValue(context.Device.Id, out var device))
{
_logger.LogWarning($"轮询时没有找到设备ID{deviceId}");
_logger.LogWarning($"轮询时没有找到设备ID{context.Device.Id}");
return;
}
if (!_s7ServiceManager.IsDeviceConnected(deviceId))
var s7Service = context.S7Service;
if (s7Service == null || !s7Service.IsConnected)
{
_logger.LogWarning($"轮询时设备 {device.Name} 没有连接");
_logger.LogWarning($"轮询时设备 {device.Name} 没有连接或服务不可用");
return;
}
try
{
var stopwatch = Stopwatch.StartNew();
// 按地址分组变量以进行批量读取
var addresses = variables.Select(v => v.S7Address).ToList();
// 这里应该使用IS7Service来读取变量
// 由于接口限制,我们暂时跳过实际读取,仅演示逻辑
var addresses = variables.Where(v=>(DateTime.Now-v.UpdatedAt)>=TimeSpan.FromMilliseconds(v.PollingInterval)).Select(v => v.S7Address)
.ToList();
if (!addresses.Any())
{
return;
}
// 批量读取变量值
var readResults = await s7Service.ReadVariablesAsync(addresses);
stopwatch.Stop();
_logger.LogDebug($"设备 {device.Name} 轮询 {variables.Count} 个变量耗时 {stopwatch.ElapsedMilliseconds} ms");
// 更新变量值并推送到处理队列
foreach (var variable in variables)
{
// 模拟读取到的值
var value = DateTime.Now.Ticks.ToString();
await UpdateAndEnqueueVariable(variable, value);
if (readResults.TryGetValue(variable.S7Address, out var value))
{
// 将更新后的数据推入处理队列。
await _dataProcessingService.EnqueueAsync(new VariableContext(variable, value));
}
else
{
_logger.LogWarning($"未能从设备 {device.Name} 读取变量 {variable.S7Address} 的值");
}
}
}
catch (Exception ex)
@@ -279,26 +257,7 @@ public class OptimizedS7BackgroundService : BackgroundService
}
}
/// <summary>
/// 更新变量数据,并将其推送到数据处理队列。
/// </summary>
private async Task UpdateAndEnqueueVariable(VariableDto variable, string value)
{
try
{
// 更新变量的原始数据值和显示值。
variable.DataValue = value;
variable.DisplayValue = value;
variable.UpdatedAt = DateTime.Now;
// 将更新后的数据推入处理队列。
await _dataProcessingService.EnqueueAsync(variable);
}
catch (Exception ex)
{
_logger.LogError(ex, $"更新变量 {variable.Name} 并入队失败:{ex.Message}");
}
}
/// <summary>
/// 断开所有 S7 会话。
@@ -306,10 +265,10 @@ public class OptimizedS7BackgroundService : BackgroundService
private async Task DisconnectAllS7SessionsAsync()
{
_logger.LogInformation("正在断开所有 S7 会话...");
var deviceIds = _s7ServiceManager.GetMonitoredDeviceIds();
await _s7ServiceManager.DisconnectDevicesAsync(deviceIds);
_logger.LogInformation("已断开所有 S7 会话");
}
}

View File

@@ -11,6 +11,8 @@ namespace DMS.Infrastructure.Services
/// </summary>
public class S7Service : IS7Service
{
private const int ReadMultipleVarsCount = 10;
private Plc _plc;
private readonly ILogger<S7Service> _logger;
@@ -91,13 +93,16 @@ namespace DMS.Infrastructure.Services
{
throw new InvalidOperationException("PLC未连接");
}
try
{
{
var result = new Dictionary<string, object>();
var dataItems = addresses.Select(DataItem.FromAddress).ToList();
await _plc.ReadMultipleVarsAsync(dataItems);
var result = new Dictionary<string, object>();
for (int i = 0; i < addresses.Count; i++)
{
result[addresses[i]] = dataItems[i].Value;

View File

@@ -135,6 +135,13 @@ namespace DMS.Infrastructure.Services
{
return _deviceContexts.Keys.ToList();
}
/// <summary>
/// 获取所有监控的设备ID
/// </summary>
public List<S7DeviceContext> GetAllDeviceContexts()
{
return _deviceContexts.Values.ToList();
}
/// <summary>
/// 连接设备