添加了数据处理服务

This commit is contained in:
2025-07-15 22:19:07 +08:00
parent 25f3ba6f83
commit 23bb60c836
7 changed files with 220 additions and 21 deletions

View File

@@ -1,10 +0,0 @@
using System.Windows;
[assembly: ThemeInfo(
ResourceDictionaryLocation.None, //where theme specific resource dictionaries are located
//(used if a resource is not found in the page,
// or application resource dictionaries)
ResourceDictionaryLocation.SourceAssembly //where the generic resource dictionary is located
//(used if a resource is not found in the page,
// app, or any theme specific resource dictionaries)
)]

View File

@@ -0,0 +1,32 @@
using System;
using AutoMapper;
using PMSWPF.Data.Entities;
using PMSWPF.Models;
namespace PMSWPF.Profiles;
/// <summary>
/// AutoMapper 的配置类,用于定义对象之间的映射规则。
/// </summary>
public class MappingProfile : Profile
{
public MappingProfile()
{
// --- 用户映射 ---
// --- 设备映射 (包含List的父对象) ---
// AutoMapper 会自动使用上面的规则来处理 VariableTables 属性
CreateMap<DbDevice, Device>().ReverseMap();
// --- 变量表映射 (List中的元素) ---
CreateMap<DbVariableTable, VariableTable>().ReverseMap();
CreateMap<DbVariableData, VariableData>().ReverseMap();
// --- MQTT 和 变量数据 映射 ---
CreateMap<DbMqtt, Mqtt>().ReverseMap();
CreateMap<DbMenu, MenuBean>().ReverseMap();
}
}

View File

@@ -0,0 +1,96 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using PMSWPF.Models;
namespace PMSWPF.Services;
/// <summary>
/// 核心数据处理服务,作为后台服务运行。
/// 它维护一个无界通道Channel作为处理队列并按顺序执行已注册的数据处理器。
/// </summary>
public class DataProcessingService : BackgroundService, IDataProcessingService
{
private readonly ILogger<DataProcessingService> _logger;
// 使用 Channel 作为高性能的生产者/消费者队列
private readonly Channel<VariableData> _queue;
// 存储数据处理器的链表
private readonly List<IVariableDataProcessor> _processors;
/// <summary>
/// 构造函数,注入日志记录器。
/// </summary>
/// <param name="logger">日志记录器实例。</param>
public DataProcessingService(ILogger<DataProcessingService> logger)
{
_logger = logger;
// 创建一个无边界的 Channel允许生产者快速写入而不会被阻塞。
_queue = Channel.CreateUnbounded<VariableData>();
_processors = new List<IVariableDataProcessor>();
}
/// <summary>
/// 向处理链中添加一个数据处理器。
/// 处理器将按照添加的顺序执行。
/// </summary>
/// <param name="processor">要添加的数据处理器实例。</param>
public void AddProcessor(IVariableDataProcessor processor)
{
_processors.Add(processor);
}
/// <summary>
/// 将一个变量数据项异步推入处理队列。
/// </summary>
/// <param name="data">要入队的变量数据。</param>
public async ValueTask EnqueueAsync(VariableData data)
{
if (data == null)
{
return;
}
// 将数据项写入 Channel供后台服务处理。
await _queue.Writer.WriteAsync(data);
}
/// <summary>
/// 后台服务的核心执行逻辑。
/// 此方法会持续运行,从队列中读取数据并交由处理器链处理。
/// </summary>
/// <param name="stoppingToken">用于通知服务停止的取消令牌。</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("数据处理服务已启动。");
// 当服务未被请求停止时,持续循环
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 从队列中异步读取一个数据项,如果队列为空,则等待。
var data = await _queue.Reader.ReadAsync(stoppingToken);
// 依次调用处理链中的每一个处理器
foreach (var processor in _processors)
{
await processor.ProcessAsync(data);
}
}
catch (OperationCanceledException)
{
// 当 stoppingToken 被触发时ReadAsync 会抛出此异常,属正常停止流程,无需处理。
}
catch (Exception ex)
{
_logger.LogError(ex, "处理变量数据时发生错误。");
}
}
_logger.LogInformation("数据处理服务已停止。");
}
}

View File

@@ -0,0 +1,24 @@
using System.Threading.Tasks;
using PMSWPF.Models;
namespace PMSWPF.Services;
/// <summary>
/// 定义了数据处理服务的接口。
/// 该服务负责管理数据处理队列和处理器链。
/// </summary>
public interface IDataProcessingService
{
/// <summary>
/// 向处理链中添加一个数据处理器。
/// </summary>
/// <param name="processor">要添加的数据处理器实例。</param>
void AddProcessor(IVariableDataProcessor processor);
/// <summary>
/// 将一个变量数据项异步推入处理队列。
/// </summary>
/// <param name="data">要入队的变量数据。</param>
/// <returns>一个表示入队操作的 ValueTask。</returns>
ValueTask EnqueueAsync(VariableData data);
}

View File

@@ -0,0 +1,18 @@
using System.Threading.Tasks;
using PMSWPF.Models;
namespace PMSWPF.Services;
/// <summary>
/// 定义了变量数据处理器的通用接口。
/// 任何需要加入数据处理链的类都必须实现此接口。
/// </summary>
public interface IVariableDataProcessor
{
/// <summary>
/// 异步处理单个变量数据。
/// </summary>
/// <param name="data">要处理的变量数据。</param>
/// <returns>一个表示异步操作的任务。</returns>
Task ProcessAsync(VariableData data);
}

View File

@@ -0,0 +1,35 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using PMSWPF.Helper;
using PMSWPF.Models;
namespace PMSWPF.Services.Processors;
/// <summary>
/// 一个简单的数据处理器实现,用于演示。
/// 其主要功能是记录接收到的变量数据的名称和值。
/// </summary>
public class LoggingDataProcessor : IVariableDataProcessor
{
/// <summary>
/// 构造函数,注入日志记录器。
/// </summary>
/// <param name="logger">日志记录器实例。</param>
public LoggingDataProcessor()
{
}
/// <summary>
/// 实现处理逻辑,此处为记录日志。
/// </summary>
/// <param name="data">要处理的变量数据。</param>
/// <returns>一个表示完成的异步任务。</returns>
public Task ProcessAsync(VariableData data)
{
// 使用日志记录器输出变量的名称和值
NlogHelper.Info($"处理数据: {data.Name}, 值: {data.DataValue}");
// 由于此操作是同步的,直接返回一个已完成的任务。
return Task.CompletedTask;
}
}

View File

@@ -14,6 +14,8 @@ namespace PMSWPF.Services
{
// 数据服务实例,用于访问和操作应用程序数据,如设备配置。
private readonly DataServices _dataServices;
// 数据处理服务实例,用于将读取到的数据推入处理队列。
private readonly IDataProcessingService _dataProcessingService;
// 存储 S7设备键为设备Id值为会话对象。
private readonly Dictionary<int, Device> _deviceDic;
@@ -44,13 +46,14 @@ namespace PMSWPF.Services
private Thread _serviceMainThread;
/// <summary>
/// 构造函数,注入ILogger和DataServices
/// 构造函数,注入数据服务和数据处理服务
/// </summary>
/// <param name="logger">日志记录器实例。</param>
/// <param name="dataServices">数据服务实例。</param>
public S7BackgroundService(DataServices dataServices)
/// <param name="dataProcessingService">数据处理服务实例。</param>
public S7BackgroundService(DataServices dataServices, IDataProcessingService dataProcessingService)
{
_dataServices = dataServices;
_dataProcessingService = dataProcessingService;
_deviceDic = new();
_pollVariableDic = new();
_s7PlcClientDic = new();
@@ -205,17 +208,17 @@ namespace PMSWPF.Services
}
/// <summary>
/// 读取变量数据
/// 从 PLC 读取变量数据,并将其推送到数据处理队列。
/// </summary>
/// <param name="variable"></param>
/// <param name="plcClient"></param>
/// <param name="device"></param>
private void ReadVariableData(VariableData variable,
/// <param name="variable">要读取的变量。</param>
/// <param name="plcClient">S7 PLC 客户端实例。</param>
/// <param name="device">关联的设备。</param>
private async void ReadVariableData(VariableData variable,
Plc plcClient, Device device)
{
try
{
_readVariableDic.Add(variable.Id, DataItem.FromAddress(variable.S7Address));
_readVariableDic[variable.Id]=DataItem.FromAddress(variable.S7Address);
if (_readVariableDic.Count == S7PollOnceReadMultipleVars)
{
// 批量读取
@@ -232,7 +235,8 @@ namespace PMSWPF.Services
// 更新变量的原始数据值和显示值。
variableData.DataValue = dataItem.Value.ToString();
variableData.UpdateTime = DateTime.Now;
Console.WriteLine($"S7轮询变量:{variableData.Name},值:{variableData.DataValue}");
// 将更新后的数据推入处理队列,而不是直接在控制台输出。
await _dataProcessingService.EnqueueAsync(variableData);
}
_readVariableDic.Clear();
@@ -241,7 +245,7 @@ namespace PMSWPF.Services
}
catch (Exception ex)
{
NlogHelper.Warn($"从设备 {device.Name} 读取变量 {variable.Name} 失败:{ex.Message}");
NlogHelper.Error($"从设备 {device.Name} 读取变量 {variable.Name} 失败:{ex.Message}",ex);
}
}