Files
DMS/软件设计文档/01.第一版设计文档/03-DMS.Infrastructure - 数据访问与服务.md

44 KiB
Raw Blame History

软件开发文档 - DMS.Infrastructure - 数据访问与服务

本文档详细阐述了 DMS.Infrastructure 层的设计,它是所有外部技术和服务的具体实现地。它实现了 DMS.Core 定义的接口,为 DMS.Application 层提供数据和功能支持,并负责数据持久化、事务管理以及与外部世界的通信。

1. 目录结构

DMS.Infrastructure/
├── Data/
│   ├── SqlSugarDbContext.cs
│   └── RepositoryManager.cs
├── Entities/
│   ├── DbDevice.cs
│   ├── DbMqttServer.cs
│   ├── DbVariable.cs
│   ├── DbVariableTable.cs
│   ├── DbVariableHistory.cs
│   ├── DbVariableMqttAlias.cs
│   ├── DbMenu.cs
│   ├── DbNlog.cs
│   └── DbUser.cs
├── Repositories/
│   ├── BaseRepository.cs
│   ├── DeviceRepository.cs
│   ├── MqttServerRepository.cs
│   ├── VariableRepository.cs
│   ├── VariableTableRepository.cs
│   ├── VariableHistoryRepository.cs
│   ├── VariableMqttAliasRepository.cs
│   ├── MenuRepository.cs
│   └── UserRepository.cs
├── Services/
│   ├── Communication/
│   │   ├── S7DeviceAgent.cs
│   │   └── MqttPublishService.cs
│   ├── Processing/
│   │   ├── ChangeDetectionProcessor.cs
│   │   ├── HistoryStorageProcessor.cs
│   │   └── MqttPublishProcessor.cs
│   ├── S7BackgroundService.cs
│   ├── DataProcessingService.cs
│   ├── DatabaseInitializerService.cs
│   └── MenuService.cs
├── Logging/
│   ├── ThrottlingDatabaseTarget.cs
│   └── NLogService.cs
├── Profiles/
│   └── MappingProfile.cs
└── DMS.Infrastructure.csproj

2. 数据库设计与实体 (Entities/)

本文档详细描述了DMS系统的数据库结构包括表、字段和关系。数据库实体类与数据库表一一对应使用了 SqlSugar 的特性Attribute来定义主键、外键等。

2.1. 数据库关系图 (ERD)

erDiagram
    DEVICE {
        int Id PK
        varchar Name
        int Protocol
        varchar IpAddress
        int Port
        bool IsActive
    }

    VARIABLE_TABLE {
        int Id PK
        varchar Name
        varchar Description
        bool IsActive
        int DeviceId FK
    }

    VARIABLE {
        int Id PK
        varchar Name
        varchar Address
        int DataType
        bool IsActive
        int VariableTableId FK
    }

    MQTT_SERVER {
        int Id PK
        varchar ServerName
        varchar BrokerAddress
        int Port
        varchar Username
        varchar Password
        bool IsActive
    }

    VARIABLE_HISTORY {
        bigint Id PK
        int VariableId FK
        varchar Value
        datetime Timestamp
    }

    VARIABLE_MQTT_ALIAS {
        int Id PK
        int VariableId FK
        int MqttServerId FK
        varchar Alias
    }

    MENU {
        int Id PK
        int ParentId FK
        varchar Header
        varchar Icon
        varchar TargetViewKey
        varchar NavigationParameter
        int DisplayOrder
    }

    NLOG {
        bigint Id PK
        datetime Logged
        varchar Level
        varchar Message
        varchar Exception
        varchar CallSite
        varchar MethodName
        int AggregatedCount
    }

    USER {
        int Id PK
        varchar Username
        varchar PasswordHash
        varchar Role
        bool IsActive
    }

    DEVICE ||--o{ VARIABLE_TABLE : "包含"
    VARIABLE_TABLE ||--o{ VARIABLE : "包含"
    VARIABLE ||--o{ VARIABLE_HISTORY : "记录"
    VARIABLE }o--o{ VARIABLE_MQTT_ALIAS : "关联"
    MQTT_SERVER }o--o{ VARIABLE_MQTT_ALIAS : "关联"
    MENU ||--o{ MENU : "父子"

2.2. 数据库实体类示例

DbDevice.cs

// 文件: DMS.Infrastructure/Entities/DbDevice.cs
using SqlSugar;

namespace DMS.Infrastructure.Entities;

[SugarTable("Devices")]
public class DbDevice
{
    [SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
    public int Id { get; set; }
    public string Name { get; set; }
    public int Protocol { get; set; } // 对应 ProtocolType 枚举
    public string IpAddress { get; set; }
    public int Port { get; set; }
    public bool IsActive { get; set; }
    public int Rack { get; set; }
    public int Slot { get; set; }
}

DbVariableTable.cs

// 文件: DMS.Infrastructure/Entities/DbVariableTable.cs
using SqlSugar;

namespace DMS.Infrastructure.Entities;

[SugarTable("VariableTables")]
public class DbVariableTable
{
    [SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
    public int Id { get; set; }
    public string Name { get; set; }
    public string Description { get; set; }
    public bool IsActive { get; set; }
    public int DeviceId { get; set; }
    public int Protocol { get; set; } // 对应 ProtocolType 枚举
}

DbVariable.cs

// 文件: DMS.Infrastructure/Entities/DbVariable.cs
using SqlSugar;

namespace DMS.Infrastructure.Entities;

[SugarTable("Variables")]
public class DbVariable
{
    [SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
    public int Id { get; set; }
    public string Name { get; set; }
    public string Address { get; set; }
    public int DataType { get; set; } // 对应 SignalType 枚举
    public int PollLevel { get; set; } // 对应 PollLevelType 枚举
    public bool IsActive { get; set; }
    public int VariableTableId { get; set; }
    public int Protocol { get; set; } // 对应 ProtocolType 枚举
    public int CSharpDataType { get; set; } // 对应 CSharpDataType 枚举
    public string ConversionFormula { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime UpdatedAt { get; set; }
    public string UpdatedBy { get; set; }
    public bool IsModified { get; set; }
}

DbMqttServer.cs

// 文件: DMS.Infrastructure/Entities/DbMqttServer.cs
using SqlSugar;

namespace DMS.Infrastructure.Entities;

[SugarTable("MqttServers")]
public class DbMqttServer
{
    [SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
    public int Id { get; set; }
    public string ServerName { get; set; }
    public string BrokerAddress { get; set; }
    public int Port { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
    public bool IsActive { get; set; }
    public string SubscribeTopic { get; set; }
    public string PublishTopic { get; set; }
    public string ClientId { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime? ConnectedAt { get; set; }
    public long ConnectionDuration { get; set; }
    public string MessageFormat { get; set; }
}

DbVariableHistory.cs

// 文件: DMS.Infrastructure/Entities/DbVariableHistory.cs
using SqlSugar;

namespace DMS.Infrastructure.Entities;

[SugarTable("VariableHistories")]
public class DbVariableHistory
{
    [SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
    public long Id { get; set; }
    public int VariableId { get; set; }
    public string Value { get; set; }
    public DateTime Timestamp { get; set; }
}

DbVariableMqttAlias.cs

// 文件: DMS.Infrastructure/Entities/DbVariableMqttAlias.cs
using SqlSugar;

namespace DMS.Infrastructure.Entities;

/// <summary>
/// 数据库实体:对应数据库中的 VariableMqttAliases 表。
/// </summary>
[SugarTable("VariableMqttAliases")]
public class DbVariableMqttAlias
{
    [SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
    public int Id { get; set; }

    /// <summary>
    /// 外键,指向 Variables 表的 Id。
    /// </summary>
    public int VariableId { get; set; }

    /// <summary>
    /// 外键,指向 MqttServers 表的 Id。
    /// </summary>
    public int MqttServerId { get; set; }

    /// <summary>
    /// 针对此特定[变量-服务器]连接的发布别名。
    /// </summary>
    public string Alias { get; set; }
}

DbMenu.cs

// 文件: DMS.Infrastructure/Entities/DbMenu.cs
using SqlSugar;

namespace DMS.Infrastructure.Entities;

[SugarTable("Menus")]
public class DbMenu
{
    [SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
    public int Id { get; set; }

    [SugarColumn(IsNullable = true)]
    public int? ParentId { get; set; }

    public string Header { get; set; }

    public string Icon { get; set; }

    public string TargetViewKey { get; set; }

    [SugarColumn(IsNullable = true)]
    public string NavigationParameter { get; set; }

    public int DisplayOrder { get; set; }
}

DbNlog.cs

// 文件: DMS.Infrastructure/Entities/DbNlog.cs
using SqlSugar;
using System;

namespace DMS.Infrastructure.Entities;

/// <summary>
/// 数据库实体:对应数据库中的 Logs 表,用于存储应用程序日志。
/// </summary>
[SugarTable("Logs")]
public class DbNlog
{
    [SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
    public long Id { get; set; }

    /// <summary>
    /// 日志记录的时间戳。
    /// </summary>
    public DateTime Logged { get; set; }

    /// <summary>
    /// 日志级别 (e.g., "Info", "Warn", "Error", "Debug")。
    /// </summary>
    public string Level { get; set; }

    /// <summary>
    /// 日志消息主体。
    /// </summary>
    [SugarColumn(Length = -1)] // 映射为NVARCHAR(MAX)或类似类型
    public string Message { get; set; }

    /// <summary>
    /// 异常信息包括堆栈跟踪。如果无异常则为null。
    /// </summary>
    [SugarColumn(IsNullable = true, Length = -1)]
    public string Exception { get; set; }

    /// <summary>
    /// 记录日志的调用点信息 (文件路径:行号)。
    /// </summary>
    public string CallSite { get; set; }

    /// <summary>
    /// 记录日志的方法名。
    /// </summary>
    public string MethodName { get; set; }

    /// <summary>
    /// (用于聚合) 此条日志在指定时间窗口内被触发的总次数。默认为1。
    /// </summary>
    public int AggregatedCount { get; set; } = 1;
}

DbUser.cs

// 文件: DMS.Infrastructure/Entities/DbUser.cs
using SqlSugar;

namespace DMS.Infrastructure.Entities;

[SugarTable("Users")]
public class DbUser
{
    [SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
    public int Id { get; set; }
    public string Username { get; set; }
    public string PasswordHash { get; set; }
    public string Role { get; set; }
    public bool IsActive { get; set; }
}

3. 事务管理 (RepositoryManager)

RepositoryManager 是工作单元Unit of Work, UoW模式的具体实现。它作为所有仓储的统一入口并管理数据库事务确保所有通过它获取的仓储实例都共享同一个 ISqlSugarClient 实例,从而确保它们在同一个数据库会话和事务中操作。

3.1. 设计思路与考量

  • 模式RepositoryManager 是工作单元Unit of Work, UoW模式的具体实现。它作为所有仓储的统一入口并管理数据库事务。
  • 共享上下文:所有通过 RepositoryManager 获取的仓储实例都共享同一个 ISqlSugarClient 实例,从而确保它们在同一个数据库会话和事务中操作。
  • 生命周期RepositoryManager 通常被注册为 ScopedTransient 生命周期,确保每个业务操作都有一个独立的工作单元。

3.2. 设计优势

  • 原子性:确保跨多个仓储的操作(如创建设备、变量表和菜单)要么全部成功,要么全部失败,维护数据一致性。
  • 简化事务管理应用层无需直接与数据库事务API交互只需调用 BeginTransaction(), CommitAsync(), RollbackAsync()
  • 解耦:应用层不直接依赖具体的仓储实现,而是依赖于 IRepositoryManager 这一抽象。
  • 资源优化:在单个业务操作中,所有仓储共享同一个数据库连接,减少了连接开销。

3.3. 设计劣势/权衡

  • 复杂性增加相比于直接使用仓储引入UoW模式增加了额外的抽象层和概念。
  • 仓储依赖IRepositoryManager 接口需要列出所有它管理的仓储,当新增仓储时,需要修改此接口。
  • 懒加载:为了避免在 RepositoryManager 构造时就创建所有仓储实例的开销,通常会使用懒加载(Lazy<T>),这增加了少量复杂性。

3.4. 示例:RepositoryManager.cs

// 文件: DMS.Infrastructure/Data/RepositoryManager.cs
using DMS.Core.Interfaces;
using SqlSugar;
using System;

namespace DMS.Infrastructure.Data;

/// <summary>
/// IRepositoryManager 的 SqlSugar 实现,管理所有仓储实例和数据库事务。
/// </summary>
public class RepositoryManager : IRepositoryManager
{
    private readonly ISqlSugarClient _db;
    private readonly Lazy<IDeviceRepository> _lazyDevices;
    private readonly Lazy<IVariableTableRepository> _lazyVariableTables;
    private readonly Lazy<IVariableRepository> _lazyVariables;
    private readonly Lazy<IMqttServerRepository> _lazyMqttServers;
    private readonly Lazy<IVariableMqttAliasRepository> _lazyVariableMqttAliases;
    private readonly Lazy<IMenuRepository> _lazyMenus;
    private readonly Lazy<IVariableHistoryRepository> _lazyVariableHistories;
    private readonly Lazy<IUserRepository> _lazyUsers;

    /// <summary>
    /// 构造函数,通过依赖注入获取 SqlSugar 客户端实例。
    /// </summary>
    public RepositoryManager(ISqlSugarClient db, IMapper mapper) // 添加 IMapper 依赖
    {
        _db = db;
        // 使用 Lazy<T> 实现仓储的懒加载,确保它们在第一次被访问时才创建。
        // 所有仓储都共享同一个 _db 实例,以保证事务的一致性。
        _lazyDevices = new Lazy<IDeviceRepository>(() => new DeviceRepository(_db, mapper));
        _lazyVariableTables = new Lazy<IVariableTableRepository>(() => new VariableTableRepository(_db, mapper));
        _lazyVariables = new Lazy<IVariableRepository>(() => new VariableRepository(_db, mapper));
        _lazyMqttServers = new Lazy<IMqttServerRepository>(() => new MqttServerRepository(_db, mapper));
        _lazyVariableMqttAliases = new Lazy<IVariableMqttAliasRepository>(() => new VariableMqttAliasRepository(_db, mapper));
        _lazyMenus = new Lazy<IMenuRepository>(() => new MenuRepository(_db, mapper));
        _lazyVariableHistories = new Lazy<IVariableHistoryRepository>(() => new VariableHistoryRepository(_db, mapper));
        _lazyUsers = new Lazy<IUserRepository>(() => new UserRepository(_db, mapper));
    }

    public IDeviceRepository Devices => _lazyDevices.Value;
    public IVariableTableRepository VariableTables => _lazyVariableTables.Value;
    public IVariableRepository Variables => _lazyVariables.Value;
    public IMqttServerRepository MqttServers => _lazyMqttServers.Value;
    public IVariableMqttAliasRepository VariableMqttAliases => _lazyVariableMqttAliases.Value;
    public IMenuRepository Menus => _lazyMenus.Value;
    public IVariableHistoryRepository VariableHistories => _lazyVariableHistories.Value;
    public IUserRepository Users => _lazyUsers.Value;

    /// <summary>
    /// 开始一个新的数据库事务。
    /// </summary>
    public void BeginTransaction()
    {
        _db.BeginTran();
    }

    /// <summary>
    /// 异步提交当前事务中的所有变更。
    /// </summary>
    public async Task CommitAsync()
    {
        await _db.CommitTranAsync();
    }

    /// <summary>
    /// 异步回滚当前事务中的所有变更。
    /// </summary>
    public async Task RollbackAsync()
    {
        await _db.RollbackTranAsync();
    }

    /// <summary>
    /// 释放 SqlSugar 客户端资源。通常由 DI 容器管理。
    /// </summary>
    public void Dispose()
    {
        _db.Dispose();
    }
}

4. 仓储实现 (Repositories/)

仓储Repository是数据访问层的一部分负责实现 DMS.Core 中定义的仓储接口。它们使用ORM框架如SqlSugar与数据库进行实际交互将数据库实体转换为领域模型反之亦然。

4.1. 设计思路与考量

  • 职责仓储Repository是数据访问层的一部分负责实现 DMS.Core 中定义的仓储接口。它们使用ORM框架如SqlSugar与数据库进行实际交互将数据库实体转换为领域模型反之亦然。
  • 通用性:可以有一个 BaseRepository 来实现通用的CRUD操作减少重复代码。
  • 特定查询:为每个仓储接口实现其特有的查询方法(如 GetActiveDevicesWithDetailsAsync)。

4.2. 设计优势

  • 实现细节封装将ORM框架和数据库查询的细节封装在仓储内部上层无需关心。
  • 可替换性如果需要更换ORM框架或数据库类型只需修改仓储实现而无需修改应用层或核心层。
  • 性能优化点:可以在仓储层进行查询优化(如使用 IncludeJoin 预加载关联数据)。

4.3. 设计劣势/权衡

  • 代码量:即使有 BaseRepository,每个仓储仍然需要一些样板代码。
  • 复杂查询对于非常复杂的、跨多个聚合根的查询有时仓储模式会显得笨拙可能需要引入查询对象Query Object模式。

4.4. 示例:BaseRepository.cs

// 文件: DMS.Infrastructure/Repositories/BaseRepository.cs
using AutoMapper;
using DMS.Core.Interfaces;
using SqlSugar;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace DMS.Infrastructure.Repositories;

/// <summary>
/// 仓储基类,实现了 IBaseRepository 接口的通用 CRUD 操作。
/// </summary>
/// <typeparam name="TDomain">领域模型类型。</typeparam>
/// <typeparam name="TDbEntity">数据库实体类型。</typeparam>
public abstract class BaseRepository<TDomain, TDbEntity> : IBaseRepository<TDomain>
    where TDomain : class
    where TDbEntity : class, new()
{
    protected readonly ISqlSugarClient _db;
    protected readonly IMapper _mapper;

    public BaseRepository(ISqlSugarClient db, IMapper mapper)
    {
        _db = db;
        _mapper = mapper;
    }

    public virtual async Task AddAsync(TDomain entity)
    {
        var dbEntity = _mapper.Map<TDbEntity>(entity);
        await _db.Insertable(dbEntity).ExecuteCommandAsync();
        // 映射回ID如果实体是自增ID
        _mapper.Map(dbEntity, entity);
    }

    public virtual async Task DeleteAsync(int id)
    {
        await _db.Deleteable<TDbEntity>(id).ExecuteCommandAsync();
    }

    public virtual async Task<List<TDomain>> GetAllAsync()
    {
        var dbEntities = await _db.Queryable<TDbEntity>().ToListAsync();
        return _mapper.Map<List<TDomain>>(dbEntities);
    }

    public virtual async Task<TDomain> GetByIdAsync(int id)
    {
        var dbEntity = await _db.Queryable<TDbEntity>().InSingleAsync(id);
        return _mapper.Map<TDomain>(dbEntity);
    }

    public virtual async Task UpdateAsync(TDomain entity)
    {
        var dbEntity = _mapper.Map<TDbEntity>(entity);
        await _db.Updateable(dbEntity).ExecuteCommandAsync();
    }
}

4.5. 示例:DeviceRepository.cs

// 文件: DMS.Infrastructure/Repositories/DeviceRepository.cs
using AutoMapper;
using DMS.Core.Interfaces;
using DMS.Core.Models;
using DMS.Infrastructure.Entities;
using SqlSugar;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace DMS.Infrastructure.Repositories;

/// <summary>
/// 设备仓储的具体实现。
/// </summary>
public class DeviceRepository : BaseRepository<Device, DbDevice>, IDeviceRepository
{
    public DeviceRepository(ISqlSugarClient db, IMapper mapper) : base(db, mapper) { }

    /// <summary>
    /// 异步获取所有激活的S7设备并级联加载其下的变量表和变量。
    /// </summary>
    public async Task<List<Device>> GetActiveDevicesWithDetailsAsync(ProtocolType protocol)
    {
        var dbDevices = await _db.Queryable<DbDevice>()
                                 .Where(d => d.IsActive && d.Protocol == (int)protocol)
                                 .Mapper(d => d.VariableTables, d => d.Id, d => d.VariableTables.Select(vt => vt.DeviceId).ToList())
                                 .Mapper(d => d.VariableTables.Select(vt => vt.Variables), vt => vt.Id, vt => vt.Variables.Select(v => v.VariableTableId).ToList())
                                 .ToListAsync();
        return _mapper.Map<List<Device>>(dbDevices);
    }

    /// <summary>
    /// 异步根据设备ID获取设备及其所有详细信息变量表、变量、MQTT别名等
    /// </summary>
    public async Task<Device> GetDeviceWithDetailsAsync(int deviceId)
    {
        var dbDevice = await _db.Queryable<DbDevice>()
                                .Where(d => d.Id == deviceId)
                                .Mapper(d => d.VariableTables, d => d.Id, d => d.VariableTables.Select(vt => vt.DeviceId).ToList())
                                .Mapper(d => d.VariableTables.Select(vt => vt.Variables), vt => vt.Id, vt => vt.Variables.Select(v => v.VariableTableId).ToList())
                                .FirstAsync();

        if (dbDevice == null) return null;

        // 手动加载 VariableMqttAlias因为 SqlSugar 的 Mapper 可能无法直接处理多层嵌套的关联实体
        foreach (var variableTable in dbDevice.VariableTables)
        {
            foreach (var variable in variableTable.Variables)
            {
                var dbAliases = await _db.Queryable<DbVariableMqttAlias>()
                                         .Where(a => a.VariableId == variable.Id)
                                         .Mapper(a => a.MqttServer, a => a.MqttServerId)
                                         .ToListAsync();
                variable.MqttAliases = _mapper.Map<List<VariableMqttAlias>>(dbAliases);
            }
        }

        return _mapper.Map<Device>(dbDevice);
    }
}

5. 外部服务 (Services/)

5.1. S7通信架构 (“编排者-代理”模式)

采用“编排者-代理”Orchestrator-Agent模式。S7BackgroundService 作为编排者负责管理所有S7设备的生命周期每个 S7DeviceAgent 作为代理,专门负责与一个S7 PLC进行所有交互。

5.1.1. 设计思路与考量

  • 模式:采用“编排者-代理”Orchestrator-Agent模式。S7BackgroundService 作为编排者负责管理所有S7设备的生命周期每个 S7DeviceAgent 作为代理,专门负责与一个S7 PLC进行所有交互。
  • 职责分离:将设备管理(启动、停止、配置更新)与具体设备通信(连接、轮询、读写)的职责分离。
  • 并发性:每个 S7DeviceAgent 独立运行,可以并行处理多个设备的通信,提高系统吞吐量。
  • 热重载:通过消息机制,允许在运行时动态更新设备的变量配置,而无需重启整个服务。

5.1.2. 设计优势

  • 高可靠性:单个设备的通信故障不会影响其他设备的正常运行。
  • 可扩展性:易于添加新的设备类型或通信协议,只需实现新的 DeviceAgent
  • 动态配置:支持运行时修改设备和变量配置,提高了系统的灵活性和可用性。
  • 资源隔离每个Agent管理自己的连接和资源避免资源争用。

5.1.3. 设计劣势/权衡

  • 复杂性增加引入了Agent的概念和Agent与Orchestrator之间的通信机制增加了初期设计和实现复杂性。
  • 资源消耗每个Agent可能需要维护独立的连接和线程当设备数量非常庞大时可能会增加资源消耗。

5.1.4. S7BackgroundService.cs (编排者)

作为 IHostedService 运行负责从数据库加载激活的S7设备并为每个设备创建和管理 S7DeviceAgent 实例。它还监听配置变更消息以触发Agent的热重载。

// 文件: DMS.Infrastructure/Services/S7BackgroundService.cs
using Microsoft.Extensions.Hosting;
using DMS.Core.Interfaces;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using DMS.Core.Enums;
using DMS.WPF.Messages;

namespace DMS.Infrastructure.Services;

/// <summary>
/// S7后台服务编排者作为IHostedService运行管理所有S7设备的通信代理。
/// </summary>
public class S7BackgroundService : IHostedService
{
    private readonly IRepositoryManager _repoManager;
    private readonly IChannelBus _channelBus;
    private readonly IMessenger _messenger;
    private readonly ConcurrentDictionary<int, S7DeviceAgent> _activeAgents = new();

    /// <summary>
    /// 构造函数,通过依赖注入获取所需服务。
    /// </summary>
    public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg)
    {
        _repoManager = repo;
        _channelBus = bus;
        _messenger = msg;
        // 注册配置变更消息以便在设备或变量配置更新时通知Agent
        _messenger.Register<ConfigChangedMessage>(this, async (r, m) => await HandleConfigChange(m));
    }

    /// <summary>
    /// 服务启动时调用加载所有激活的S7设备并启动其代理。
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // 获取所有激活的S7设备及其详细信息
        var s7Devices = await _repoManager.Devices.GetActiveDevicesWithDetailsAsync(ProtocolType.S7);
        foreach (var device in s7Devices)
        {
            // 为每个设备创建一个S7DeviceAgent实例
            var agent = new S7DeviceAgent(device, _channelBus, _messenger);
            if (_activeAgents.TryAdd(device.Id, agent))
            {
                await agent.StartAsync(cancellationToken);
            }
        }

        // 启动数据处理消费者服务它将从ChannelBus中读取数据
        var dataProcessor = new DataProcessingService(_channelBus, _messenger, _repoManager);
        _ = dataProcessor.StartProcessingAsync(cancellationToken); // 在后台运行,不阻塞启动
    }

    /// <summary>
    /// 处理配置变更消息通知相关Agent更新其变量列表。
    /// </summary>
    private async Task HandleConfigChange(ConfigChangedMessage message)
    {
        // 从数据库重新加载受影响的设备及其最新配置
        var updatedDevice = await _repoManager.Devices.GetDeviceWithDetailsAsync(message.DeviceId);
        if (updatedDevice != null && _activeAgents.TryGetValue(message.DeviceId, out var agent))
        {
            // 指示Agent使用新的变量列表进行热重载
            agent.UpdateVariableLists(updatedDevice.VariableTables.SelectMany(vt => vt.Variables).ToList());
        }
    }

    /// <summary>
    /// 服务停止时调用停止所有活动的Agent并释放资源。
    /// </summary>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        foreach (var agent in _activeAgents.Values)
        {
            await agent.DisposeAsync();
        }
        _activeAgents.Clear();
    }
}

5.1.5. S7DeviceAgent.cs (代理)

负责与单个PLC建立连接、维护连接、按不同频率并行轮询变量并将读取到的数据通过 IChannelBus 写入数据处理队列。

// 文件: DMS.Infrastructure/Services/Communication/S7DeviceAgent.cs
using S7.Net;
using DMS.Core.Models;
using DMS.Core.Enums;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System;

namespace DMS.Infrastructure.Services.Communication;

/// <summary>
/// 单个S7 PLC的通信代理负责连接、轮询和数据发送。
/// </summary>
public class S7DeviceAgent : IAsyncDisposable
{
    private readonly Device _deviceConfig;
    private readonly ChannelWriter<VariableContext> _processingQueueWriter;
    private readonly IMessenger _messenger;
    private readonly Plc _plcClient;
    private CancellationTokenSource _cts; // 用于控制Agent内部的轮询任务

    // 存储按轮询级别分组的变量列表
    private List<Variable> _highFreqVars = new();
    private List<Variable> _mediumFreqVars = new();
    private List<Variable> _lowFreqVars = new();

    /// <summary>
    /// 构造函数。
    /// </summary>
    /// <param name="device">设备的配置信息。</param>
    /// <param name="channelBus">中央通道总线服务。</param>
    /// <param name="messenger">消息总线服务。</param>
    public S7DeviceAgent(Device device, IChannelBus channelBus, IMessenger messenger)
    {
        _deviceConfig = device;
        _messenger = messenger;
        // 从中央总线获取指定名称的通道的写入端
        _processingQueueWriter = channelBus.GetWriter<VariableContext>("DataProcessingQueue");

        // 初始化S7.Net PLC客户端
        _plcClient = new Plc(
            (CpuType)Enum.Parse(typeof(CpuType), _deviceConfig.Protocol.ToString()), // 根据协议类型解析CPU类型
            _deviceConfig.IpAddress,
            (short)_deviceConfig.Rack,
            (short)_deviceConfig.Slot
        );
    }

    /// <summary>
    /// 启动Agent的通信循环。
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        await _plcClient.OpenAsync(); // 建立PLC连接

        // 初始加载变量列表并分组
        UpdateVariableLists(_deviceConfig.VariableTables.SelectMany(vt => vt.Variables).ToList());

        // 启动并行的轮询任务,每个轮询级别一个任务
        _ = Task.Run(() => PollingLoopAsync(_highFreqVars, 200, _cts.Token)); // 高频200ms
        _ = Task.Run(() => PollingLoopAsync(_mediumFreqVars, 1000, _cts.Token)); // 中频1000ms
        _ = Task.Run(() => PollingLoopAsync(_lowFreqVars, 5000, _cts.Token)); // 低频5000ms
    }

    /// <summary>
    /// 热重载方法用于响应配置变更更新Agent内部的变量列表。
    /// </summary>
    /// <param name="allActiveVariables">最新的所有激活变量列表。</param>
    public void UpdateVariableLists(List<Variable> allActiveVariables)
    {
        // 重新分组变量
        _highFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.High).ToList();
        _mediumFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Medium).ToList();
        _lowFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Low).ToList();
        // 可以考虑在这里重新启动轮询任务,或者让现有任务检测到列表变化
    }

    /// <summary>
    /// 核心轮询循环,负责批量读取变量并将数据写入处理队列。
    /// </summary>
    private async Task PollingLoopAsync(List<Variable> varsToRead, int interval, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (!varsToRead.Any()) // 如果没有变量需要轮询,则等待
            {
                await Task.Delay(interval, token);
                continue;
            }

            try
            {
                // 使用 S7.Net Plus 的 ReadMultipleVarsAsync 批量读取变量
                // 注意S7.Net Plus 会将读取到的值直接更新到 Variable 对象的 DataValue 属性中
                await _plcClient.ReadMultipleVarsAsync(varsToRead);

                foreach (var variable in varsToRead)
                {
                    // 将读取到的值包含在VariableContext中放入数据处理队列
                    var context = new VariableContext(variable, variable.DataValue);
                    await _processingQueueWriter.WriteAsync(context, token);
                }
            }
            catch (Exception ex)
            {
                // 记录通信错误但不中断整个Agent
                _messenger.Send(new LogMessage(LogLevel.Error, ex, $"S7DeviceAgent: 设备 {_deviceConfig.Name} ({_deviceConfig.IpAddress}) 轮询错误。"));
            }
            await Task.Delay(interval, token); // 等待下一个轮询周期
        }
    }

    /// <summary>
    /// 异步释放Agent资源包括停止轮询任务和关闭PLC连接。
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        _cts?.Cancel(); // 取消所有内部轮询任务
        if (_plcClient.IsConnected)
        {
            _plcClient.Close(); // 关闭PLC连接
        }
        _plcClient?.Dispose();
    }
}

5.2. MQTT发布服务 (MqttPublishService.cs)

负责将数据发布到MQTT Broker。

// 文件: DMS.Infrastructure/Services/Communication/MqttPublishService.cs
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System.Threading;
using System.Threading.Tasks;

namespace DMS.Infrastructure.Services.Communication;

public interface IMqttPublishService
{
    Task PublishAsync(MqttServer server, string topic, string payload);
}

public class MqttPublishService : IMqttPublishService
{
    private readonly IMqttClient _mqttClient;

    public MqttPublishService()
    {
        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();
    }

    public async Task PublishAsync(MqttServer server, string topic, string payload)
    {
        if (!_mqttClient.IsConnected)
        {
            var options = new MqttClientOptionsBuilder()
                .WithTcpServer(server.BrokerAddress, server.Port)
                .WithCredentials(server.Username, server.Password)
                .Build();
            await _mqttClient.ConnectAsync(options, CancellationToken.None);
        }

        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(payload)
            .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
            .Build();

        await _mqttClient.PublishAsync(message, CancellationToken.None);
    }
}

5.3. 数据处理消费者 (DataProcessingService.cs)

IChannelBus 读取数据,并启动数据处理链。它作为数据处理链的入口点。

// 文件: DMS.Infrastructure/Services/DataProcessingService.cs
using DMS.Core.Models;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using System.Threading.Channels;
using System.Threading.Tasks;
using DMS.Infrastructure.Services.Processing;
using DMS.Core.Interfaces;
using DMS.WPF.Messages;
using NLog;
using Microsoft.Extensions.Caching.Memory;

namespace DMS.Infrastructure.Services;

/// <summary>
/// 数据处理消费者服务从ChannelBus中读取变量数据并启动处理链。
/// </summary>
public class DataProcessingService
{
    private readonly ChannelReader<VariableContext> _queueReader;
    private readonly IMessenger _messenger;
    private readonly IRepositoryManager _repoManager;
    private readonly IMemoryCache _memoryCache; // 用于ChangeDetectionProcessor
    private readonly IMqttPublishService _mqttPublishService; // 用于MqttPublishProcessor
    private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// 构造函数,通过依赖注入获取所需服务。
    /// </summary>
    public DataProcessingService(IChannelBus channelBus, IMessenger messenger, IRepositoryManager repo, IMemoryCache memoryCache, IMqttPublishService mqttPublishService)
    {
        // 从中央总线获取数据处理队列的读取端
        _queueReader = channelBus.GetReader<VariableContext>("DataProcessingQueue");
        _messenger = messenger;
        _repoManager = repo;
        _memoryCache = memoryCache;
        _mqttPublishService = mqttPublishService;
    }

    /// <summary>
    /// 启动数据处理循环,持续从队列中读取数据并进行处理。
    /// </summary>
    public async Task StartProcessingAsync(CancellationToken token)
    {
        await foreach (var context in _queueReader.ReadAllAsync(token))
        {
            try
            {
                // 构建并执行数据处理链
                var changeDetector = new ChangeDetectionProcessor(_memoryCache);
                var historyStorage = new HistoryStorageProcessor(_repoManager.VariableHistories);
                var mqttPublisher = new MqttPublishProcessor(_mqttPublishService, _repoManager);

                // 链式连接处理器
                changeDetector.SetNext(historyStorage).SetNext(mqttPublisher);

                // 启动处理
                await changeDetector.ProcessAsync(context);

                // 处理完成后发送消息通知UI更新变量值
                _messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue));
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"数据处理链执行错误变量ID: {context.Variable.Id}");
            }
        }
    }
}

5.4. 数据处理器 (Processing/)

采用责任链模式Chain of Responsibility来处理采集到的变量数据。每个处理器Processor负责一个单一的数据处理步骤如变化检测、历史存储、MQTT发布

5.4.1. VariableProcessorBase.cs

// 文件: DMS.Application/Services/Processors/VariableProcessorBase.cs
using DMS.Core.Models;
using System.Threading.Tasks;

namespace DMS.Infrastructure.Services.Processing;

public abstract class VariableProcessorBase : IVariableProcessor
{
    private IVariableProcessor _next;

    public IVariableProcessor SetNext(IVariableProcessor next)
    {
        _next = next;
        return next;
    }

    public virtual async Task ProcessAsync(VariableContext context)
    {
        if (context.IsProcessingTerminated) return;

        await HandleAsync(context);

        if (_next != null && !context.IsProcessingTerminated)
        {
            await _next.ProcessAsync(context);
        }
    }

    // 模板方法,由子类实现具体的处理逻辑
    protected abstract Task HandleAsync(VariableContext context);
}

5.4.2. ChangeDetectionProcessor.cs

职责:检测本次读取的值与上一次的值是否相同。如果相同,则终止后续处理,以节省资源。

// 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs
using DMS.Core.Models;
using Microsoft.Extensions.Caching.Memory;
using System;
using System.Threading.Tasks;

namespace DMS.Infrastructure.Services.Processing;

public class ChangeDetectionProcessor : VariableProcessorBase
{
    private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值

    public ChangeDetectionProcessor(IMemoryCache cache)
    {
        _cache = cache;
    }

    protected override Task HandleAsync(VariableContext context)
    {
        var lastValue = _cache.Get(context.Variable.Id);
        if (lastValue != null && lastValue.Equals(context.RawValue))
        {
            context.IsProcessingTerminated = true; // 值未变化,终止处理
        }
        else
        {
            context.IsValueChanged = true;
            _cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1));
        }
        return Task.CompletedTask;
    }
}

5.4.3. HistoryStorageProcessor.cs

职责:将变化后的值存入数据库历史记录表。

// 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs
using DMS.Core.Interfaces;
using DMS.Core.Models;
using System.Threading.Tasks;

namespace DMS.Infrastructure.Services.Processing;

public class HistoryStorageProcessor : VariableProcessorBase
{
    private readonly IVariableHistoryRepository _historyRepository;

    public HistoryStorageProcessor(IVariableHistoryRepository historyRepository)
    {
        _historyRepository = historyRepository;
    }

    protected override async Task HandleAsync(VariableContext context)
    {
        if (!context.IsValueChanged) return;

        var history = new VariableHistory
        {
            VariableId = context.Variable.Id,
            Value = context.CurrentValue.ToString(),
            Timestamp = context.Timestamp
        };
        await _historyRepository.AddAsync(history);
    }
}

5.4.4. MqttPublishProcessor.cs

职责如果变量关联了MQTT服务器则将值发布出去。

// 文件: DMS.Infrastructure/Services/Processing/MqttPublishProcessor.cs
using DMS.Core.Interfaces;
using DMS.Core.Models;
using DMS.Infrastructure.Services.Communication;
using CommunityToolkit.Mvvm.Messaging;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using NLog;

namespace DMS.Infrastructure.Services.Processing;

/// <summary>
/// MQTT发布处理器负责将变量值发布到关联的MQTT服务器并使用专属别名。
/// </summary>
public class MqttPublishProcessor : VariableProcessorBase
{
    private readonly IMqttPublishService _mqttService;
    private readonly IRepositoryManager _repoManager; // 使用 RepositoryManager 来获取仓储
    private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// 构造函数。
    /// </summary>
    public MqttPublishProcessor(IMqttPublishService mqttService, IRepositoryManager repoManager)
    {
        _mqttService = mqttService;
        _repoManager = repoManager;
    }

    protected override async Task HandleAsync(VariableContext context)
    {
        if (!context.IsValueChanged) return; // 如果值未变化,则不发布

        // 1. 从仓储获取变量及其完整的别名关联列表
        //    这要求 IVariableRepository 有一个方法能加载 VariableMqttAlias 及其 MqttServer
        var variableWithAliases = await _repoManager.Variables.GetVariableWithMqttAliasesAsync(context.Variable.Id);
        
        if (variableWithAliases?.MqttAliases == null || !variableWithAliases.MqttAliases.Any())
        {
            return; // 没有关联的MQTT服务器无需发布
        }

        foreach (var aliasInfo in variableWithAliases.MqttAliases)
        {
            try
            {
                // 确保 MqttServer 导航属性已加载且激活
                var targetServer = aliasInfo.MqttServer;
                if (targetServer == null || !targetServer.IsActive)
                {
                    _logger.Warn($"MQTT发布失败变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。");
                    continue;
                }

                // 使用别名构建Topic
                // 示例Topic格式DMS/DeviceName/VariableAlias
                var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}";
                var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });

                await _mqttService.PublishAsync(targetServer, topic, payload);
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"MQTT发布失败变量 {context.Variable.Name} 到服务器 {aliasInfo.MqttServer.ServerName},别名 {aliasInfo.Alias}");
            }
        }
    }
}

6. 数据映射 (Profiles/)

为了在领域模型 (DMS.Core.Models) 和数据库实体 (DMS.Infrastructure.Entities) 之间进行转换,我们将使用 AutoMapper

// 文件: DMS.Infrastructure/Profiles/MappingProfile.cs
using AutoMapper;
using DMS.Core.Models;
using DMS.Infrastructure.Entities;
using DMS.Core.Enums;

namespace DMS.Infrastructure.Profiles;

public class MappingProfile : Profile
{
    public MappingProfile()
    {
        CreateMap<Device, DbDevice>().ReverseMap();
        CreateMap<VariableTable, DbVariableTable>().ReverseMap();
        CreateMap<Variable, DbVariable>().ReverseMap();
        CreateMap<MqttServer, DbMqttServer>().ReverseMap();
        CreateMap<VariableHistory, DbVariableHistory>().ReverseMap();
        CreateMap<VariableMqttAlias, DbVariableMqttAlias>().ReverseMap();
        CreateMap<MenuBean, DbMenu>().ReverseMap();
        CreateMap<User, DbUser>().ReverseMap();

        // 枚举到int的映射
        CreateMap<ProtocolType, int>().ConvertUsing(src => (int)src);
        CreateMap<int, ProtocolType>().ConvertUsing(src => (ProtocolType)src);
        CreateMap<SignalType, int>().ConvertUsing(src => (int)src);
        CreateMap<int, SignalType>().ConvertUsing(src => (SignalType)src);
        CreateMap<PollLevelType, int>().ConvertUsing(src => (int)src);
        CreateMap<int, PollLevelType>().ConvertUsing(src => (PollLevelType)src);
        CreateMap<CSharpDataType, int>().ConvertUsing(src => (int)src);
        CreateMap<int, CSharpDataType>().ConvertUsing(src => (CSharpDataType)src);
    }
}