Files
DMS/DMS.Application/Services/Processors/HistoryProcessor.cs
David P.G d6bb606b0e 本次提交包含了两个主要部分:一个关键的数据库并发问题修复,以及一个关于历史记录功能的增强。
1. 数据库并发修复 (Bug Fix):
   2. 历史记录功能增强 (Feature):
  `
  fix(db): 修复数据库并发连接问题并增强历史记录功能
  `

  正文:

  `
  本次提交主要包含一个关键的 Bug 修复和一项功能增强。

  1.  修复数据库并发问题:
      *   重构 SqlSugarDbContext,使其作为 SqlSugarClient 的工厂。
      *   GetInstance() 方法现在每次调用都会返回一个新的客户端实例,解决了因单例客户端导致的多线程并发访问 MySqlConnection 的问题。

  2.  增强历史记录功能:
      *   为 VariableHistory 相关的模型、DTO 和实体添加了 NumericValue 属性,以便在历史记录中同时存储数值和字符串值。
      *   更新了 HistoryProcessor 以保存 NumericValue。
      *   对 ValueConvertProcessor 的逻辑进行了重构,以更好地支持值转换流程。
2025-10-03 00:05:17 +08:00

103 lines
4.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using DMS.Application.Interfaces;
using DMS.Application.Models;
using DMS.Core.Interfaces;
using DMS.Core.Models;
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
namespace DMS.Application.Services.Processors;
public class HistoryProcessor : IVariableProcessor, IDisposable
{
private const int BATCH_SIZE = 50; // 批量写入的阈值
private const int TIMER_INTERVAL_MS = 30 * 1000; // 30秒
private readonly ConcurrentQueue<VariableHistory> _queue = new();
private readonly Timer _timer;
private readonly IRepositoryManager _repositoryManager;
private readonly ILogger<HistoryProcessor> _logger;
public HistoryProcessor(IRepositoryManager repositoryManager, ILogger<HistoryProcessor> logger)
{
_repositoryManager = repositoryManager;
_logger = logger;
_timer = new Timer(async _ => await FlushQueueToDatabase(), null, Timeout.Infinite, Timeout.Infinite);
_timer.Change(TIMER_INTERVAL_MS, TIMER_INTERVAL_MS); // 启动定时器
_logger.LogInformation("HistoryProcessor 初始化,批量大小 {BatchSize},定时器间隔 {TimerInterval}ms", BATCH_SIZE, TIMER_INTERVAL_MS);
}
public async Task ProcessAsync(VariableContext context)
{
// 只有当数据需要保存时才记录历史
if (!context.Data.IsHistoryEnabled) // 如果数据已经被其他处理器处理过或者不需要保存,则跳过
{
// _logger.LogDebug("变量 {VariableName} (ID: {VariableId}) 历史记录已禁用,跳过处理", context.Data.Name, context.Data.Id);
return;
}
// 将 VariableDto 转换为 VariableHistory
var historyData = new VariableHistory
{
VariableId = context.Data.Id,
Value = context.Data.DisplayValue?.ToString() ?? string.Empty,
NumericValue = context.Data.NumericValue,
Timestamp = DateTime.Now // 记录当前时间
};
_queue.Enqueue(historyData);
_logger.LogDebug("变量 {VariableName} (ID: {VariableId}) 历史数据已入队,队列数量: {QueueCount}", context.Data.Name, context.Data.Id, _queue.Count);
if (_queue.Count >= BATCH_SIZE)
{
_logger.LogInformation("达到批量大小 ({BatchSize}),正在刷新队列到数据库", BATCH_SIZE);
await FlushQueueToDatabase();
}
}
private async Task FlushQueueToDatabase()
{
// 停止定时器,防止在写入过程中再次触发
_timer.Change(Timeout.Infinite, Timeout.Infinite);
_logger.LogDebug("数据库刷新期间定时器已停止");
var itemsToProcess = new List<VariableHistory>();
while (_queue.TryDequeue(out var item))
{
itemsToProcess.Add(item);
}
if (itemsToProcess.Any())
{
try
{
await _repositoryManager.VariableHistories.AddBatchAsync(itemsToProcess);
_logger.LogInformation("成功插入 {Count} 条变量历史记录到数据库", itemsToProcess.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "批量插入 {Count} 条变量历史记录时发生错误: {ErrorMessage}", itemsToProcess.Count, ex.Message);
// 错误处理:可以将未成功插入的数据重新放回队列,或者记录到日志中以便后续处理
// 为了简单起见,这里不重新入队,避免无限循环
}
}
else
{
_logger.LogDebug("队列中没有需要处理的项目");
}
// 重新启动定时器
_timer.Change(TIMER_INTERVAL_MS, TIMER_INTERVAL_MS);
_logger.LogDebug("定时器已重启,间隔 {TimerInterval}ms", TIMER_INTERVAL_MS);
}
public void Dispose()
{
_logger.LogInformation("正在释放 HistoryProcessor刷新队列中剩余的 {Count} 个项目", _queue.Count);
_timer?.Dispose();
// 在 Dispose 时,尝试将剩余数据写入数据库
FlushQueueToDatabase().Wait();
_logger.LogInformation("HistoryProcessor 已释放");
}
}