Files
ThingsGateway/src/Gateway/ThingsGateway.Gateway.Application/Driver/Collect/CollectFoundationBase.cs
2248356998 qq.com 3461f34240 feat: 网关监控页面树节点js更新状态
perf: 优化变量页面刷新性能
perf: 优化热点方法异步性能
2025-10-17 00:41:47 +08:00

347 lines
14 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://thingsgateway.cn/
// QQ群605534569
//------------------------------------------------------------------------------
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System.Collections.Concurrent;
using ThingsGateway.Common.Extension;
using ThingsGateway.NewLife.Threading;
using TouchSocket.Core;
namespace ThingsGateway.Gateway.Application;
/// <summary>
/// Base class for collection plugins; derive from it to implement communication with different PLCs.
/// </summary>
public abstract class CollectFoundationBase : CollectBase
{
/// <summary>
/// The underlying driver device. May be <c>null</c>, so callers must null-check before use.
/// </summary>
public virtual IDevice? FoundationDevice { get; }
/// <summary>
/// Returns the underlying device's string form, falling back to the base implementation
/// when there is no device or the device yields <c>null</c>.
/// </summary>
public override string ToString()
{
    var deviceText = FoundationDevice?.ToString();
    return deviceText ?? base.ToString();
}
/// <inheritdoc/>
protected override async Task DisposeAsync(bool disposing)
{
    // Release the underlying device (when present) before the base class tears down.
    if (FoundationDevice is { } device)
    {
        await device.SafeDisposeAsync().ConfigureAwait(false);
    }

    await base.DisposeAsync(disposing).ConfigureAwait(false);
}
/// <summary>
/// Returns the address description supplied by the underlying device,
/// or <c>null</c> when there is no device.
/// </summary>
public override string GetAddressDescription() => FoundationDevice?.GetAddressDescription();
#if !Management
/// <summary>
/// Whether the connection is established: true only when a device exists and reports itself online.
/// </summary>
public override bool IsConnected() => FoundationDevice is { OnLine: true };
/// <summary>
/// Invoked when communication starts; connects the underlying device if there is one.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the device's connect call.</param>
/// <returns>A task that completes once the connect attempt has finished.</returns>
protected override async Task ProtectedStartAsync(CancellationToken cancellationToken)
{
    // No underlying device: nothing to connect.
    if (FoundationDevice == null)
        return;

    await FoundationDevice.ConnectAsync(cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Online check: when the underlying device is offline, attempts to reconnect it. If the attempt
/// fails with an exception and the device is still offline, the failure is logged (once per distinct
/// message) and the device, its source reads and its read methods are marked offline.
/// </summary>
/// <param name="state">Not used by this implementation.</param>
/// <param name="cancellationToken">Token that aborts the reconnect attempt; no status is written once it is cancelled.</param>
/// <returns>A task that completes when the check has finished.</returns>
protected override async Task TestOnline(object? state, CancellationToken cancellationToken)
{
    // Nothing to do without a device, or while it is already online.
    if (FoundationDevice == null || FoundationDevice.OnLine)
        return;

    // Skip the reconnect when the device has been disposed, or when its channel is missing or disposed.
    if (FoundationDevice.DisposedValue || FoundationDevice.Channel?.DisposedValue != false)
        return;

    Exception? exception = null;
    try
    {
        if (!cancellationToken.IsCancellationRequested)
        {
            await FoundationDevice.ConnectAsync(cancellationToken).ConfigureAwait(false);

            // The device status has not changed for more than a minute: hold off for 30 s
            // (presumably to throttle repeated reconnect attempts).
            if (CurrentDevice.DeviceStatusChangeTime < TimerX.Now.AddMinutes(-1))
            {
                await Task.Delay(30000, cancellationToken).ConfigureAwait(false);
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation is not reported as a device failure.
    }
    catch (Exception ex)
    {
        exception = ex;
    }

    if (cancellationToken.IsCancellationRequested)
        return;

    // Only report when the reconnect actually failed and the device is still offline.
    if (FoundationDevice.OnLine || exception == null)
        return;

    foreach (var item in CurrentDevice.VariableSourceReads)
    {
        // Log only when the message differs from the last recorded one, to avoid flooding the log.
        if (item.LastErrorMessage != exception.Message)
        {
            if (!cancellationToken.IsCancellationRequested)
                LogMessage?.LogWarning(exception, string.Format(AppResource.CollectFail, DeviceName, item.RegisterAddress, item.Length, exception.Message));
        }

        item.LastErrorMessage = exception.Message;
        CurrentDevice.SetDeviceStatus(TimerX.Now, null, exception.Message);
        var time = DateTime.Now;
        item.VariableRuntimes.ForEach(a => a.SetValue(null, time, isOnline: false));
    }

    foreach (var item in CurrentDevice.ReadVariableMethods)
    {
        if (item.LastErrorMessage != exception.Message)
        {
            if (!cancellationToken.IsCancellationRequested)
                LogMessage?.LogWarning(exception, string.Format(AppResource.MethodFail, DeviceName, item.MethodInfo.Name, exception.Message));
        }

        item.LastErrorMessage = exception.Message;
        CurrentDevice.SetDeviceStatus(TimerX.Now, null, exception.Message);
        var time = DateTime.Now;
        item.Variable.SetValue(null, time, isOnline: false);
    }
}
//protected override async ValueTask<OperResult<ReadOnlyMemory<byte>>> ReadSourceAsync(VariableSourceRead variableSourceRead, CancellationToken cancellationToken)
//{
// try
// {
// if (cancellationToken.IsCancellationRequested)
// return new(new OperationCanceledException());
// // 从协议读取数据
// OperResult<ReadOnlyMemory<byte>> read = default;
// var readTask = FoundationDevice.ReadAsync(variableSourceRead.AddressObject, cancellationToken);
// if (!readTask.IsCompleted)
// {
// read = await readTask.ConfigureAwait(false);
// }
// else
// {
// read = readTask.Result;
// }
// // 如果读取成功且有有效内容,则解析结构化内容
// if (read.IsSuccess)
// {
// var prase = variableSourceRead.VariableRuntimes.PraseStructContent(FoundationDevice, read.Content.Span, false);
// return new OperResult<ReadOnlyMemory<byte>>(prase);
// }
// // 返回读取结果
// return read;
// }
// catch (Exception ex)
// {
// // 捕获异常并返回失败结果
// return new OperResult<ReadOnlyMemory<byte>>(ex);
// }
//}
/// <summary>
/// Collection read for one source: on success the variables are assigned directly from the parsed
/// content; a failed read is returned as-is. Non-generic devices should override this method.
/// </summary>
/// <param name="variableSourceRead">The source read (address and bound variables) to execute.</param>
/// <param name="cancellationToken">Token checked before the read starts and forwarded to the device.</param>
/// <returns>The read result, or a failed result wrapping <see cref="OperationCanceledException"/> when already cancelled.</returns>
protected override ValueTask<OperResult<ReadOnlyMemory<byte>>> ReadSourceAsync(VariableSourceRead variableSourceRead, CancellationToken cancellationToken)
{
    if (!cancellationToken.IsCancellationRequested)
    {
        // Value-type state machine drives the actual read.
        var machine = new ReadSourceStateMachine(this, variableSourceRead, cancellationToken);
        return machine.MoveNextAsync();
    }

    var cancelled = new OperResult<ReadOnlyMemory<byte>>(new OperationCanceledException());
    return new ValueTask<OperResult<ReadOnlyMemory<byte>>>(cancelled);
}
/// <summary>
/// Value-type helper that performs one source read against the owner's underlying device and, on
/// success, parses the returned content into the bound variables. When the device read completes
/// synchronously no async method is entered; otherwise the read is awaited exactly once.
/// </summary>
private struct ReadSourceStateMachine
{
    private readonly VariableSourceRead _variableSourceRead;
    private readonly CancellationToken _cancellationToken;
    private readonly CollectFoundationBase _owner;
    private OperResult<ReadOnlyMemory<byte>> _result;

    public ReadSourceStateMachine(CollectFoundationBase owner, VariableSourceRead variableSourceRead, CancellationToken cancellationToken)
    {
        _owner = owner;
        _variableSourceRead = variableSourceRead;
        _cancellationToken = cancellationToken;
        _result = default;
        State = 0;
    }

    /// <summary>
    /// Current step: 0 = not started, 1 = read result available, 2 = read pending.
    /// </summary>
    public int State { get; private set; }

    /// <summary>
    /// Starts the read (state 0) or finishes a read whose result is already stored (state 1).
    /// Any exception is converted into a failed <c>OperResult</c> instead of being thrown.
    /// </summary>
    public ValueTask<OperResult<ReadOnlyMemory<byte>>> MoveNextAsync()
    {
        try
        {
            switch (State)
            {
                case 0:
                    if (_cancellationToken.IsCancellationRequested)
                    {
                        _result = new OperResult<ReadOnlyMemory<byte>>(new OperationCanceledException());
                        return new ValueTask<OperResult<ReadOnlyMemory<byte>>>(_result);
                    }

#pragma warning disable CA2012 // Use ValueTasks correctly
                    ValueTask<OperResult<ReadOnlyMemory<byte>>> readTask = _owner.FoundationDevice.ReadAsync(_variableSourceRead.AddressObject, _cancellationToken);

                    // Already completed: take the result directly and finish synchronously.
                    if (readTask.IsCompleted)
                    {
                        _result = readTask.Result;
                        State = 1;
                        return MoveNextAsync();
                    }

                    // Still pending: hand the ValueTask to the async path, which is its only consumer.
                    State = 2;
                    return Awaited(readTask);
#pragma warning restore CA2012 // Use ValueTasks correctly

                case 1:
                    return new ValueTask<OperResult<ReadOnlyMemory<byte>>>(Complete(_result));

                default:
                    // Includes state 2: a pending read is completed by Awaited, never by re-entering here.
                    throw new InvalidOperationException("Unexpected state.");
            }
        }
        catch (Exception ex)
        {
            return new ValueTask<OperResult<ReadOnlyMemory<byte>>>(new OperResult<ReadOnlyMemory<byte>>(ex));
        }
    }

    /// <summary>
    /// Awaits a pending read and completes it. The result is taken from the await itself so the
    /// <c>ValueTask</c> is consumed only once.
    /// </summary>
    private async ValueTask<OperResult<ReadOnlyMemory<byte>>> Awaited(ValueTask<OperResult<ReadOnlyMemory<byte>>> vt)
    {
        try
        {
            var result = await vt.ConfigureAwait(false);
            return Complete(result);
        }
        catch (Exception ex)
        {
            return new OperResult<ReadOnlyMemory<byte>>(ex);
        }
    }

    /// <summary>
    /// Returns a failed read unchanged; for a successful read, parses the content into the bound
    /// variables and returns the parse outcome.
    /// </summary>
    private OperResult<ReadOnlyMemory<byte>> Complete(OperResult<ReadOnlyMemory<byte>> result)
    {
        if (!result.IsSuccess)
            return result;

        var parsedResult = _variableSourceRead.VariableRuntimes.PraseStructContent(_owner.FoundationDevice, result.Content.Span, false);
        return new OperResult<ReadOnlyMemory<byte>>(parsedResult);
    }
}
/// <summary>
/// Writes a batch of variable values through the underlying device and returns one result per
/// variable name. Non-generic devices should override this method.
/// </summary>
/// <param name="writeInfoLists">Variables to write, each mapped to the value to be written.</param>
/// <param name="cancellationToken">Token used for acquiring the write lock and for the writes.</param>
/// <returns>A dictionary keyed by variable name holding each write's result.</returns>
/// <exception cref="NotSupportedException">Thrown when there is no underlying device.</exception>
protected override async ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
{
    using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);

    // Writing requires an underlying device.
    if (FoundationDevice == null)
    {
        throw new NotSupportedException();
    }

    // Concurrent dictionary collecting the per-variable results from the parallel writes.
    var results = new NonBlockingDictionary<string, OperResult>();

    await writeInfoLists.ParallelForEachAsync(async (entry, token) =>
    {
        var variable = entry.Key;
        var value = entry.Value;
        try
        {
            // Write the value to the variable's register address via the device.
            var writeResult = await FoundationDevice.WriteJTokenAsync(variable.RegisterAddress, value, variable.DataType, token).ConfigureAwait(false);

            if (!writeResult.IsSuccess)
            {
                LogMessage?.Warning(string.Format("{0} - Write [{1} - {2} - {3}] data failed {4}", DeviceName, variable.RegisterAddress, value, variable.DataType, writeResult.ToString()));
            }
            else if (LogMessage?.LogLevel <= TouchSocket.Core.LogLevel.Debug)
            {
                LogMessage?.Debug(string.Format("{0} - Write [{1} - {2} - {3}] data succeeded", DeviceName, variable.RegisterAddress, value, variable.DataType));
            }

            // Record the outcome under the variable's name.
            results.TryAdd(variable.Name, writeResult);
        }
        catch (Exception ex)
        {
            results.TryAdd(variable.Name, new OperResult(ex));
        }
    }, CollectProperties.MaxConcurrentCount, cancellationToken).ConfigureAwait(false);

    await Check(writeInfoLists, results, cancellationToken).ConfigureAwait(false);

    // Hand back a plain dictionary copy of the collected results.
    return new Dictionary<string, OperResult>(results);
}
#endif
}