重新设计OpcUaBackgroundService后台服务

This commit is contained in:
2025-09-04 21:04:27 +08:00
parent 7fbc99c273
commit 4e73488e8f
7 changed files with 688 additions and 94 deletions

View File

@@ -244,49 +244,6 @@ public class OpcUaBackgroundService : BackgroundService
}
}
// /// <summary>
// /// 读取单个 OPC UA 变量并处理其数据。
// /// </summary>
// /// <param name="session">OPC UA 会话。</param>
// /// <param name="variable">要读取的变量。</param>
// /// <param name="stoppingToken">取消令牌。</param>
// private async Task ReadAndProcessOpcUaVariableAsync(Session session, Variable variable,
// CancellationToken stoppingToken)
// {
// var nodesToRead = new ReadValueIdCollection
// {
// new ReadValueId
// {
// NodeId = new NodeId(variable.OpcUaNodeId),
// AttributeId = Attributes.Value
// }
// };
//
// try
// {
// var readResponse = await session.ReadAsync(null, 0, TimestampsToReturn.Both, nodesToRead, stoppingToken);
// var result = readResponse.Results?.FirstOrDefault();
// if (result == null) return;
//
// if (!StatusCode.IsGood(result.StatusCode))
// {
// _logger.LogWarning($"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {result.StatusCode}");
// return;
// }
//
// await UpdateAndEnqueueVariable(variable, result.Value);
// }
// catch (ServiceResultException ex) when (ex.StatusCode == StatusCodes.BadSessionIdInvalid)
// {
// _logger.LogError(ex, $"OPC UA会话ID无效变量: {variable.Name} ({variable.OpcUaNodeId})。正在尝试重新连接...");
//
// }
// catch (Exception ex)
// {
// _logger.LogError(ex, $"轮询OPC UA变量 {variable.Name} ({variable.OpcUaNodeId}) 时发生未知错误: {ex.Message}");
// }
// }
/// <summary>
/// 更新变量数据,并将其推送到数据处理队列。
/// </summary>

View File

@@ -0,0 +1,358 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using DMS.Application.DTOs;
using DMS.Application.Interfaces;
using DMS.Core.Enums;
using DMS.Infrastructure.Configuration;
using DMS.Infrastructure.Interfaces.Services;
using DMS.Infrastructure.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DMS.Infrastructure.Services
{
/// <summary>
/// OPC UA服务管理器负责管理OPC UA连接、订阅和变量监控
/// </summary>
public class OpcUaServiceManager : IOpcUaServiceManager
{
private readonly ILogger<OpcUaServiceManager> _logger;
private readonly IDataProcessingService _dataProcessingService;
private readonly OpcUaServiceOptions _options;
private readonly ConcurrentDictionary<int, DeviceContext> _deviceContexts;
private readonly SemaphoreSlim _semaphore;
private bool _disposed = false;
public OpcUaServiceManager(
ILogger<OpcUaServiceManager> logger,
IDataProcessingService dataProcessingService,
IOptions<OpcUaServiceOptions> options)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_dataProcessingService = dataProcessingService ?? throw new ArgumentNullException(nameof(dataProcessingService));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_deviceContexts = new ConcurrentDictionary<int, DeviceContext>();
_semaphore = new SemaphoreSlim(_options.MaxConcurrentConnections, _options.MaxConcurrentConnections);
}
/// <summary>
/// 初始化服务管理器
/// </summary>
public async Task InitializeAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("OPC UA服务管理器正在初始化...");
// 初始化逻辑可以在需要时添加
_logger.LogInformation("OPC UA服务管理器初始化完成");
}
/// <summary>
/// 添加设备到监控列表
/// </summary>
public void AddDevice(DeviceDto device)
{
if (device == null)
throw new ArgumentNullException(nameof(device));
if (device.Protocol != ProtocolType.OpcUa)
{
_logger.LogWarning("设备 {DeviceId} 不是OPC UA协议跳过添加", device.Id);
return;
}
var context = new DeviceContext
{
Device = device,
OpcUaService = new OpcUaService(),
Variables = new ConcurrentDictionary<string, VariableDto>(),
IsConnected = false
};
_deviceContexts.AddOrUpdate(device.Id, context, (key, oldValue) => context);
_logger.LogInformation("已添加设备 {DeviceId} 到监控列表", device.Id);
}
/// <summary>
/// 移除设备监控
/// </summary>
public async Task RemoveDeviceAsync(int deviceId, CancellationToken cancellationToken = default)
{
if (_deviceContexts.TryRemove(deviceId, out var context))
{
await DisconnectDeviceAsync(context, cancellationToken);
_logger.LogInformation("已移除设备 {DeviceId} 的监控", deviceId);
}
}
/// <summary>
/// 更新设备变量
/// </summary>
public void UpdateVariables(int deviceId, List<VariableDto> variables)
{
if (_deviceContexts.TryGetValue(deviceId, out var context))
{
context.Variables.Clear();
foreach (var variable in variables)
{
context.Variables.AddOrUpdate(variable.OpcUaNodeId, variable, (key, oldValue) => variable);
}
_logger.LogInformation("已更新设备 {DeviceId} 的变量列表,共 {Count} 个变量", deviceId, variables.Count);
}
}
/// <summary>
/// 获取设备连接状态
/// </summary>
public bool IsDeviceConnected(int deviceId)
{
return _deviceContexts.TryGetValue(deviceId, out var context) && context.IsConnected;
}
/// <summary>
/// 重新连接设备
/// </summary>
public async Task ReconnectDeviceAsync(int deviceId, CancellationToken cancellationToken = default)
{
if (_deviceContexts.TryGetValue(deviceId, out var context))
{
await DisconnectDeviceAsync(context, cancellationToken);
await ConnectDeviceAsync(context, cancellationToken);
}
}
/// <summary>
/// 获取所有监控的设备ID
/// </summary>
public IEnumerable<int> GetMonitoredDeviceIds()
{
return _deviceContexts.Keys.ToList();
}
/// <summary>
/// 连接设备
/// </summary>
public async Task ConnectDeviceAsync(DeviceContext context, CancellationToken cancellationToken = default)
{
if (context == null || string.IsNullOrEmpty(context.Device.OpcUaServerUrl))
return;
await _semaphore.WaitAsync(cancellationToken);
try
{
_logger.LogInformation("正在连接设备 {DeviceName} ({EndpointUrl})",
context.Device.Name, context.Device.OpcUaServerUrl);
var stopwatch = Stopwatch.StartNew();
// 设置连接超时
using var timeoutToken = new CancellationTokenSource(_options.ConnectionTimeoutMs);
using var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutToken.Token);
await context.OpcUaService.ConnectAsync(context.Device.OpcUaServerUrl);
stopwatch.Stop();
_logger.LogInformation("设备 {DeviceName} 连接耗时 {ElapsedMs} ms",
context.Device.Name, stopwatch.ElapsedMilliseconds);
if (context.OpcUaService.IsConnected)
{
context.IsConnected = true;
await SetupSubscriptionsAsync(context, cancellationToken);
_logger.LogInformation("设备 {DeviceName} 连接成功", context.Device.Name);
}
else
{
_logger.LogWarning("设备 {DeviceName} 连接失败", context.Device.Name);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "连接设备 {DeviceName} 时发生错误: {ErrorMessage}",
context.Device.Name, ex.Message);
context.IsConnected = false;
}
finally
{
_semaphore.Release();
}
}
/// <summary>
/// 断开设备连接
/// </summary>
private async Task DisconnectDeviceAsync(DeviceContext context, CancellationToken cancellationToken = default)
{
if (context == null)
return;
try
{
_logger.LogInformation("正在断开设备 {DeviceName} 的连接", context.Device.Name);
await context.OpcUaService.DisconnectAsync();
context.IsConnected = false;
_logger.LogInformation("设备 {DeviceName} 连接已断开", context.Device.Name);
}
catch (Exception ex)
{
_logger.LogError(ex, "断开设备 {DeviceName} 连接时发生错误: {ErrorMessage}",
context.Device.Name, ex.Message);
}
}
/// <summary>
/// 设置订阅
/// </summary>
private async Task SetupSubscriptionsAsync(DeviceContext context, CancellationToken cancellationToken = default)
{
if (!context.IsConnected || !context.Variables.Any())
return;
try
{
_logger.LogInformation("正在为设备 {DeviceName} 设置订阅,变量数: {VariableCount}",
context.Device.Name, context.Variables.Count);
var opcUaNodes = context.Variables.Values
.Select(v => new OpcUaNode { NodeId = v.OpcUaNodeId })
.ToList();
context.OpcUaService.SubscribeToNode(opcUaNodes, HandleDataChanged,
_options.SubscriptionPublishingIntervalMs, _options.SubscriptionSamplingIntervalMs);
_logger.LogInformation("设备 {DeviceName} 订阅设置完成", context.Device.Name);
}
catch (Exception ex)
{
_logger.LogError(ex, "为设备 {DeviceName} 设置订阅时发生错误: {ErrorMessage}",
context.Device.Name, ex.Message);
}
}
/// <summary>
/// 处理数据变化
/// </summary>
private async void HandleDataChanged(OpcUaNode opcUaNode)
{
if (opcUaNode?.Value == null)
return;
try
{
// 查找对应的变量
foreach (var context in _deviceContexts.Values)
{
if (context.Variables.TryGetValue(opcUaNode.NodeId.ToString(), out var variable))
{
// 更新变量值
variable.DataValue = opcUaNode.Value.ToString();
variable.DisplayValue = opcUaNode.Value.ToString();
variable.UpdatedAt = DateTime.Now;
// 推送到数据处理队列
await _dataProcessingService.EnqueueAsync(variable);
break;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理数据变化时发生错误: {ErrorMessage}", ex.Message);
}
}
/// <summary>
/// 连接指定设备
/// </summary>
public async Task ConnectDeviceAsync(int deviceId, CancellationToken cancellationToken = default)
{
if (_deviceContexts.TryGetValue(deviceId, out var context))
{
await ConnectDeviceAsync(context, cancellationToken);
}
}
/// <summary>
/// 批量连接设备
/// </summary>
public async Task ConnectDevicesAsync(IEnumerable<int> deviceIds, CancellationToken cancellationToken = default)
{
var connectTasks = new List<Task>();
foreach (var deviceId in deviceIds)
{
connectTasks.Add(ConnectDeviceAsync(deviceId, cancellationToken));
}
await Task.WhenAll(connectTasks);
}
/// <summary>
/// 断开指定设备连接
/// </summary>
public async Task DisconnectDeviceAsync(int deviceId, CancellationToken cancellationToken = default)
{
if (_deviceContexts.TryGetValue(deviceId, out var context))
{
await DisconnectDeviceAsync(context, cancellationToken);
}
}
/// <summary>
/// 批量断开设备连接
/// </summary>
public async Task DisconnectDevicesAsync(IEnumerable<int> deviceIds, CancellationToken cancellationToken = default)
{
var disconnectTasks = new List<Task>();
foreach (var deviceId in deviceIds)
{
if (_deviceContexts.TryGetValue(deviceId, out var context))
{
disconnectTasks.Add(DisconnectDeviceAsync(deviceId, cancellationToken));
}
}
await Task.WhenAll(disconnectTasks);
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// 释放资源
/// </summary>
protected virtual void Dispose(bool disposing)
{
if (!_disposed && disposing)
{
_logger.LogInformation("正在释放OPC UA服务管理器资源...");
// 断开所有设备连接
var deviceIds = _deviceContexts.Keys.ToList();
DisconnectDevicesAsync(deviceIds).Wait(TimeSpan.FromSeconds(10));
// 释放其他资源
_semaphore?.Dispose();
_disposed = true;
_logger.LogInformation("OPC UA服务管理器资源已释放");
}
}
}
/// <summary>
/// 设备上下文
/// </summary>
public class DeviceContext
{
public DeviceDto Device { get; set; }
public OpcUaService OpcUaService { get; set; }
public ConcurrentDictionary<string, VariableDto> Variables { get; set; }
public bool IsConnected { get; set; }
}
}

View File

@@ -0,0 +1,156 @@
using DMS.Application.DTOs;
using DMS.Application.DTOs.Events;
using DMS.Application.Interfaces;
using DMS.Core.Enums;
using DMS.Infrastructure.Interfaces.Services;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services
{
/// <summary>
/// 优化后的OPC UA后台服务
/// </summary>
public class OptimizedOpcUaBackgroundService : BackgroundService
{
private readonly IDataCenterService _dataCenterService;
private readonly IOpcUaServiceManager _opcUaServiceManager;
private readonly ILogger<OptimizedOpcUaBackgroundService> _logger;
private readonly SemaphoreSlim _reloadSemaphore = new SemaphoreSlim(0);
public OptimizedOpcUaBackgroundService(
IDataCenterService dataCenterService,
IOpcUaServiceManager opcUaServiceManager,
ILogger<OptimizedOpcUaBackgroundService> logger)
{
_dataCenterService = dataCenterService ?? throw new ArgumentNullException(nameof(dataCenterService));
_opcUaServiceManager = opcUaServiceManager ?? throw new ArgumentNullException(nameof(opcUaServiceManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_dataCenterService.DataLoadCompleted += OnDataLoadCompleted;
}
private void OnDataLoadCompleted(object sender, DataLoadCompletedEventArgs e)
{
_logger.LogInformation("收到数据加载完成通知触发OPC UA服务重新加载");
_reloadSemaphore.Release();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("优化后的OPC UA后台服务正在启动...");
try
{
// 初始化服务管理器
await _opcUaServiceManager.InitializeAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
await _reloadSemaphore.WaitAsync(stoppingToken);
if (stoppingToken.IsCancellationRequested)
break;
if (_dataCenterService.Devices.IsEmpty)
{
_logger.LogInformation("没有可用的OPC UA设备等待设备列表更新...");
continue;
}
await LoadAndConnectDevicesAsync(stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("OPC UA后台服务已收到停止请求");
}
catch (Exception ex)
{
_logger.LogError(ex, "OPC UA后台服务运行时发生未处理的异常: {ErrorMessage}", ex.Message);
}
finally
{
_logger.LogInformation("正在清理OPC UA后台服务资源...");
// 服务管理器会在Dispose时自动清理资源
}
_logger.LogInformation("优化后的OPC UA后台服务已停止");
}
/// <summary>
/// 加载并连接设备
/// </summary>
private async Task LoadAndConnectDevicesAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("开始加载和连接OPC UA设备...");
try
{
// 获取所有活动的OPC UA设备
var opcUaDevices = _dataCenterService.Devices.Values
.Where(d => d.Protocol == ProtocolType.OpcUa && d.IsActive)
.ToList();
_logger.LogInformation("找到 {DeviceCount} 个活动的OPC UA设备", opcUaDevices.Count);
if (opcUaDevices.Count == 0)
return;
// 添加设备到监控列表
foreach (var device in opcUaDevices)
{
_opcUaServiceManager.AddDevice(device);
// 获取设备变量
var variables = device.VariableTables?
.SelectMany(vt => vt.Variables)
.Where(v => v.IsActive && v.Protocol == ProtocolType.OpcUa)
.ToList() ?? new List<VariableDto>();
_opcUaServiceManager.UpdateVariables(device.Id, variables);
}
// 批量连接设备
var deviceIds = opcUaDevices.Select(d => d.Id).ToList();
await _opcUaServiceManager.ConnectDevicesAsync(deviceIds, stoppingToken);
_logger.LogInformation("OPC UA设备加载和连接完成");
}
catch (Exception ex)
{
_logger.LogError(ex, "加载和连接OPC UA设备时发生错误: {ErrorMessage}", ex.Message);
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("正在停止OPC UA后台服务...");
// 释放信号量以确保ExecuteAsync可以退出
_reloadSemaphore.Release();
await base.StopAsync(cancellationToken);
_logger.LogInformation("OPC UA后台服务停止完成");
}
public override void Dispose()
{
_logger.LogInformation("正在释放OPC UA后台服务资源...");
_dataCenterService.DataLoadCompleted -= OnDataLoadCompleted;
_reloadSemaphore?.Dispose();
base.Dispose();
_logger.LogInformation("OPC UA后台服务资源已释放");
}
}
}