diff --git a/App.xaml.cs b/App.xaml.cs index b80e31d..78b813b 100644 --- a/App.xaml.cs +++ b/App.xaml.cs @@ -66,6 +66,7 @@ public partial class App : Application dataProcessingService.AddProcessor(Host.Services.GetRequiredService()); dataProcessingService.AddProcessor(Host.Services.GetRequiredService()); dataProcessingService.AddProcessor(Host.Services.GetRequiredService()); + dataProcessingService.AddProcessor(Host.Services.GetRequiredService()); } catch (Exception exception) { @@ -116,6 +117,7 @@ public partial class App : Application services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); // 注册数据仓库 services.AddSingleton(); @@ -245,6 +247,7 @@ public partial class App : Application _db.CodeFirst.InitTables(); _db.CodeFirst.InitTables(); _db.CodeFirst.InitTables(); + _db.CodeFirst.InitTables(); _db.CodeFirst.InitTables(); _db.CodeFirst.InitTables(); _db.CodeFirst.InitTables(); diff --git a/Data/Entities/DbVariableData.cs b/Data/Entities/DbVariableData.cs index 3b07ac1..81b4c72 100644 --- a/Data/Entities/DbVariableData.cs +++ b/Data/Entities/DbVariableData.cs @@ -163,4 +163,10 @@ public class DbVariableData /// [Navigate(typeof(DbVariableDataMqtt), "VariableDataId", "MqttId")] public List? Mqtts { get; set; } + + /// + /// 关联的历史记录列表。 + /// + [Navigate(NavigateType.OneToMany, nameof(DbVariableDataHistory.VariableDataId))] + public List? HistoryRecords { get; set; } } \ No newline at end of file diff --git a/Data/Entities/DbVariableDataHistory.cs b/Data/Entities/DbVariableDataHistory.cs new file mode 100644 index 0000000..a2e78aa --- /dev/null +++ b/Data/Entities/DbVariableDataHistory.cs @@ -0,0 +1,49 @@ +using System; +using SqlSugar; + +namespace PMSWPF.Data.Entities; + +/// +/// 表示数据库中的变量数据历史实体。 +/// +[SugarTable("VarDataHistory")] +public class DbVariableDataHistory +{ + /// + /// 历史记录唯一标识符。 + /// + [SugarColumn(IsPrimaryKey = true, IsIdentity = true)] + public int Id { get; set; } + + /// + /// 变量名称。 + /// + public string Name { get; set; } + + /// + /// 节点ID,用于标识变量在设备或系统中的唯一路径。 + /// + [SugarColumn(IsNullable = true)] + public string NodeId { get; set; } = String.Empty; + + /// + /// 变量当前原始数据值。 + /// + public string DataValue { get; set; } = String.Empty; + + /// + /// 关联的DbVariableData的ID。 + /// + public int VariableDataId { get; set; } + + /// + /// 关联的DbVariableData实体。 + /// + [Navigate(NavigateType.ManyToOne, nameof(VariableDataId))] + public DbVariableData? VariableData { get; set; } + + /// + /// 历史记录的时间戳。 + /// + public DateTime Timestamp { get; set; } = DateTime.Now; +} \ No newline at end of file diff --git a/Services/Processors/HistoryDataProcessor.cs b/Services/Processors/HistoryDataProcessor.cs new file mode 100644 index 0000000..7dd48c5 --- /dev/null +++ b/Services/Processors/HistoryDataProcessor.cs @@ -0,0 +1,94 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using PMSWPF.Data; +using PMSWPF.Data.Entities; +using PMSWPF.Helper; +using PMSWPF.Models; +using PMSWPF.Services; + +namespace PMSWPF.Services.Processors; + +public class HistoryDataProcessor : IVariableDataProcessor, IDisposable +{ + private const int BATCH_SIZE = 50; // 批量写入的阈值 + private const int TIMER_INTERVAL_MS = 30 * 1000; // 30秒 + + private readonly ConcurrentQueue _queue = new(); + private readonly Timer _timer; + + public HistoryDataProcessor() + { + + _timer = new Timer(async _ => await FlushQueueToDatabase(), null, Timeout.Infinite, Timeout.Infinite); + _timer.Change(TIMER_INTERVAL_MS, TIMER_INTERVAL_MS); // 启动定时器 + } + + public async Task ProcessAsync(VariableDataContext context) + { + // 只有当数据发生变化时才记录历史 + if (!context.Data.IsSave) // 如果数据已经被其他处理器处理过或者不需要保存,则跳过 + { + return; + } + + // 将 VariableData 转换为 DbVariableDataHistory + var historyData = new DbVariableDataHistory + { + Name = context.Data.Name, + NodeId = context.Data.NodeId, + DataValue = context.Data.DataValue, + VariableDataId = context.Data.Id, + Timestamp = DateTime.Now // 记录当前时间 + }; + + _queue.Enqueue(historyData); + + if (_queue.Count >= BATCH_SIZE) + { + await FlushQueueToDatabase(); + } + } + + private async Task FlushQueueToDatabase() + { + // 停止定时器,防止在写入过程中再次触发 + _timer.Change(Timeout.Infinite, Timeout.Infinite); + + var itemsToProcess = new List(); + while (_queue.TryDequeue(out var item)) + { + itemsToProcess.Add(item); + } + + if (itemsToProcess.Any()) + { + try + { + using var db = DbContext.GetInstance(); + await db.Insertable(itemsToProcess).ExecuteCommandAsync(); + NlogHelper.Info($"成功批量插入 {itemsToProcess.Count} 条历史变量数据。"); + } + catch (Exception ex) + { + NlogHelper.Error( $"批量插入历史变量数据时发生错误: {ex.Message}",ex); + // 错误处理:可以将未成功插入的数据重新放回队列,或者记录到日志中以便后续处理 + // 为了简单起见,这里不重新入队,避免无限循环 + } + } + + // 重新启动定时器 + _timer.Change(TIMER_INTERVAL_MS, TIMER_INTERVAL_MS); + } + + public void Dispose() + { + _timer?.Dispose(); + // 在 Dispose 时,尝试将剩余数据写入数据库 + FlushQueueToDatabase().Wait(); + } +} diff --git a/ValueConverts/NullableBooleanConverter.cs b/ValueConverts/NullableBooleanConverter.cs new file mode 100644 index 0000000..55762ad --- /dev/null +++ b/ValueConverts/NullableBooleanConverter.cs @@ -0,0 +1,33 @@ +using System; +using System.Globalization; +using System.Windows.Data; + +namespace PMSWPF.ValueConverts +{ + public class NullableBooleanConverter : IValueConverter + { + public object Convert(object value, Type targetType, object parameter, CultureInfo culture) + { + if (value is bool b && parameter is string paramString) + { + if (bool.TryParse(paramString, out bool paramBool)) + { + return b == paramBool; + } + } + return Binding.DoNothing; + } + + public object ConvertBack(object value, Type targetType, object parameter, CultureInfo culture) + { + if (value is bool b && b && parameter is string paramString) + { + if (bool.TryParse(paramString, out bool paramBool)) + { + return paramBool; + } + } + return Binding.DoNothing; + } + } +} \ No newline at end of file