# 软件开发文档 - DMS.Infrastructure - 数据访问与服务
本文档详细阐述了 `DMS.Infrastructure` 层的设计,它是所有外部技术和服务的具体实现地。它实现了 `DMS.Core` 定义的接口,为 `DMS.Application` 层提供数据和功能支持,并负责数据持久化、事务管理以及与外部世界的通信。
## 1. 目录结构
```
DMS.Infrastructure/
├── Data/
│ ├── SqlSugarDbContext.cs
│ └── RepositoryManager.cs
├── Entities/
│ ├── DbDevice.cs
│ ├── DbMqttServer.cs
│ ├── DbVariable.cs
│ ├── DbVariableTable.cs
│ ├── DbVariableHistory.cs
│ ├── DbVariableMqttAlias.cs
│ ├── DbMenu.cs
│ ├── DbNlog.cs
│ └── DbUser.cs
├── Repositories/
│ ├── BaseRepository.cs
│ ├── DeviceRepository.cs
│ ├── MqttServerRepository.cs
│ ├── VariableRepository.cs
│ ├── VariableTableRepository.cs
│ ├── VariableHistoryRepository.cs
│ ├── VariableMqttAliasRepository.cs
│ ├── MenuRepository.cs
│ └── UserRepository.cs
├── Services/
│ ├── Communication/
│ │ ├── S7DeviceAgent.cs
│ │ └── MqttPublishService.cs
│ ├── Processing/
│ │ ├── ChangeDetectionProcessor.cs
│ │ ├── HistoryStorageProcessor.cs
│ │ └── MqttPublishProcessor.cs
│ ├── S7BackgroundService.cs
│ ├── DataProcessingService.cs
│ ├── DatabaseInitializerService.cs
│ └── MenuService.cs
├── Logging/
│ ├── ThrottlingDatabaseTarget.cs
│ └── NLogService.cs
├── Profiles/
│ └── MappingProfile.cs
└── DMS.Infrastructure.csproj
```
## 2. 数据库设计与实体 (`Entities/`)
本文档详细描述了DMS系统的数据库结构,包括表、字段和关系。数据库实体类与数据库表一一对应,使用了 `SqlSugar` 的特性(Attribute)来定义主键、外键等。
### 2.1. 数据库关系图 (ERD)
```mermaid
erDiagram
DEVICE {
int Id PK
varchar Name
int Protocol
varchar IpAddress
int Port
bool IsActive
}
VARIABLE_TABLE {
int Id PK
varchar Name
varchar Description
bool IsActive
int DeviceId FK
}
VARIABLE {
int Id PK
varchar Name
varchar Address
int DataType
bool IsActive
int VariableTableId FK
}
MQTT_SERVER {
int Id PK
varchar ServerName
varchar BrokerAddress
int Port
varchar Username
varchar Password
bool IsActive
}
VARIABLE_HISTORY {
bigint Id PK
int VariableId FK
varchar Value
datetime Timestamp
}
VARIABLE_MQTT_ALIAS {
int Id PK
int VariableId FK
int MqttServerId FK
varchar Alias
}
MENU {
int Id PK
int ParentId FK
varchar Header
varchar Icon
varchar TargetViewKey
varchar NavigationParameter
int DisplayOrder
}
NLOG {
bigint Id PK
datetime Logged
varchar Level
varchar Message
varchar Exception
varchar CallSite
varchar MethodName
int AggregatedCount
}
USER {
int Id PK
varchar Username
varchar PasswordHash
varchar Role
bool IsActive
}
DEVICE ||--o{ VARIABLE_TABLE : "包含"
VARIABLE_TABLE ||--o{ VARIABLE : "包含"
VARIABLE ||--o{ VARIABLE_HISTORY : "记录"
VARIABLE }o--o{ VARIABLE_MQTT_ALIAS : "关联"
MQTT_SERVER }o--o{ VARIABLE_MQTT_ALIAS : "关联"
MENU ||--o{ MENU : "父子"
```
### 2.2. 数据库实体类示例
#### `DbDevice.cs`
```csharp
// 文件: DMS.Infrastructure/Entities/DbDevice.cs
using SqlSugar;
namespace DMS.Infrastructure.Entities;
[SugarTable("Devices")]
public class DbDevice
{
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public int Id { get; set; }
public string Name { get; set; }
public int Protocol { get; set; } // 对应 ProtocolType 枚举
public string IpAddress { get; set; }
public int Port { get; set; }
public bool IsActive { get; set; }
public int Rack { get; set; }
public int Slot { get; set; }
}
```
#### `DbVariableTable.cs`
```csharp
// 文件: DMS.Infrastructure/Entities/DbVariableTable.cs
using SqlSugar;
namespace DMS.Infrastructure.Entities;
[SugarTable("VariableTables")]
public class DbVariableTable
{
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public int Id { get; set; }
public string Name { get; set; }
public string Description { get; set; }
public bool IsActive { get; set; }
public int DeviceId { get; set; }
public int Protocol { get; set; } // 对应 ProtocolType 枚举
}
```
#### `DbVariable.cs`
```csharp
// 文件: DMS.Infrastructure/Entities/DbVariable.cs
using SqlSugar;
namespace DMS.Infrastructure.Entities;
[SugarTable("Variables")]
public class DbVariable
{
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public int Id { get; set; }
public string Name { get; set; }
public string Address { get; set; }
public int DataType { get; set; } // 对应 SignalType 枚举
public int PollLevel { get; set; } // 对应 PollLevelType 枚举
public bool IsActive { get; set; }
public int VariableTableId { get; set; }
public int Protocol { get; set; } // 对应 ProtocolType 枚举
public int CSharpDataType { get; set; } // 对应 CSharpDataType 枚举
public string ConversionFormula { get; set; }
public DateTime CreatedAt { get; set; }
public DateTime UpdatedAt { get; set; }
public string UpdatedBy { get; set; }
public bool IsModified { get; set; }
}
```
#### `DbMqttServer.cs`
```csharp
// 文件: DMS.Infrastructure/Entities/DbMqttServer.cs
using SqlSugar;
namespace DMS.Infrastructure.Entities;
[SugarTable("MqttServers")]
public class DbMqttServer
{
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public int Id { get; set; }
public string ServerName { get; set; }
public string BrokerAddress { get; set; }
public int Port { get; set; }
public string Username { get; set; }
public string Password { get; set; }
public bool IsActive { get; set; }
public string SubscribeTopic { get; set; }
public string PublishTopic { get; set; }
public string ClientId { get; set; }
public DateTime CreatedAt { get; set; }
public DateTime? ConnectedAt { get; set; }
public long ConnectionDuration { get; set; }
public string MessageFormat { get; set; }
}
```
#### `DbVariableHistory.cs`
```csharp
// 文件: DMS.Infrastructure/Entities/DbVariableHistory.cs
using SqlSugar;
namespace DMS.Infrastructure.Entities;
[SugarTable("VariableHistories")]
public class DbVariableHistory
{
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public long Id { get; set; }
public int VariableId { get; set; }
public string Value { get; set; }
public DateTime Timestamp { get; set; }
}
```
#### `DbVariableMqttAlias.cs`
```csharp
// 文件: DMS.Infrastructure/Entities/DbVariableMqttAlias.cs
using SqlSugar;
namespace DMS.Infrastructure.Entities;
///
/// 数据库实体:对应数据库中的 VariableMqttAliases 表。
///
[SugarTable("VariableMqttAliases")]
public class DbVariableMqttAlias
{
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public int Id { get; set; }
///
/// 外键,指向 Variables 表的 Id。
///
public int VariableId { get; set; }
///
/// 外键,指向 MqttServers 表的 Id。
///
public int MqttServerId { get; set; }
///
/// 针对此特定[变量-服务器]连接的发布别名。
///
public string Alias { get; set; }
}
```
#### `DbMenu.cs`
```csharp
// 文件: DMS.Infrastructure/Entities/DbMenu.cs
using SqlSugar;
namespace DMS.Infrastructure.Entities;
[SugarTable("Menus")]
public class DbMenu
{
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public int Id { get; set; }
[SugarColumn(IsNullable = true)]
public int? ParentId { get; set; }
public string Header { get; set; }
public string Icon { get; set; }
public string TargetViewKey { get; set; }
[SugarColumn(IsNullable = true)]
public string NavigationParameter { get; set; }
public int DisplayOrder { get; set; }
}
```
#### `DbNlog.cs`
```csharp
// 文件: DMS.Infrastructure/Entities/DbNlog.cs
using SqlSugar;
using System;
namespace DMS.Infrastructure.Entities;
///
/// 数据库实体:对应数据库中的 Logs 表,用于存储应用程序日志。
///
[SugarTable("Logs")]
public class DbNlog
{
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public long Id { get; set; }
///
/// 日志记录的时间戳。
///
public DateTime Logged { get; set; }
///
/// 日志级别 (e.g., "Info", "Warn", "Error", "Debug")。
///
public string Level { get; set; }
///
/// 日志消息主体。
///
[SugarColumn(Length = -1)] // 映射为NVARCHAR(MAX)或类似类型
public string Message { get; set; }
///
/// 异常信息,包括堆栈跟踪。如果无异常则为null。
///
[SugarColumn(IsNullable = true, Length = -1)]
public string Exception { get; set; }
///
/// 记录日志的调用点信息 (文件路径:行号)。
///
public string CallSite { get; set; }
///
/// 记录日志的方法名。
///
public string MethodName { get; set; }
///
/// (用于聚合) 此条日志在指定时间窗口内被触发的总次数。默认为1。
///
public int AggregatedCount { get; set; } = 1;
}
```
#### `DbUser.cs`
```csharp
// 文件: DMS.Infrastructure/Entities/DbUser.cs
using SqlSugar;
namespace DMS.Infrastructure.Entities;
[SugarTable("Users")]
public class DbUser
{
[SugarColumn(IsPrimaryKey = true, IsIdentity = true)]
public int Id { get; set; }
public string Username { get; set; }
public string PasswordHash { get; set; }
public string Role { get; set; }
public bool IsActive { get; set; }
}
```
## 3. 事务管理 (`RepositoryManager`)
`RepositoryManager` 是工作单元(Unit of Work, UoW)模式的具体实现。它作为所有仓储的统一入口,并管理数据库事务,确保所有通过它获取的仓储实例都共享同一个 `ISqlSugarClient` 实例,从而确保它们在同一个数据库会话和事务中操作。
### 3.1. 设计思路与考量
* **模式**:`RepositoryManager` 是工作单元(Unit of Work, UoW)模式的具体实现。它作为所有仓储的统一入口,并管理数据库事务。
* **共享上下文**:所有通过 `RepositoryManager` 获取的仓储实例都共享同一个 `ISqlSugarClient` 实例,从而确保它们在同一个数据库会话和事务中操作。
* **生命周期**:`RepositoryManager` 通常被注册为 `Scoped` 或 `Transient` 生命周期,确保每个业务操作都有一个独立的工作单元。
### 3.2. 设计优势
* **原子性**:确保跨多个仓储的操作(如创建设备、变量表和菜单)要么全部成功,要么全部失败,维护数据一致性。
* **简化事务管理**:应用层无需直接与数据库事务API交互,只需调用 `BeginTransaction()`, `CommitAsync()`, `RollbackAsync()`。
* **解耦**:应用层不直接依赖具体的仓储实现,而是依赖于 `IRepositoryManager` 这一抽象。
* **资源优化**:在单个业务操作中,所有仓储共享同一个数据库连接,减少了连接开销。
### 3.3. 设计劣势/权衡
* **复杂性增加**:相比于直接使用仓储,引入UoW模式增加了额外的抽象层和概念。
* **仓储依赖**:`IRepositoryManager` 接口需要列出所有它管理的仓储,当新增仓储时,需要修改此接口。
* **懒加载**:为了避免在 `RepositoryManager` 构造时就创建所有仓储实例的开销,通常会使用懒加载(`Lazy`),这增加了少量复杂性。
### 3.4. 示例:`RepositoryManager.cs`
```csharp
// 文件: DMS.Infrastructure/Data/RepositoryManager.cs
using DMS.Core.Interfaces;
using SqlSugar;
using System;
namespace DMS.Infrastructure.Data;
///
/// IRepositoryManager 的 SqlSugar 实现,管理所有仓储实例和数据库事务。
///
public class RepositoryManager : IRepositoryManager
{
private readonly ISqlSugarClient _db;
private readonly Lazy _lazyDevices;
private readonly Lazy _lazyVariableTables;
private readonly Lazy _lazyVariables;
private readonly Lazy _lazyMqttServers;
private readonly Lazy _lazyVariableMqttAliases;
private readonly Lazy _lazyMenus;
private readonly Lazy _lazyVariableHistories;
private readonly Lazy _lazyUsers;
///
/// 构造函数,通过依赖注入获取 SqlSugar 客户端实例。
///
public RepositoryManager(ISqlSugarClient db, IMapper mapper) // 添加 IMapper 依赖
{
_db = db;
// 使用 Lazy 实现仓储的懒加载,确保它们在第一次被访问时才创建。
// 所有仓储都共享同一个 _db 实例,以保证事务的一致性。
_lazyDevices = new Lazy(() => new DeviceRepository(_db, mapper));
_lazyVariableTables = new Lazy(() => new VariableTableRepository(_db, mapper));
_lazyVariables = new Lazy(() => new VariableRepository(_db, mapper));
_lazyMqttServers = new Lazy(() => new MqttServerRepository(_db, mapper));
_lazyVariableMqttAliases = new Lazy(() => new VariableMqttAliasRepository(_db, mapper));
_lazyMenus = new Lazy(() => new MenuRepository(_db, mapper));
_lazyVariableHistories = new Lazy(() => new VariableHistoryRepository(_db, mapper));
_lazyUsers = new Lazy(() => new UserRepository(_db, mapper));
}
public IDeviceRepository Devices => _lazyDevices.Value;
public IVariableTableRepository VariableTables => _lazyVariableTables.Value;
public IVariableRepository Variables => _lazyVariables.Value;
public IMqttServerRepository MqttServers => _lazyMqttServers.Value;
public IVariableMqttAliasRepository VariableMqttAliases => _lazyVariableMqttAliases.Value;
public IMenuRepository Menus => _lazyMenus.Value;
public IVariableHistoryRepository VariableHistories => _lazyVariableHistories.Value;
public IUserRepository Users => _lazyUsers.Value;
///
/// 开始一个新的数据库事务。
///
public void BeginTransaction()
{
_db.BeginTran();
}
///
/// 异步提交当前事务中的所有变更。
///
public async Task CommitAsync()
{
await _db.CommitTranAsync();
}
///
/// 异步回滚当前事务中的所有变更。
///
public async Task RollbackAsync()
{
await _db.RollbackTranAsync();
}
///
/// 释放 SqlSugar 客户端资源。通常由 DI 容器管理。
///
public void Dispose()
{
_db.Dispose();
}
}
```
## 4. 仓储实现 (`Repositories/`)
仓储(Repository)是数据访问层的一部分,负责实现 `DMS.Core` 中定义的仓储接口。它们使用ORM框架(如SqlSugar)与数据库进行实际交互,将数据库实体转换为领域模型,反之亦然。
### 4.1. 设计思路与考量
* **职责**:仓储(Repository)是数据访问层的一部分,负责实现 `DMS.Core` 中定义的仓储接口。它们使用ORM框架(如SqlSugar)与数据库进行实际交互,将数据库实体转换为领域模型,反之亦然。
* **通用性**:可以有一个 `BaseRepository` 来实现通用的CRUD操作,减少重复代码。
* **特定查询**:为每个仓储接口实现其特有的查询方法(如 `GetActiveDevicesWithDetailsAsync`)。
### 4.2. 设计优势
* **实现细节封装**:将ORM框架和数据库查询的细节封装在仓储内部,上层无需关心。
* **可替换性**:如果需要更换ORM框架或数据库类型,只需修改仓储实现,而无需修改应用层或核心层。
* **性能优化点**:可以在仓储层进行查询优化(如使用 `Include` 或 `Join` 预加载关联数据)。
### 4.3. 设计劣势/权衡
* **代码量**:即使有 `BaseRepository`,每个仓储仍然需要一些样板代码。
* **复杂查询**:对于非常复杂的、跨多个聚合根的查询,有时仓储模式会显得笨拙,可能需要引入查询对象(Query Object)模式。
### 4.4. 示例:`BaseRepository.cs`
```csharp
// 文件: DMS.Infrastructure/Repositories/BaseRepository.cs
using AutoMapper;
using DMS.Core.Interfaces;
using SqlSugar;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Repositories;
///
/// 仓储基类,实现了 IBaseRepository 接口的通用 CRUD 操作。
///
/// 领域模型类型。
/// 数据库实体类型。
public abstract class BaseRepository : IBaseRepository
where TDomain : class
where TDbEntity : class, new()
{
protected readonly ISqlSugarClient _db;
protected readonly IMapper _mapper;
public BaseRepository(ISqlSugarClient db, IMapper mapper)
{
_db = db;
_mapper = mapper;
}
public virtual async Task AddAsync(TDomain entity)
{
var dbEntity = _mapper.Map(entity);
await _db.Insertable(dbEntity).ExecuteCommandAsync();
// 映射回ID,如果实体是自增ID
_mapper.Map(dbEntity, entity);
}
public virtual async Task DeleteAsync(int id)
{
await _db.Deleteable(id).ExecuteCommandAsync();
}
public virtual async Task> GetAllAsync()
{
var dbEntities = await _db.Queryable().ToListAsync();
return _mapper.Map>(dbEntities);
}
public virtual async Task GetByIdAsync(int id)
{
var dbEntity = await _db.Queryable().InSingleAsync(id);
return _mapper.Map(dbEntity);
}
public virtual async Task UpdateAsync(TDomain entity)
{
var dbEntity = _mapper.Map(entity);
await _db.Updateable(dbEntity).ExecuteCommandAsync();
}
}
```
### 4.5. 示例:`DeviceRepository.cs`
```csharp
// 文件: DMS.Infrastructure/Repositories/DeviceRepository.cs
using AutoMapper;
using DMS.Core.Interfaces;
using DMS.Core.Models;
using DMS.Infrastructure.Entities;
using SqlSugar;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Repositories;
///
/// 设备仓储的具体实现。
///
public class DeviceRepository : BaseRepository, IDeviceRepository
{
public DeviceRepository(ISqlSugarClient db, IMapper mapper) : base(db, mapper) { }
///
/// 异步获取所有激活的S7设备,并级联加载其下的变量表和变量。
///
public async Task> GetActiveDevicesWithDetailsAsync(ProtocolType protocol)
{
var dbDevices = await _db.Queryable()
.Where(d => d.IsActive && d.Protocol == (int)protocol)
.Mapper(d => d.VariableTables, d => d.Id, d => d.VariableTables.Select(vt => vt.DeviceId).ToList())
.Mapper(d => d.VariableTables.Select(vt => vt.Variables), vt => vt.Id, vt => vt.Variables.Select(v => v.VariableTableId).ToList())
.ToListAsync();
return _mapper.Map>(dbDevices);
}
///
/// 异步根据设备ID获取设备及其所有详细信息(变量表、变量、MQTT别名等)。
///
public async Task GetDeviceWithDetailsAsync(int deviceId)
{
var dbDevice = await _db.Queryable()
.Where(d => d.Id == deviceId)
.Mapper(d => d.VariableTables, d => d.Id, d => d.VariableTables.Select(vt => vt.DeviceId).ToList())
.Mapper(d => d.VariableTables.Select(vt => vt.Variables), vt => vt.Id, vt => vt.Variables.Select(v => v.VariableTableId).ToList())
.FirstAsync();
if (dbDevice == null) return null;
// 手动加载 VariableMqttAlias,因为 SqlSugar 的 Mapper 可能无法直接处理多层嵌套的关联实体
foreach (var variableTable in dbDevice.VariableTables)
{
foreach (var variable in variableTable.Variables)
{
var dbAliases = await _db.Queryable()
.Where(a => a.VariableId == variable.Id)
.Mapper(a => a.MqttServer, a => a.MqttServerId)
.ToListAsync();
variable.MqttAliases = _mapper.Map>(dbAliases);
}
}
return _mapper.Map(dbDevice);
}
}
```
## 5. 外部服务 (`Services/`)
### 5.1. S7通信架构 (“编排者-代理”模式)
采用“编排者-代理”(Orchestrator-Agent)模式。`S7BackgroundService` 作为编排者,负责管理所有S7设备的生命周期;每个 `S7DeviceAgent` 作为代理,专门负责与**一个**S7 PLC进行所有交互。
#### 5.1.1. 设计思路与考量
* **模式**:采用“编排者-代理”(Orchestrator-Agent)模式。`S7BackgroundService` 作为编排者,负责管理所有S7设备的生命周期;每个 `S7DeviceAgent` 作为代理,专门负责与**一个**S7 PLC进行所有交互。
* **职责分离**:将设备管理(启动、停止、配置更新)与具体设备通信(连接、轮询、读写)的职责分离。
* **并发性**:每个 `S7DeviceAgent` 独立运行,可以并行处理多个设备的通信,提高系统吞吐量。
* **热重载**:通过消息机制,允许在运行时动态更新设备的变量配置,而无需重启整个服务。
#### 5.1.2. 设计优势
* **高可靠性**:单个设备的通信故障不会影响其他设备的正常运行。
* **可扩展性**:易于添加新的设备类型或通信协议,只需实现新的 `DeviceAgent`。
* **动态配置**:支持运行时修改设备和变量配置,提高了系统的灵活性和可用性。
* **资源隔离**:每个Agent管理自己的连接和资源,避免资源争用。
#### 5.1.3. 设计劣势/权衡
* **复杂性增加**:引入了Agent的概念和Agent与Orchestrator之间的通信机制,增加了初期设计和实现复杂性。
* **资源消耗**:每个Agent可能需要维护独立的连接和线程,当设备数量非常庞大时,可能会增加资源消耗。
#### 5.1.4. `S7BackgroundService.cs` (编排者)
作为 `IHostedService` 运行,负责从数据库加载激活的S7设备,并为每个设备创建和管理 `S7DeviceAgent` 实例。它还监听配置变更消息,以触发Agent的热重载。
```csharp
// 文件: DMS.Infrastructure/Services/S7BackgroundService.cs
using Microsoft.Extensions.Hosting;
using DMS.Core.Interfaces;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using DMS.Core.Enums;
using DMS.WPF.Messages;
namespace DMS.Infrastructure.Services;
///
/// S7后台服务编排者,作为IHostedService运行,管理所有S7设备的通信代理。
///
public class S7BackgroundService : IHostedService
{
private readonly IRepositoryManager _repoManager;
private readonly IChannelBus _channelBus;
private readonly IMessenger _messenger;
private readonly ConcurrentDictionary _activeAgents = new();
///
/// 构造函数,通过依赖注入获取所需服务。
///
public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg)
{
_repoManager = repo;
_channelBus = bus;
_messenger = msg;
// 注册配置变更消息,以便在设备或变量配置更新时通知Agent
_messenger.Register(this, async (r, m) => await HandleConfigChange(m));
}
///
/// 服务启动时调用,加载所有激活的S7设备并启动其代理。
///
public async Task StartAsync(CancellationToken cancellationToken)
{
// 获取所有激活的S7设备及其详细信息
var s7Devices = await _repoManager.Devices.GetActiveDevicesWithDetailsAsync(ProtocolType.S7);
foreach (var device in s7Devices)
{
// 为每个设备创建一个S7DeviceAgent实例
var agent = new S7DeviceAgent(device, _channelBus, _messenger);
if (_activeAgents.TryAdd(device.Id, agent))
{
await agent.StartAsync(cancellationToken);
}
}
// 启动数据处理消费者服务,它将从ChannelBus中读取数据
var dataProcessor = new DataProcessingService(_channelBus, _messenger, _repoManager);
_ = dataProcessor.StartProcessingAsync(cancellationToken); // 在后台运行,不阻塞启动
}
///
/// 处理配置变更消息,通知相关Agent更新其变量列表。
///
private async Task HandleConfigChange(ConfigChangedMessage message)
{
// 从数据库重新加载受影响的设备及其最新配置
var updatedDevice = await _repoManager.Devices.GetDeviceWithDetailsAsync(message.DeviceId);
if (updatedDevice != null && _activeAgents.TryGetValue(message.DeviceId, out var agent))
{
// 指示Agent使用新的变量列表进行热重载
agent.UpdateVariableLists(updatedDevice.VariableTables.SelectMany(vt => vt.Variables).ToList());
}
}
///
/// 服务停止时调用,停止所有活动的Agent并释放资源。
///
public async Task StopAsync(CancellationToken cancellationToken)
{
foreach (var agent in _activeAgents.Values)
{
await agent.DisposeAsync();
}
_activeAgents.Clear();
}
}
```
#### 5.1.5. `S7DeviceAgent.cs` (代理)
负责与单个PLC建立连接、维护连接、按不同频率并行轮询变量,并将读取到的数据通过 `IChannelBus` 写入数据处理队列。
```csharp
// 文件: DMS.Infrastructure/Services/Communication/S7DeviceAgent.cs
using S7.Net;
using DMS.Core.Models;
using DMS.Core.Enums;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System;
namespace DMS.Infrastructure.Services.Communication;
///
/// 单个S7 PLC的通信代理,负责连接、轮询和数据发送。
///
public class S7DeviceAgent : IAsyncDisposable
{
private readonly Device _deviceConfig;
private readonly ChannelWriter _processingQueueWriter;
private readonly IMessenger _messenger;
private readonly Plc _plcClient;
private CancellationTokenSource _cts; // 用于控制Agent内部的轮询任务
// 存储按轮询级别分组的变量列表
private List _highFreqVars = new();
private List _mediumFreqVars = new();
private List _lowFreqVars = new();
///
/// 构造函数。
///
/// 设备的配置信息。
/// 中央通道总线服务。
/// 消息总线服务。
public S7DeviceAgent(Device device, IChannelBus channelBus, IMessenger messenger)
{
_deviceConfig = device;
_messenger = messenger;
// 从中央总线获取指定名称的通道的写入端
_processingQueueWriter = channelBus.GetWriter("DataProcessingQueue");
// 初始化S7.Net PLC客户端
_plcClient = new Plc(
(CpuType)Enum.Parse(typeof(CpuType), _deviceConfig.Protocol.ToString()), // 根据协议类型解析CPU类型
_deviceConfig.IpAddress,
(short)_deviceConfig.Rack,
(short)_deviceConfig.Slot
);
}
///
/// 启动Agent的通信循环。
///
public async Task StartAsync(CancellationToken cancellationToken = default)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
await _plcClient.OpenAsync(); // 建立PLC连接
// 初始加载变量列表并分组
UpdateVariableLists(_deviceConfig.VariableTables.SelectMany(vt => vt.Variables).ToList());
// 启动并行的轮询任务,每个轮询级别一个任务
_ = Task.Run(() => PollingLoopAsync(_highFreqVars, 200, _cts.Token)); // 高频:200ms
_ = Task.Run(() => PollingLoopAsync(_mediumFreqVars, 1000, _cts.Token)); // 中频:1000ms
_ = Task.Run(() => PollingLoopAsync(_lowFreqVars, 5000, _cts.Token)); // 低频:5000ms
}
///
/// 热重载方法,用于响应配置变更,更新Agent内部的变量列表。
///
/// 最新的所有激活变量列表。
public void UpdateVariableLists(List allActiveVariables)
{
// 重新分组变量
_highFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.High).ToList();
_mediumFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Medium).ToList();
_lowFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Low).ToList();
// 可以考虑在这里重新启动轮询任务,或者让现有任务检测到列表变化
}
///
/// 核心轮询循环,负责批量读取变量并将数据写入处理队列。
///
private async Task PollingLoopAsync(List varsToRead, int interval, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
if (!varsToRead.Any()) // 如果没有变量需要轮询,则等待
{
await Task.Delay(interval, token);
continue;
}
try
{
// 使用 S7.Net Plus 的 ReadMultipleVarsAsync 批量读取变量
// 注意:S7.Net Plus 会将读取到的值直接更新到 Variable 对象的 DataValue 属性中
await _plcClient.ReadMultipleVarsAsync(varsToRead);
foreach (var variable in varsToRead)
{
// 将读取到的值(包含在VariableContext中)放入数据处理队列
var context = new VariableContext(variable, variable.DataValue);
await _processingQueueWriter.WriteAsync(context, token);
}
}
catch (Exception ex)
{
// 记录通信错误,但不中断整个Agent
_messenger.Send(new LogMessage(LogLevel.Error, ex, $"S7DeviceAgent: 设备 {_deviceConfig.Name} ({_deviceConfig.IpAddress}) 轮询错误。"));
}
await Task.Delay(interval, token); // 等待下一个轮询周期
}
}
///
/// 异步释放Agent资源,包括停止轮询任务和关闭PLC连接。
///
public async ValueTask DisposeAsync()
{
_cts?.Cancel(); // 取消所有内部轮询任务
if (_plcClient.IsConnected)
{
_plcClient.Close(); // 关闭PLC连接
}
_plcClient?.Dispose();
}
}
```
### 5.2. MQTT发布服务 (`MqttPublishService.cs`)
负责将数据发布到MQTT Broker。
```csharp
// 文件: DMS.Infrastructure/Services/Communication/MqttPublishService.cs
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System.Threading;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services.Communication;
public interface IMqttPublishService
{
Task PublishAsync(MqttServer server, string topic, string payload);
}
public class MqttPublishService : IMqttPublishService
{
private readonly IMqttClient _mqttClient;
public MqttPublishService()
{
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
}
public async Task PublishAsync(MqttServer server, string topic, string payload)
{
if (!_mqttClient.IsConnected)
{
var options = new MqttClientOptionsBuilder()
.WithTcpServer(server.BrokerAddress, server.Port)
.WithCredentials(server.Username, server.Password)
.Build();
await _mqttClient.ConnectAsync(options, CancellationToken.None);
}
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
await _mqttClient.PublishAsync(message, CancellationToken.None);
}
}
```
### 5.3. 数据处理消费者 (`DataProcessingService.cs`)
从 `IChannelBus` 读取数据,并启动数据处理链。它作为数据处理链的入口点。
```csharp
// 文件: DMS.Infrastructure/Services/DataProcessingService.cs
using DMS.Core.Models;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using System.Threading.Channels;
using System.Threading.Tasks;
using DMS.Infrastructure.Services.Processing;
using DMS.Core.Interfaces;
using DMS.WPF.Messages;
using NLog;
using Microsoft.Extensions.Caching.Memory;
namespace DMS.Infrastructure.Services;
///
/// 数据处理消费者服务,从ChannelBus中读取变量数据并启动处理链。
///
public class DataProcessingService
{
private readonly ChannelReader _queueReader;
private readonly IMessenger _messenger;
private readonly IRepositoryManager _repoManager;
private readonly IMemoryCache _memoryCache; // 用于ChangeDetectionProcessor
private readonly IMqttPublishService _mqttPublishService; // 用于MqttPublishProcessor
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
///
/// 构造函数,通过依赖注入获取所需服务。
///
public DataProcessingService(IChannelBus channelBus, IMessenger messenger, IRepositoryManager repo, IMemoryCache memoryCache, IMqttPublishService mqttPublishService)
{
// 从中央总线获取数据处理队列的读取端
_queueReader = channelBus.GetReader("DataProcessingQueue");
_messenger = messenger;
_repoManager = repo;
_memoryCache = memoryCache;
_mqttPublishService = mqttPublishService;
}
///
/// 启动数据处理循环,持续从队列中读取数据并进行处理。
///
public async Task StartProcessingAsync(CancellationToken token)
{
await foreach (var context in _queueReader.ReadAllAsync(token))
{
try
{
// 构建并执行数据处理链
var changeDetector = new ChangeDetectionProcessor(_memoryCache);
var historyStorage = new HistoryStorageProcessor(_repoManager.VariableHistories);
var mqttPublisher = new MqttPublishProcessor(_mqttPublishService, _repoManager);
// 链式连接处理器
changeDetector.SetNext(historyStorage).SetNext(mqttPublisher);
// 启动处理
await changeDetector.ProcessAsync(context);
// 处理完成后,发送消息通知UI更新变量值
_messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue));
}
catch (Exception ex)
{
_logger.Error(ex, $"数据处理链执行错误,变量ID: {context.Variable.Id}");
}
}
}
}
```
### 5.4. 数据处理器 (`Processing/`)
采用责任链模式(Chain of Responsibility)来处理采集到的变量数据。每个处理器(Processor)负责一个单一的数据处理步骤(如变化检测、历史存储、MQTT发布)。
#### 5.4.1. `VariableProcessorBase.cs`
```csharp
// 文件: DMS.Application/Services/Processors/VariableProcessorBase.cs
using DMS.Core.Models;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services.Processing;
public abstract class VariableProcessorBase : IVariableProcessor
{
private IVariableProcessor _next;
public IVariableProcessor SetNext(IVariableProcessor next)
{
_next = next;
return next;
}
public virtual async Task ProcessAsync(VariableContext context)
{
if (context.IsProcessingTerminated) return;
await HandleAsync(context);
if (_next != null && !context.IsProcessingTerminated)
{
await _next.ProcessAsync(context);
}
}
// 模板方法,由子类实现具体的处理逻辑
protected abstract Task HandleAsync(VariableContext context);
}
```
#### 5.4.2. `ChangeDetectionProcessor.cs`
**职责**:检测本次读取的值与上一次的值是否相同。如果相同,则终止后续处理,以节省资源。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs
using DMS.Core.Models;
using Microsoft.Extensions.Caching.Memory;
using System;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services.Processing;
public class ChangeDetectionProcessor : VariableProcessorBase
{
private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值
public ChangeDetectionProcessor(IMemoryCache cache)
{
_cache = cache;
}
protected override Task HandleAsync(VariableContext context)
{
var lastValue = _cache.Get(context.Variable.Id);
if (lastValue != null && lastValue.Equals(context.RawValue))
{
context.IsProcessingTerminated = true; // 值未变化,终止处理
}
else
{
context.IsValueChanged = true;
_cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1));
}
return Task.CompletedTask;
}
}
```
#### 5.4.3. `HistoryStorageProcessor.cs`
**职责**:将变化后的值存入数据库历史记录表。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs
using DMS.Core.Interfaces;
using DMS.Core.Models;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services.Processing;
public class HistoryStorageProcessor : VariableProcessorBase
{
private readonly IVariableHistoryRepository _historyRepository;
public HistoryStorageProcessor(IVariableHistoryRepository historyRepository)
{
_historyRepository = historyRepository;
}
protected override async Task HandleAsync(VariableContext context)
{
if (!context.IsValueChanged) return;
var history = new VariableHistory
{
VariableId = context.Variable.Id,
Value = context.CurrentValue.ToString(),
Timestamp = context.Timestamp
};
await _historyRepository.AddAsync(history);
}
}
```
#### 5.4.4. `MqttPublishProcessor.cs`
**职责**:如果变量关联了MQTT服务器,则将值发布出去。
```csharp
// 文件: DMS.Infrastructure/Services/Processing/MqttPublishProcessor.cs
using DMS.Core.Interfaces;
using DMS.Core.Models;
using DMS.Infrastructure.Services.Communication;
using CommunityToolkit.Mvvm.Messaging;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using NLog;
namespace DMS.Infrastructure.Services.Processing;
///
/// MQTT发布处理器,负责将变量值发布到关联的MQTT服务器,并使用专属别名。
///
public class MqttPublishProcessor : VariableProcessorBase
{
private readonly IMqttPublishService _mqttService;
private readonly IRepositoryManager _repoManager; // 使用 RepositoryManager 来获取仓储
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
///
/// 构造函数。
///
public MqttPublishProcessor(IMqttPublishService mqttService, IRepositoryManager repoManager)
{
_mqttService = mqttService;
_repoManager = repoManager;
}
protected override async Task HandleAsync(VariableContext context)
{
if (!context.IsValueChanged) return; // 如果值未变化,则不发布
// 1. 从仓储获取变量及其完整的别名关联列表
// 这要求 IVariableRepository 有一个方法能加载 VariableMqttAlias 及其 MqttServer
var variableWithAliases = await _repoManager.Variables.GetVariableWithMqttAliasesAsync(context.Variable.Id);
if (variableWithAliases?.MqttAliases == null || !variableWithAliases.MqttAliases.Any())
{
return; // 没有关联的MQTT服务器,无需发布
}
foreach (var aliasInfo in variableWithAliases.MqttAliases)
{
try
{
// 确保 MqttServer 导航属性已加载且激活
var targetServer = aliasInfo.MqttServer;
if (targetServer == null || !targetServer.IsActive)
{
_logger.Warn($"MQTT发布失败:变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。");
continue;
}
// 使用别名构建Topic
// 示例Topic格式:DMS/DeviceName/VariableAlias
var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}";
var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });
await _mqttService.PublishAsync(targetServer, topic, payload);
}
catch (Exception ex)
{
_logger.Error(ex, $"MQTT发布失败:变量 {context.Variable.Name} 到服务器 {aliasInfo.MqttServer.ServerName},别名 {aliasInfo.Alias}");
}
}
}
}
```
## 6. 数据映射 (`Profiles/`)
为了在领域模型 (`DMS.Core.Models`) 和数据库实体 (`DMS.Infrastructure.Entities`) 之间进行转换,我们将使用 `AutoMapper`。
```csharp
// 文件: DMS.Infrastructure/Profiles/MappingProfile.cs
using AutoMapper;
using DMS.Core.Models;
using DMS.Infrastructure.Entities;
using DMS.Core.Enums;
namespace DMS.Infrastructure.Profiles;
public class MappingProfile : Profile
{
public MappingProfile()
{
CreateMap().ReverseMap();
CreateMap().ReverseMap();
CreateMap().ReverseMap();
CreateMap().ReverseMap();
CreateMap().ReverseMap();
CreateMap().ReverseMap();
CreateMap().ReverseMap();
CreateMap().ReverseMap();
// 枚举到int的映射
CreateMap().ConvertUsing(src => (int)src);
CreateMap().ConvertUsing(src => (ProtocolType)src);
CreateMap().ConvertUsing(src => (int)src);
CreateMap().ConvertUsing(src => (SignalType)src);
CreateMap().ConvertUsing(src => (int)src);
CreateMap().ConvertUsing(src => (PollLevelType)src);
CreateMap().ConvertUsing(src => (int)src);
CreateMap().ConvertUsing(src => (CSharpDataType)src);
}
}
```