2025-07-15 22:19:07 +08:00
|
|
|
|
using System.Threading.Channels;
|
2025-09-04 13:40:07 +08:00
|
|
|
|
using DMS.Application.DTOs;
|
|
|
|
|
|
using DMS.Application.Interfaces;
|
|
|
|
|
|
using DMS.Application.Models;
|
|
|
|
|
|
using DMS.Core.Interfaces;
|
2025-07-26 10:05:43 +08:00
|
|
|
|
using DMS.Core.Models;
|
2025-07-15 22:19:07 +08:00
|
|
|
|
using Microsoft.Extensions.Hosting;
|
2025-09-04 13:40:07 +08:00
|
|
|
|
using Microsoft.Extensions.Logging;
|
2025-07-15 22:19:07 +08:00
|
|
|
|
|
2025-09-04 13:40:07 +08:00
|
|
|
|
namespace DMS.Application.Services.Processors;
|
2025-07-15 22:19:07 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 核心数据处理服务,作为后台服务运行。
|
|
|
|
|
|
/// 它维护一个无界通道(Channel)作为处理队列,并按顺序执行已注册的数据处理器。
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
public class DataProcessingService : BackgroundService, IDataProcessingService
|
|
|
|
|
|
{
|
|
|
|
|
|
// 使用 Channel 作为高性能的生产者/消费者队列
|
2025-07-17 20:13:21 +08:00
|
|
|
|
private readonly Channel<VariableContext> _queue;
|
2025-07-16 15:08:56 +08:00
|
|
|
|
|
2025-07-15 22:19:07 +08:00
|
|
|
|
// 存储数据处理器的链表
|
2025-07-17 20:13:21 +08:00
|
|
|
|
private readonly List<IVariableProcessor> _processors;
|
2025-09-04 17:29:24 +08:00
|
|
|
|
|
|
|
|
|
|
// 日志记录器
|
|
|
|
|
|
private readonly ILogger<DataProcessingService> _logger;
|
2025-07-15 22:19:07 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 构造函数,注入日志记录器。
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="logger">日志记录器实例。</param>
|
2025-09-04 17:29:24 +08:00
|
|
|
|
public DataProcessingService(ILogger<DataProcessingService> logger)
|
2025-07-15 22:19:07 +08:00
|
|
|
|
{
|
|
|
|
|
|
// 创建一个无边界的 Channel,允许生产者快速写入而不会被阻塞。
|
2025-07-17 20:13:21 +08:00
|
|
|
|
_queue = Channel.CreateUnbounded<VariableContext>();
|
|
|
|
|
|
_processors = new List<IVariableProcessor>();
|
2025-09-04 17:29:24 +08:00
|
|
|
|
_logger = logger;
|
2025-07-15 22:19:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 向处理链中添加一个数据处理器。
|
|
|
|
|
|
/// 处理器将按照添加的顺序执行。
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="processor">要添加的数据处理器实例。</param>
|
2025-07-17 20:13:21 +08:00
|
|
|
|
public void AddProcessor(IVariableProcessor processor)
|
2025-07-15 22:19:07 +08:00
|
|
|
|
{
|
|
|
|
|
|
_processors.Add(processor);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 将一个变量数据项异步推入处理队列。
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="data">要入队的变量数据。</param>
|
2025-09-04 13:40:07 +08:00
|
|
|
|
public async ValueTask EnqueueAsync(VariableDto data)
|
2025-07-15 22:19:07 +08:00
|
|
|
|
{
|
|
|
|
|
|
if (data == null)
|
|
|
|
|
|
{
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-17 20:13:21 +08:00
|
|
|
|
var context = new VariableContext(data);
|
2025-07-15 22:19:07 +08:00
|
|
|
|
// 将数据项写入 Channel,供后台服务处理。
|
2025-07-16 15:08:56 +08:00
|
|
|
|
await _queue.Writer.WriteAsync(context);
|
2025-07-15 22:19:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 后台服务的核心执行逻辑。
|
|
|
|
|
|
/// 此方法会持续运行,从队列中读取数据并交由处理器链处理。
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="stoppingToken">用于通知服务停止的取消令牌。</param>
|
|
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
|
|
|
|
{
|
2025-09-04 17:29:24 +08:00
|
|
|
|
_logger.LogInformation("数据处理服务已启动。");
|
2025-07-15 22:19:07 +08:00
|
|
|
|
|
|
|
|
|
|
// 当服务未被请求停止时,持续循环
|
|
|
|
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
|
|
|
|
{
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
// 从队列中异步读取一个数据项,如果队列为空,则等待。
|
2025-07-16 15:08:56 +08:00
|
|
|
|
var context = await _queue.Reader.ReadAsync(stoppingToken);
|
2025-07-15 22:19:07 +08:00
|
|
|
|
|
|
|
|
|
|
// 依次调用处理链中的每一个处理器
|
|
|
|
|
|
foreach (var processor in _processors)
|
|
|
|
|
|
{
|
2025-07-16 15:08:56 +08:00
|
|
|
|
if (context.IsHandled)
|
|
|
|
|
|
{
|
2025-09-04 17:29:24 +08:00
|
|
|
|
// _logger.LogInformation($"{context.Data.Name}的数据处理已短路,跳过后续处理器。");
|
2025-07-16 15:08:56 +08:00
|
|
|
|
break; // 短路,跳过后续处理器
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
await processor.ProcessAsync(context);
|
2025-07-15 22:19:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (OperationCanceledException)
|
|
|
|
|
|
{
|
|
|
|
|
|
// 当 stoppingToken 被触发时,ReadAsync 会抛出此异常,属正常停止流程,无需处理。
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
2025-09-04 17:29:24 +08:00
|
|
|
|
_logger.LogError(ex, $"处理变量数据时发生错误:{ex.Message}");
|
2025-07-15 22:19:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-09-04 17:29:24 +08:00
|
|
|
|
_logger.LogInformation("数据处理服务已停止。");
|
2025-07-15 22:19:07 +08:00
|
|
|
|
}
|
2025-09-04 13:40:07 +08:00
|
|
|
|
}
|