# 软件开发文档 - DMS.Infrastructure - 数据访问与服务 本文档详细阐述了 `DMS.Infrastructure` 层的设计,它是所有外部技术和服务的具体实现地。它实现了 `DMS.Core` 定义的接口,为 `DMS.Application` 层提供数据和功能支持,并负责数据持久化、事务管理以及与外部世界的通信。 ## 1. 目录结构 ``` DMS.Infrastructure/ ├── Data/ │ ├── SqlSugarDbContext.cs │ └── RepositoryManager.cs ├── Entities/ │ ├── DbDevice.cs │ ├── DbMqttServer.cs │ ├── DbVariable.cs │ ├── DbVariableTable.cs │ ├── DbVariableHistory.cs │ ├── DbVariableMqttAlias.cs │ ├── DbMenu.cs │ ├── DbNlog.cs │ └── DbUser.cs ├── Repositories/ │ ├── BaseRepository.cs │ ├── DeviceRepository.cs │ ├── MqttServerRepository.cs │ ├── VariableRepository.cs │ ├── VariableTableRepository.cs │ ├── VariableHistoryRepository.cs │ ├── VariableMqttAliasRepository.cs │ ├── MenuRepository.cs │ └── UserRepository.cs ├── Services/ │ ├── Communication/ │ │ ├── S7DeviceAgent.cs │ │ └── MqttPublishService.cs │ ├── Processing/ │ │ ├── ChangeDetectionProcessor.cs │ │ ├── HistoryStorageProcessor.cs │ │ └── MqttPublishProcessor.cs │ ├── S7BackgroundService.cs │ ├── DataProcessingService.cs │ ├── DatabaseInitializerService.cs │ └── MenuService.cs ├── Logging/ │ ├── ThrottlingDatabaseTarget.cs │ └── NLogService.cs ├── Profiles/ │ └── MappingProfile.cs └── DMS.Infrastructure.csproj ``` ## 2. 数据库设计与实体 (`Entities/`) 本文档详细描述了DMS系统的数据库结构,包括表、字段和关系。数据库实体类与数据库表一一对应,使用了 `SqlSugar` 的特性(Attribute)来定义主键、外键等。 ### 2.1. 数据库关系图 (ERD) ```mermaid erDiagram DEVICE { int Id PK varchar Name int Protocol varchar IpAddress int Port int Rack int Slot varchar OpcUaServerUrl bool IsActive } VARIABLE_TABLE { int Id PK varchar Name varchar Description bool IsActive int DeviceId FK } VARIABLE { int Id PK varchar Name varchar Address int DataType int PollLevel bool IsActive int VariableTableId FK varchar OpcUaNodeId bool IsHistoryEnabled double HistoryDeadband bool IsAlarmEnabled double AlarmMinValue double AlarmMaxValue double AlarmDeadband int Protocol int CSharpDataType varchar ConversionFormula datetime CreatedAt datetime UpdatedAt varchar UpdatedBy bool IsModified } MQTT_SERVER { int Id PK varchar ServerName varchar BrokerAddress int Port varchar Username varchar Password bool IsActive } VARIABLE_HISTORY { bigint Id PK int VariableId FK varchar Value datetime Timestamp } VARIABLE_MQTT_ALIAS { int Id PK int VariableId FK int MqttServerId FK varchar Alias } MENU { int Id PK int ParentId FK varchar Header varchar Icon varchar TargetViewKey varchar NavigationParameter int DisplayOrder } NLOG { bigint Id PK datetime Logged varchar Level varchar Message varchar Exception varchar CallSite varchar MethodName int AggregatedCount } USER { int Id PK varchar Username varchar PasswordHash varchar Role bool IsActive } DEVICE ||--o{ VARIABLE_TABLE : "包含" VARIABLE_TABLE ||--o{ VARIABLE : "包含" VARIABLE ||--o{ VARIABLE_HISTORY : "记录" VARIABLE }o--o{ VARIABLE_MQTT_ALIAS : "关联" MQTT_SERVER }o--o{ VARIABLE_MQTT_ALIAS : "关联" MENU ||--o{ MENU : "父子" ``` ### 2.2. 数据库实体类示例 #### `DbDevice.cs` ```csharp // 文件: DMS.Infrastructure/Entities/DbDevice.cs using SqlSugar; namespace DMS.Infrastructure.Entities; [SugarTable("Devices")] public class DbDevice { [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public int Id { get; set; } public string Name { get; set; } public int Protocol { get; set; } // 对应 ProtocolType 枚举 public string IpAddress { get; set; } public int Port { get; set; } public int Rack { get; set; } public int Slot { get; set; } public string OpcUaServerUrl { get; set; } public bool IsActive { get; set; } } ``` #### `DbVariableTable.cs` ```csharp // 文件: DMS.Infrastructure/Entities/DbVariableTable.cs using SqlSugar; namespace DMS.Infrastructure.Entities; [SugarTable("VariableTables")] public class DbVariableTable { [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public int Id { get; set; } public string Name { get; set; } public string Description { get; set; } public bool IsActive { get; set; } public int DeviceId { get; set; } public int Protocol { get; set; } // 对应 ProtocolType 枚举 } ``` #### `DbVariable.cs` ```csharp // 文件: DMS.Infrastructure/Entities/DbVariable.cs using SqlSugar; namespace DMS.Infrastructure.Entities; [SugarTable("Variables")] public class DbVariable { [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public int Id { get; set; } public string Name { get; set; } public string Address { get; set; } public int DataType { get; set; } // 对应 SignalType 枚举 public int PollLevel { get; set; } // 对应 PollLevelType 枚举 public bool IsActive { get; set; } public int VariableTableId { get; set; } public string OpcUaNodeId { get; set; } public bool IsHistoryEnabled { get; set; } public double HistoryDeadband { get; set; } public bool IsAlarmEnabled { get; set; } public double AlarmMinValue { get; set; } public double AlarmMaxValue { get; set; } public double AlarmDeadband { get; set; } public int Protocol { get; set; } // 对应 ProtocolType 枚举 public int CSharpDataType { get; set; } // 对应 CSharpDataType 枚举 public string ConversionFormula { get; set; } public DateTime CreatedAt { get; set; } public DateTime UpdatedAt { get; set; } public string UpdatedBy { get; set; } public bool IsModified { get; set; } } ``` #### `DbMqttServer.cs` ```csharp // 文件: DMS.Infrastructure/Entities/DbMqttServer.cs using SqlSugar; namespace DMS.Infrastructure.Entities; [SugarTable("MqttServers")] public class DbMqttServer { [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public int Id { get; set; } public string ServerName { get; set; } public string BrokerAddress { get; set; } public int Port { get; set; } public string Username { get; set; } public string Password { get; set; } public bool IsActive { get; set; } public string SubscribeTopic { get; set; } public string PublishTopic { get; set; } public string ClientId { get; set; } public DateTime CreatedAt { get; set; } public DateTime? ConnectedAt { get; set; } public long ConnectionDuration { get; set; } public string MessageFormat { get; set; } } ``` #### `DbVariableHistory.cs` ```csharp // 文件: DMS.Infrastructure/Entities/DbVariableHistory.cs using SqlSugar; namespace DMS.Infrastructure.Entities; [SugarTable("VariableHistories")] public class DbVariableHistory { [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public long Id { get; set; } public int VariableId { get; set; } public string Value { get; set; } public DateTime Timestamp { get; set; } } ``` #### `DbVariableMqttAlias.cs` ```csharp // 文件: DMS.Infrastructure/Entities/DbVariableMqttAlias.cs using SqlSugar; namespace DMS.Infrastructure.Entities; /// /// 数据库实体:对应数据库中的 VariableMqttAliases 表。 /// [SugarTable("VariableMqttAliases")] public class DbVariableMqttAlias { [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public int Id { get; set; } /// /// 外键,指向 Variables 表的 Id。 /// public int VariableId { get; set; } /// /// 外键,指向 MqttServers 表的 Id。 /// public int MqttServerId { get; set; } /// /// 针对此特定[变量-服务器]连接的发布别名。 /// public string Alias { get; set; } } ``` #### `DbMenu.cs` ```csharp // 文件: DMS.Infrastructure/Entities/DbMenu.cs using SqlSugar; namespace DMS.Infrastructure.Entities; [SugarTable("Menus")] public class DbMenu { [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public int Id { get; set; } [SugarColumn(IsNullable = true)] public int? ParentId { get; set; } public string Header { get; set; } public string Icon { get; set; } public string TargetViewKey { get; set; } [SugarColumn(IsNullable = true)] public string NavigationParameter { get; set; } public int DisplayOrder { get; set; } } ``` #### `DbNlog.cs` ```csharp // 文件: DMS.Infrastructure/Entities/DbNlog.cs using SqlSugar; using System; namespace DMS.Infrastructure.Entities; /// /// 数据库实体:对应数据库中的 Logs 表,用于存储应用程序日志。 /// [SugarTable("Logs")] public class DbNlog { [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public long Id { get; set; } /// /// 日志记录的时间戳。 /// public DateTime Logged { get; set; } /// /// 日志级别 (e.g., "Info", "Warn", "Error", "Debug")。 /// public string Level { get; set; } /// /// 日志消息主体。 /// [SugarColumn(Length = -1)] // 映射为NVARCHAR(MAX)或类似类型 public string Message { get; set; } /// /// 异常信息,包括堆栈跟踪。如果无异常则为null。 /// [SugarColumn(IsNullable = true, Length = -1)] public string Exception { get; set; } /// /// 记录日志的调用点信息 (文件路径:行号)。 /// public string CallSite { get; set; } /// /// 记录日志的方法名。 /// public string MethodName { get; set; } /// /// (用于聚合) 此条日志在指定时间窗口内被触发的总次数。默认为1。 /// public int AggregatedCount { get; set; } = 1; } ``` #### `DbUser.cs` ```csharp // 文件: DMS.Infrastructure/Entities/DbUser.cs using SqlSugar; namespace DMS.Infrastructure.Entities; [SugarTable("Users")] public class DbUser { [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] public int Id { get; set; } public string Username { get; set; } public string PasswordHash { get; set; } public string Role { get; set; } public bool IsActive { get; set; } } ``` ## 3. 事务管理 (`RepositoryManager`) `RepositoryManager` 是工作单元(Unit of Work, UoW)模式的具体实现。它作为所有仓储的统一入口,并管理数据库事务,确保所有通过它获取的仓储实例都共享同一个 `ISqlSugarClient` 实例,从而确保它们在同一个数据库会话和事务中操作。 ### 3.1. 设计思路与考量 * **模式**:`RepositoryManager` 是工作单元(Unit of Work, UoW)模式的具体实现。它作为所有仓储的统一入口,并管理数据库事务。 * **共享上下文**:所有通过 `RepositoryManager` 获取的仓储实例都共享同一个 `ISqlSugarClient` 实例,从而确保它们在同一个数据库会话和事务中操作。 * **生命周期**:`RepositoryManager` 通常被注册为 `Scoped` 或 `Transient` 生命周期,确保每个业务操作都有一个独立的工作单元。 ### 3.2. 设计优势 * **原子性**:确保跨多个仓储的操作(如创建设备、变量表和菜单)要么全部成功,要么全部失败,维护数据一致性。 * **简化事务管理**:应用层无需直接与数据库事务API交互,只需调用 `BeginTransaction()`, `CommitAsync()`, `RollbackAsync()`。 * **解耦**:应用层不直接依赖具体的仓储实现,而是依赖于 `IRepositoryManager` 这一抽象。 * **资源优化**:在单个业务操作中,所有仓储共享同一个数据库连接,减少了连接开销。 ### 3.3. 设计劣势/权衡 * **复杂性增加**:相比于直接使用仓储,引入UoW模式增加了额外的抽象层和概念。 * **仓储依赖**:`IRepositoryManager` 接口需要列出所有它管理的仓储,当新增仓储时,需要修改此接口。 * **懒加载**:为了避免在 `RepositoryManager` 构造时就创建所有仓储实例的开销,通常会使用懒加载(`Lazy`),这增加了少量复杂性。 ### 3.4. 示例:`RepositoryManager.cs` ```csharp // 文件: DMS.Infrastructure/Data/RepositoryManager.cs using DMS.Core.Interfaces; using SqlSugar; using System; namespace DMS.Infrastructure.Data; /// /// IRepositoryManager 的 SqlSugar 实现,管理所有仓储实例和数据库事务。 /// public class RepositoryManager : IRepositoryManager { private readonly ISqlSugarClient _db; private readonly Lazy _lazyDevices; private readonly Lazy _lazyVariableTables; private readonly Lazy _lazyVariables; private readonly Lazy _lazyMqttServers; private readonly Lazy _lazyVariableMqttAliases; private readonly Lazy _lazyMenus; private readonly Lazy _lazyVariableHistories; private readonly Lazy _lazyUsers; /// /// 构造函数,通过依赖注入获取 SqlSugar 客户端实例。 /// public RepositoryManager(ISqlSugarClient db, IMapper mapper) // 添加 IMapper 依赖 { _db = db; // 使用 Lazy 实现仓储的懒加载,确保它们在第一次被访问时才创建。 // 所有仓储都共享同一个 _db 实例,以保证事务的一致性。 _lazyDevices = new Lazy(() => new DeviceRepository(_db, mapper)); _lazyVariableTables = new Lazy(() => new VariableTableRepository(_db, mapper)); _lazyVariables = new Lazy(() => new VariableRepository(_db, mapper)); _lazyMqttServers = new Lazy(() => new MqttServerRepository(_db, mapper)); _lazyVariableMqttAliases = new Lazy(() => new VariableMqttAliasRepository(_db, mapper)); _lazyMenus = new Lazy(() => new MenuRepository(_db, mapper)); _lazyVariableHistories = new Lazy(() => new VariableHistoryRepository(_db, mapper)); _lazyUsers = new Lazy(() => new UserRepository(_db, mapper)); } public IDeviceRepository Devices => _lazyDevices.Value; public IVariableTableRepository VariableTables => _lazyVariableTables.Value; public IVariableRepository Variables => _lazyVariables.Value; public IMqttServerRepository MqttServers => _lazyMqttServers.Value; public IVariableMqttAliasRepository VariableMqttAliases => _lazyVariableMqttAliases.Value; public IMenuRepository Menus => _lazyMenus.Value; public IVariableHistoryRepository VariableHistories => _lazyVariableHistories.Value; public IUserRepository Users => _lazyUsers.Value; /// /// 开始一个新的数据库事务。 /// public void BeginTransaction() { _db.BeginTran(); } /// /// 异步提交当前事务中的所有变更。 /// public async Task CommitAsync() { await _db.CommitTranAsync(); } /// /// 异步回滚当前事务中的所有变更。 /// public async Task RollbackAsync() { await _db.RollbackTranAsync(); } /// /// 释放 SqlSugar 客户端资源。通常由 DI 容器管理。 /// public void Dispose() { _db.Dispose(); } } ``` ## 4. 仓储实现 (`Repositories/`) 仓储(Repository)是数据访问层的一部分,负责实现 `DMS.Core` 中定义的仓储接口。它们使用ORM框架(如SqlSugar)与数据库进行实际交互,将数据库实体转换为领域模型,反之亦然。 ### 4.1. 设计思路与考量 * **职责**:仓储(Repository)是数据访问层的一部分,负责实现 `DMS.Core` 中定义的仓储接口。它们使用ORM框架(如SqlSugar)与数据库进行实际交互,将数据库实体转换为领域模型,反之亦然。 * **通用性**:可以有一个 `BaseRepository` 来实现通用的CRUD操作,减少重复代码。 * **特定查询**:为每个仓储接口实现其特有的查询方法(如 `GetActiveDevicesWithDetailsAsync`)。 ### 4.2. 设计优势 * **实现细节封装**:将ORM框架和数据库查询的细节封装在仓储内部,上层无需关心。 * **可替换性**:如果需要更换ORM框架或数据库类型,只需修改仓储实现,而无需修改应用层或核心层。 * **性能优化点**:可以在仓储层进行查询优化(如使用 `Include` 或 `Join` 预加载关联数据)。 ### 4.3. 设计劣势/权衡 * **代码量**:即使有 `BaseRepository`,每个仓储仍然需要一些样板代码。 * **复杂查询**:对于非常复杂的、跨多个聚合根的查询,有时仓储模式会显得笨拙,可能需要引入查询对象(Query Object)模式。 ### 4.4. 示例:`BaseRepository.cs` ```csharp // 文件: DMS.Infrastructure/Repositories/BaseRepository.cs using AutoMapper; using DMS.Core.Interfaces; using SqlSugar; using System.Collections.Generic; using System.Threading.Tasks; namespace DMS.Infrastructure.Repositories; /// /// 仓储基类,实现了 IBaseRepository 接口的通用 CRUD 操作。 /// /// 领域模型类型。 /// 数据库实体类型。 public abstract class BaseRepository : IBaseRepository where TDomain : class where TDbEntity : class, new() { protected readonly ISqlSugarClient _db; protected readonly IMapper _mapper; public BaseRepository(ISqlSugarClient db, IMapper mapper) { _db = db; _mapper = mapper; } public virtual async Task AddAsync(TDomain entity) { var dbEntity = _mapper.Map(entity); await _db.Insertable(dbEntity).ExecuteCommandAsync(); // 映射回ID,如果实体是自增ID _mapper.Map(dbEntity, entity); } public virtual async Task DeleteAsync(int id) { await _db.Deleteable(id).ExecuteCommandAsync(); } public virtual async Task> GetAllAsync() { var dbEntities = await _db.Queryable().ToListAsync(); return _mapper.Map>(dbEntities); } public virtual async Task GetByIdAsync(int id) { var dbEntity = await _db.Queryable().InSingleAsync(id); return _mapper.Map(dbEntity); } public virtual async Task UpdateAsync(TDomain entity) { var dbEntity = _mapper.Map(entity); await _db.Updateable(dbEntity).ExecuteCommandAsync(); } } ``` ### 4.5. 示例:`DeviceRepository.cs` ```csharp // 文件: DMS.Infrastructure/Repositories/DeviceRepository.cs using AutoMapper; using DMS.Core.Interfaces; using DMS.Core.Models; using DMS.Infrastructure.Entities; using SqlSugar; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace DMS.Infrastructure.Repositories; /// /// 设备仓储的具体实现。 /// public class DeviceRepository : BaseRepository, IDeviceRepository { public DeviceRepository(ISqlSugarClient db, IMapper mapper) : base(db, mapper) { } /// /// 异步获取所有激活的S7设备,并级联加载其下的变量表和变量。 /// public async Task> GetActiveDevicesWithDetailsAsync(ProtocolType protocol) { var dbDevices = await _db.Queryable() .Where(d => d.IsActive && d.Protocol == (int)protocol) .Mapper(d => d.VariableTables, d => d.Id, d => d.VariableTables.Select(vt => vt.DeviceId).ToList()) .Mapper(d => d.VariableTables.Select(vt => vt.Variables), vt => vt.Id, vt => vt.Variables.Select(v => v.VariableTableId).ToList()) .ToListAsync(); return _mapper.Map>(dbDevices); } /// /// 异步根据设备ID获取设备及其所有详细信息(变量表、变量、MQTT别名等)。 /// public async Task GetDeviceWithDetailsAsync(int deviceId) { var dbDevice = await _db.Queryable() .Where(d => d.Id == deviceId) .Mapper(d => d.VariableTables, d => d.Id, d => d.VariableTables.Select(vt => vt.DeviceId).ToList()) .Mapper(d => d.VariableTables.Select(vt => vt.Variables), vt => vt.Id, vt => vt.Variables.Select(v => v.VariableTableId).ToList()) .FirstAsync(); if (dbDevice == null) return null; // 手动加载 VariableMqttAlias,因为 SqlSugar 的 Mapper 可能无法直接处理多层嵌套的关联实体 foreach (var variableTable in dbDevice.VariableTables) { foreach (var variable in variableTable.Variables) { var dbAliases = await _db.Queryable() .Where(a => a.VariableId == variable.Id) .Mapper(a => a.MqttServer, a => a.MqttServerId) .ToListAsync(); variable.MqttAliases = _mapper.Map>(dbAliases); } } return _mapper.Map(dbDevice); } } ``` ## 5. 外部服务 (`Services/`) ### 5.1. S7通信架构 (“编排者-代理”模式) 采用“编排者-代理”(Orchestrator-Agent)模式。`S7BackgroundService` 作为编排者,负责管理所有S7设备的生命周期;每个 `S7DeviceAgent` 作为代理,专门负责与**一个**S7 PLC进行所有交互。 #### 5.1.1. 设计思路与考量 * **模式**:采用“编排者-代理”(Orchestrator-Agent)模式。`S7BackgroundService` 作为编排者,负责管理所有S7设备的生命周期;每个 `S7DeviceAgent` 作为代理,专门负责与**一个**S7 PLC进行所有交互。 * **职责分离**:将设备管理(启动、停止、配置更新)与具体设备通信(连接、轮询、读写)的职责分离。 * **并发性**:每个 `S7DeviceAgent` 独立运行,可以并行处理多个设备的通信,提高系统吞吐量。 * **热重载**:通过消息机制,允许在运行时动态更新设备的变量配置,而无需重启整个服务。 #### 5.1.2. 设计优势 * **高可靠性**:单个设备的通信故障不会影响其他设备的正常运行。 * **可扩展性**:易于添加新的设备类型或通信协议,只需实现新的 `DeviceAgent`。 * **动态配置**:支持运行时修改设备和变量配置,提高了系统的灵活性和可用性。 * **资源隔离**:每个Agent管理自己的连接和资源,避免资源争用。 #### 5.1.3. 设计劣势/权衡 * **复杂性增加**:引入了Agent的概念和Agent与Orchestrator之间的通信机制,增加了初期设计和实现复杂性。 * **资源消耗**:每个Agent可能需要维护独立的连接和线程,当设备数量非常庞大时,可能会增加资源消耗。 #### 5.1.4. `S7BackgroundService.cs` (编排者) 作为 `IHostedService` 运行,负责从数据库加载激活的S7设备,并为每个设备创建和管理 `S7DeviceAgent` 实例。它还监听配置变更消息,以触发Agent的热重载。 ```csharp // 文件: DMS.Infrastructure/Services/S7BackgroundService.cs using Microsoft.Extensions.Hosting; using DMS.Core.Interfaces; using DMS.WPF.Services; using CommunityToolkit.Mvvm.Messaging; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using DMS.Core.Enums; using DMS.WPF.Messages; namespace DMS.Infrastructure.Services; /// /// S7后台服务编排者,作为IHostedService运行,管理所有S7设备的通信代理。 /// public class S7BackgroundService : IHostedService { private readonly IRepositoryManager _repoManager; private readonly IChannelBus _channelBus; private readonly IMessenger _messenger; private readonly ConcurrentDictionary _activeAgents = new(); /// /// 构造函数,通过依赖注入获取所需服务。 /// public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg) { _repoManager = repo; _channelBus = bus; _messenger = msg; // 注册配置变更消息,以便在设备或变量配置更新时通知Agent _messenger.Register(this, async (r, m) => await HandleConfigChange(m)); } /// /// 服务启动时调用,加载所有激活的S7设备并启动其代理。 /// public async Task StartAsync(CancellationToken cancellationToken) { // 获取所有激活的S7设备及其详细信息 var s7Devices = await _repoManager.Devices.GetActiveDevicesWithDetailsAsync(ProtocolType.S7); foreach (var device in s7Devices) { // 为每个设备创建一个S7DeviceAgent实例 var agent = new S7DeviceAgent(device, _channelBus, _messenger); if (_activeAgents.TryAdd(device.Id, agent)) { await agent.StartAsync(cancellationToken); } } // 启动数据处理消费者服务,它将从ChannelBus中读取数据 var dataProcessor = new DataProcessingService(_channelBus, _messenger, _repoManager); _ = dataProcessor.StartProcessingAsync(cancellationToken); // 在后台运行,不阻塞启动 } /// /// 处理配置变更消息,通知相关Agent更新其变量列表。 /// private async Task HandleConfigChange(ConfigChangedMessage message) { // 从数据库重新加载受影响的设备及其最新配置 var updatedDevice = await _repoManager.Devices.GetDeviceWithDetailsAsync(message.DeviceId); if (updatedDevice != null && _activeAgents.TryGetValue(message.DeviceId, out var agent)) { // 指示Agent使用新的变量列表进行热重载 agent.UpdateVariableLists(updatedDevice.VariableTables.SelectMany(vt => vt.Variables).ToList()); } } /// /// 服务停止时调用,停止所有活动的Agent并释放资源。 /// public async Task StopAsync(CancellationToken cancellationToken) { foreach (var agent in _activeAgents.Values) { await agent.DisposeAsync(); } _activeAgents.Clear(); } } ``` #### 5.1.5. `S7DeviceAgent.cs` (代理) 负责与单个PLC建立连接、维护连接、按不同频率并行轮询变量,并将读取到的数据通过 `IChannelBus` 写入数据处理队列。 ```csharp // 文件: DMS.Infrastructure/Services/Communication/S7DeviceAgent.cs using S7.Net; using DMS.Core.Models; using DMS.Core.Enums; using DMS.WPF.Services; using CommunityToolkit.Mvvm.Messaging; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using System; namespace DMS.Infrastructure.Services.Communication; /// /// 单个S7 PLC的通信代理,负责连接、轮询和数据发送。 /// public class S7DeviceAgent : IAsyncDisposable { private readonly Device _deviceConfig; private readonly ChannelWriter _processingQueueWriter; private readonly IMessenger _messenger; private readonly Plc _plcClient; private CancellationTokenSource _cts; // 用于控制Agent内部的轮询任务 // 存储按轮询级别分组的变量列表 private List _highFreqVars = new(); private List _mediumFreqVars = new(); private List _lowFreqVars = new(); /// /// 构造函数。 /// /// 设备的配置信息。 /// 中央通道总线服务。 /// 消息总线服务。 public S7DeviceAgent(Device device, IChannelBus channelBus, IMessenger messenger) { _deviceConfig = device; _messenger = messenger; // 从中央总线获取指定名称的通道的写入端 _processingQueueWriter = channelBus.GetWriter("DataProcessingQueue"); // 初始化S7.Net PLC客户端 _plcClient = new Plc( (CpuType)Enum.Parse(typeof(CpuType), _deviceConfig.Protocol.ToString()), // 根据协议类型解析CPU类型 _deviceConfig.IpAddress, (short)_deviceConfig.Rack, (short)_deviceConfig.Slot ); } /// /// 启动Agent的通信循环。 /// public async Task StartAsync(CancellationToken cancellationToken = default) { _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); await _plcClient.OpenAsync(); // 建立PLC连接 // 初始加载变量列表并分组 UpdateVariableLists(_deviceConfig.VariableTables.SelectMany(vt => vt.Variables).ToList()); // 启动并行的轮询任务,每个轮询级别一个任务 _ = Task.Run(() => PollingLoopAsync(_highFreqVars, 200, _cts.Token)); // 高频:200ms _ = Task.Run(() => PollingLoopAsync(_mediumFreqVars, 1000, _cts.Token)); // 中频:1000ms _ = Task.Run(() => PollingLoopAsync(_lowFreqVars, 5000, _cts.Token)); // 低频:5000ms } /// /// 热重载方法,用于响应配置变更,更新Agent内部的变量列表。 /// /// 最新的所有激活变量列表。 public void UpdateVariableLists(List allActiveVariables) { // 重新分组变量 _highFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.High).ToList(); _mediumFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Medium).ToList(); _lowFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Low).ToList(); // 可以考虑在这里重新启动轮询任务,或者让现有任务检测到列表变化 } /// /// 核心轮询循环,负责批量读取变量并将数据写入处理队列。 /// private async Task PollingLoopAsync(List varsToRead, int interval, CancellationToken token) { while (!token.IsCancellationRequested) { if (!varsToRead.Any()) // 如果没有变量需要轮询,则等待 { await Task.Delay(interval, token); continue; } try { // 使用 S7.Net Plus 的 ReadMultipleVarsAsync 批量读取变量 // 注意:S7.Net Plus 会将读取到的值直接更新到 Variable 对象的 DataValue 属性中 await _plcClient.ReadMultipleVarsAsync(varsToRead); foreach (var variable in varsToRead) { // 将读取到的值(包含在VariableContext中)放入数据处理队列 var context = new VariableContext(variable, variable.DataValue); await _processingQueueWriter.WriteAsync(context, token); } } catch (Exception ex) { // 记录通信错误,但不中断整个Agent _messenger.Send(new LogMessage(LogLevel.Error, ex, $"S7DeviceAgent: 设备 {_deviceConfig.Name} ({_deviceConfig.IpAddress}) 轮询错误。")); } await Task.Delay(interval, token); // 等待下一个轮询周期 } } /// /// 异步释放Agent资源,包括停止轮询任务和关闭PLC连接。 /// public async ValueTask DisposeAsync() { _cts?.Cancel(); // 取消所有内部轮询任务 if (_plcClient.IsConnected) { _plcClient.Close(); // 关闭PLC连接 } _plcClient?.Dispose(); } } ``` ### 5.2. MQTT发布服务 (`MqttPublishService.cs`) 负责将数据发布到MQTT Broker。 ```csharp // 文件: DMS.Infrastructure/Services/Communication/MqttPublishService.cs using MQTTnet; using MQTTnet.Client; using MQTTnet.Client.Options; using System.Threading; using System.Threading.Tasks; namespace DMS.Infrastructure.Services.Communication; public interface IMqttPublishService { Task PublishAsync(MqttServer server, string topic, string payload); } public class MqttPublishService : IMqttPublishService { private readonly IMqttClient _mqttClient; public MqttPublishService() { var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); } public async Task PublishAsync(MqttServer server, string topic, string payload) { if (!_mqttClient.IsConnected) { var options = new MqttClientOptionsBuilder() .WithTcpServer(server.BrokerAddress, server.Port) .WithCredentials(server.Username, server.Password) .Build(); await _mqttClient.ConnectAsync(options, CancellationToken.None); } var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce) .Build(); await _mqttClient.PublishAsync(message, CancellationToken.None); } } ``` ### 5.3. 数据处理消费者 (`DataProcessingService.cs`) 从 `IChannelBus` 读取数据,并启动数据处理链。它作为数据处理链的入口点。 ```csharp // 文件: DMS.Infrastructure/Services/DataProcessingService.cs using DMS.Core.Models; using DMS.WPF.Services; using CommunityToolkit.Mvvm.Messaging; using System.Threading.Channels; using System.Threading.Tasks; using DMS.Infrastructure.Services.Processing; using DMS.Core.Interfaces; using DMS.WPF.Messages; using NLog; using Microsoft.Extensions.Caching.Memory; namespace DMS.Infrastructure.Services; /// /// 数据处理消费者服务,从ChannelBus中读取变量数据并启动处理链。 /// public class DataProcessingService { private readonly ChannelReader _queueReader; private readonly IMessenger _messenger; private readonly IRepositoryManager _repoManager; private readonly IMemoryCache _memoryCache; // 用于ChangeDetectionProcessor private readonly IMqttPublishService _mqttPublishService; // 用于MqttPublishProcessor private static readonly ILogger _logger = LogManager.GetCurrentClassLogger(); /// /// 构造函数,通过依赖注入获取所需服务。 /// public DataProcessingService(IChannelBus channelBus, IMessenger messenger, IRepositoryManager repo, IMemoryCache memoryCache, IMqttPublishService mqttPublishService) { // 从中央总线获取数据处理队列的读取端 _queueReader = channelBus.GetReader("DataProcessingQueue"); _messenger = messenger; _repoManager = repo; _memoryCache = memoryCache; _mqttPublishService = mqttPublishService; } /// /// 启动数据处理循环,持续从队列中读取数据并进行处理。 /// public async Task StartProcessingAsync(CancellationToken token) { await foreach (var context in _queueReader.ReadAllAsync(token)) { try { // 构建并执行数据处理链 var changeDetector = new ChangeDetectionProcessor(_memoryCache); var historyStorage = new HistoryStorageProcessor(_repoManager.VariableHistories); var mqttPublisher = new MqttPublishProcessor(_mqttPublishService, _repoManager); // 链式连接处理器 changeDetector.SetNext(historyStorage).SetNext(mqttPublisher); // 启动处理 await changeDetector.ProcessAsync(context); // 处理完成后,发送消息通知UI更新变量值 _messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue)); } catch (Exception ex) { _logger.Error(ex, $"数据处理链执行错误,变量ID: {context.Variable.Id}"); } } } } ``` ### 5.4. 数据处理器 (`Processing/`) 采用责任链模式(Chain of Responsibility)来处理采集到的变量数据。每个处理器(Processor)负责一个单一的数据处理步骤(如变化检测、历史存储、MQTT发布)。 #### 5.4.1. `VariableProcessorBase.cs` ```csharp // 文件: DMS.Application/Services/Processors/VariableProcessorBase.cs using DMS.Core.Models; using System.Threading.Tasks; namespace DMS.Infrastructure.Services.Processing; public abstract class VariableProcessorBase : IVariableProcessor { private IVariableProcessor _next; public IVariableProcessor SetNext(IVariableProcessor next) { _next = next; return next; } public virtual async Task ProcessAsync(VariableContext context) { if (context.IsProcessingTerminated) return; await HandleAsync(context); if (_next != null && !context.IsProcessingTerminated) { await _next.ProcessAsync(context); } } // 模板方法,由子类实现具体的处理逻辑 protected abstract Task HandleAsync(VariableContext context); } ``` #### 5.4.2. `ChangeDetectionProcessor.cs` **职责**:检测本次读取的值与上一次的值是否相同。如果相同,则终止后续处理,以节省资源。 ```csharp // 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs using DMS.Core.Models; using Microsoft.Extensions.Caching.Memory; using System; using System.Threading.Tasks; namespace DMS.Infrastructure.Services.Processing; public class ChangeDetectionProcessor : VariableProcessorBase { private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值 public ChangeDetectionProcessor(IMemoryCache cache) { _cache = cache; } protected override Task HandleAsync(VariableContext context) { var lastValue = _cache.Get(context.Variable.Id); if (lastValue != null && lastValue.Equals(context.RawValue)) { context.IsProcessingTerminated = true; // 值未变化,终止处理 } else { context.IsValueChanged = true; _cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1)); } return Task.CompletedTask; } } ``` #### 5.4.3. `HistoryStorageProcessor.cs` **职责**:将变化后的值存入数据库历史记录表。 ```csharp // 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs using DMS.Core.Interfaces; using DMS.Core.Models; using System.Threading.Tasks; namespace DMS.Infrastructure.Services.Processing; public class HistoryStorageProcessor : VariableProcessorBase { private readonly IVariableHistoryRepository _historyRepository; public HistoryStorageProcessor(IVariableHistoryRepository historyRepository) { _historyRepository = historyRepository; } protected override async Task HandleAsync(VariableContext context) { if (!context.IsValueChanged) return; var history = new VariableHistory { VariableId = context.Variable.Id, Value = context.CurrentValue.ToString(), Timestamp = context.Timestamp }; await _historyRepository.AddAsync(history); } } ``` #### 5.4.4. `MqttPublishProcessor.cs` **职责**:如果变量关联了MQTT服务器,则将值发布出去。 ```csharp // 文件: DMS.Infrastructure/Services/Processing/MqttPublishProcessor.cs using DMS.Core.Interfaces; using DMS.Core.Models; using DMS.Infrastructure.Services.Communication; using CommunityToolkit.Mvvm.Messaging; using System.Linq; using System.Text.Json; using System.Threading.Tasks; using NLog; namespace DMS.Infrastructure.Services.Processing; /// /// MQTT发布处理器,负责将变量值发布到关联的MQTT服务器,并使用专属别名。 /// public class MqttPublishProcessor : VariableProcessorBase { private readonly IMqttPublishService _mqttService; private readonly IRepositoryManager _repoManager; // 使用 RepositoryManager 来获取仓储 private static readonly ILogger _logger = LogManager.GetCurrentClassLogger(); /// /// 构造函数。 /// public MqttPublishProcessor(IMqttPublishService mqttService, IRepositoryManager repoManager) { _mqttService = mqttService; _repoManager = repoManager; } protected override async Task HandleAsync(VariableContext context) { if (!context.IsValueChanged) return; // 如果值未变化,则不发布 // 1. 从仓储获取变量及其完整的别名关联列表 // 这要求 IVariableRepository 有一个方法能加载 VariableMqttAlias 及其 MqttServer var variableWithAliases = await _repoManager.Variables.GetVariableWithMqttAliasesAsync(context.Variable.Id); if (variableWithAliases?.MqttAliases == null || !variableWithAliases.MqttAliases.Any()) { return; // 没有关联的MQTT服务器,无需发布 } foreach (var aliasInfo in variableWithAliases.MqttAliases) { try { // 确保 MqttServer 导航属性已加载且激活 var targetServer = aliasInfo.MqttServer; if (targetServer == null || !targetServer.IsActive) { _logger.Warn($"MQTT发布失败:变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。"); continue; } // 使用别名构建Topic // 示例Topic格式:DMS/DeviceName/VariableAlias var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}"; var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp }); await _mqttService.PublishAsync(targetServer, topic, payload); } catch (Exception ex) { _logger.Error(ex, $"MQTT发布失败:变量 {context.Variable.Name} 到服务器 {aliasInfo.MqttServer.ServerName},别名 {aliasInfo.Alias}"); } } } } ``` ## 6. 数据映射 (`Profiles/`) 为了在领域模型 (`DMS.Core.Models`) 和数据库实体 (`DMS.Infrastructure.Entities`) 之间进行转换,我们将使用 `AutoMapper`。 ```csharp // 文件: DMS.Infrastructure/Profiles/MappingProfile.cs using AutoMapper; using DMS.Core.Models; using DMS.Infrastructure.Entities; using DMS.Core.Enums; namespace DMS.Infrastructure.Profiles; public class MappingProfile : Profile { public MappingProfile() { CreateMap().ReverseMap(); CreateMap().ReverseMap(); CreateMap().ReverseMap(); CreateMap().ReverseMap(); CreateMap().ReverseMap(); CreateMap().ReverseMap(); CreateMap().ReverseMap(); CreateMap().ReverseMap(); // 枚举到int的映射 CreateMap().ConvertUsing(src => (int)src); CreateMap().ConvertUsing(src => (ProtocolType)src); CreateMap().ConvertUsing(src => (int)src); CreateMap().ConvertUsing(src => (SignalType)src); CreateMap().ConvertUsing(src => (int)src); CreateMap().ConvertUsing(src => (PollLevelType)src); CreateMap().ConvertUsing(src => (int)src); CreateMap().ConvertUsing(src => (CSharpDataType)src); } } ```