Compare commits

...

3 Commits

Author SHA1 Message Date
Diego
fba0723a6d 更新依赖 2025-07-31 18:39:46 +08:00
Diego
2db3f78f0c 添加 控制写操作与读操作的比率 的插件配置属性 2025-07-31 18:18:41 +08:00
Diego
badf61fe01 10.9.99 2025-07-31 16:25:47 +08:00
21 changed files with 112 additions and 60 deletions

View File

@@ -31,11 +31,12 @@
<PackageReference Include="CsvHelper" Version="33.1.0" />
<PackageReference Include="TDengine.Connector" Version="3.1.7" />
<PackageReference Include="Oracle.ManagedDataAccess.Core" Version="23.9.1" />
<PackageReference Include="Oscar.Data.SqlClient" Version="4.2.20" />
<PackageReference Include="Oscar.Data.SqlClient" Version="4.2.21" />
<PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="6.1.0" />
<PackageReference Include="System.Reflection.Emit.Lightweight" Version="4.7.0" />
<PackageReference Include="System.Text.RegularExpressions" Version="4.3.1" />
<PackageReference Include="System.Formats.Asn1" Version="8.0.2" />
</ItemGroup>
</Project>

View File

@@ -1,9 +1,9 @@
<Project>
<PropertyGroup>
<PluginVersion>10.9.94</PluginVersion>
<ProPluginVersion>10.9.93</ProPluginVersion>
<DefaultVersion>10.9.98</DefaultVersion>
<PluginVersion>10.10.1</PluginVersion>
<ProPluginVersion>10.10.1</ProPluginVersion>
<DefaultVersion>10.10.1</DefaultVersion>
<AuthenticationVersion>2.9.29</AuthenticationVersion>
<SourceGeneratorVersion>10.9.29</SourceGeneratorVersion>
<NET8Version>8.0.18</NET8Version>

View File

@@ -575,12 +575,25 @@ public abstract class DeviceBase : DisposableObject, IDevice
var sendOperResult = await SendAsync(command, clientChannel, endPoint, cancellationToken).ConfigureAwait(false);
if (!sendOperResult.IsSuccess)
throw sendOperResult.Exception ?? new(sendOperResult.ErrorMessage ?? "unknown error");
return new MessageBase(sendOperResult);
waitData.SetCancellationToken(cancellationToken);
await waitData.WaitAsync(timeout).ConfigureAwait(false);
try
{
waitData.SetCancellationToken(Channel.ClosedToken);
await waitData.WaitAsync(timeout).ConfigureAwait(false);
}
catch (Exception ex)
{
if (cancellationToken.IsCancellationRequested)
{
if (!this.DisposedValue)
{
await Task.Delay(timeout, Channel.ClosedToken).ConfigureAwait(false);
}
}
return new MessageBase(ex);
}
var result = waitData.Check();
if (result.IsSuccess)
{
@@ -588,33 +601,11 @@ public abstract class DeviceBase : DisposableObject, IDevice
}
else
{
if (cancellationToken.IsCancellationRequested && result.Exception is OperationCanceledException)
{
waitData.Reset();
waitData.SetCancellationToken(CancellationToken.None);
await waitData.WaitAsync(timeout).ConfigureAwait(false);
result = waitData.Check();
if (result.IsSuccess)
{
return waitData.WaitResult;
}
else
{
return new MessageBase(result);
}
}
return new MessageBase(result);
}
}
catch (Exception ex)
{
if (cancellationToken.IsCancellationRequested)
{
if (!this.DisposedValue)
{
await Task.Delay(timeout, CancellationToken.None).ConfigureAwait(false);
}
}
return new MessageBase(ex);
}
finally

View File

@@ -14,48 +14,88 @@ namespace ThingsGateway.Gateway.Application;
public class AsyncReadWriteLock
{
private readonly int _writeReadRatio = 3; // 写3次会允许1次读但写入也不会被阻止具体协议取决于插件协议实现
public AsyncReadWriteLock(int writeReadRatio)
{
_writeReadRatio = writeReadRatio;
}
private AsyncAutoResetEvent _readerLock = new AsyncAutoResetEvent(false); // 控制读计数
private long _writerCount = 0; // 当前活跃的写线程数
private long _readerCount = 0; // 当前被阻塞的读线程数
private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
/// <summary>
/// 获取读锁,支持多个线程并发读取,但写入时会阻止所有读取。
/// </summary>
public async Task<CancellationToken> ReaderLockAsync(CancellationToken cancellationToken)
{
if (Interlocked.Read(ref _writerCount) > 0)
{
Interlocked.Increment(ref _readerCount);
// 第一个读者需要获取写入锁,防止写操作
await _readerLock.WaitOneAsync(cancellationToken).ConfigureAwait(false);
Interlocked.Decrement(ref _readerCount);
}
return _cancellationTokenSource.Token;
}
public bool WriteWaited => _writerCount > 0;
/// <summary>
/// 获取写锁,阻止所有读取。
/// </summary>
public IDisposable WriterLock()
public async Task<IDisposable> WriterLockAsync(CancellationToken cancellationToken)
{
if (Interlocked.Increment(ref _writerCount) == 1)
{
var cancellationTokenSource = _cancellationTokenSource;
_cancellationTokenSource = new();
cancellationTokenSource.Cancel();//取消读取
await cancellationTokenSource.CancelAsync().ConfigureAwait(false); // 取消读取
cancellationTokenSource.SafeDispose();
}
return new Writer(this);
}
private void ReleaseWriter()
{
if (Interlocked.Decrement(ref _writerCount) == 0)
var writerCount = Interlocked.Decrement(ref _writerCount);
if (writerCount == 0)
{
var resetEvent = _readerLock;
_readerLock = new(false);
Interlocked.Exchange(ref _writeSinceLastReadCount, 0);
resetEvent.SetAll();
}
else
{
// 读写占空比, 用于控制写操作与读操作的比率。该比率 n 次写入操作会执行一次读取操作。即使在应用程序执行大量的连续写入操作时,也必须确保足够的读取数据处理时间。相对于更加均衡的读写数据流而言,该特点使得外部写入可连续无顾忌操作
if (_writeReadRatio > 0)
{
if (Interlocked.Read(ref _readerCount) > 0)
{
var count = Interlocked.Increment(ref _writeSinceLastReadCount);
if (count >= _writeReadRatio)
{
Interlocked.Exchange(ref _writeSinceLastReadCount, 0);
_readerLock.Set();
}
}
}
else
{
_readerLock.Set();
}
}
}
private int _writeSinceLastReadCount = 0;
private struct Writer : IDisposable
{
private readonly AsyncReadWriteLock _lock;

View File

@@ -194,6 +194,8 @@ public abstract class CollectBase : DriverBase, IRpcDriver
// 从插件服务中获取当前设备关联的驱动方法信息列表
DriverMethodInfos = GlobalData.PluginService.GetDriverMethodInfos(device.PluginName, this);
ReadWriteLock = new(CollectProperties.DutyCycle);
}
public virtual string GetAddressDescription()
@@ -474,7 +476,7 @@ public abstract class CollectBase : DriverBase, IRpcDriver
/// <returns></returns>
protected abstract Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables);
protected AsyncReadWriteLock ReadWriteLock = new();
protected AsyncReadWriteLock ReadWriteLock;
/// <summary>
/// 采集驱动读取,读取成功后直接赋值变量
@@ -565,7 +567,8 @@ public abstract class CollectBase : DriverBase, IRpcDriver
ConcurrentDictionary<string, OperResult<object>> operResults = new();
using var writeLock = ReadWriteLock.WriterLock();
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
var list = writeInfoLists
.Where(a => !results.Any(b => b.Key == a.Key.Name))
.ToDictionary(item => item.Key, item => item.Value).ToArray();

View File

@@ -175,7 +175,7 @@ public abstract class CollectFoundationBase : CollectBase
/// <returns></returns>
protected override async ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
{
using var writeLock = ReadWriteLock.WriterLock();
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
// 检查协议是否为空,如果为空则抛出异常
if (FoundationDevice == null)
throw new NotSupportedException();

View File

@@ -31,6 +31,11 @@ public abstract class CollectPropertyBase : DriverPropertyBase
/// 失败重试次数默认3
/// </summary>
public virtual int RetryCount { get; set; } = 3;
/// <summary>
/// 读写占空比
/// </summary>
public virtual int DutyCycle { get; set; } = 3;
}
/// <summary>
@@ -45,4 +50,8 @@ public abstract class CollectPropertyRetryBase : CollectPropertyBase
/// </summary>
[DynamicProperty]
public override int RetryCount { get; set; } = 3;
[DynamicProperty(Remark = "n 次写入操作会执行一次读取")]
public override int DutyCycle { get; set; } = 3;
}

View File

@@ -295,7 +295,8 @@
"RetryCount": "RetryCount"
},
"ThingsGateway.Gateway.Application.CollectPropertyRetryBase": {
"RetryCount": "RetryCount"
"RetryCount": "RetryCount",
"DutyCycle": "DutyCycle"
},
"ThingsGateway.Gateway.Application.ControlController": {
"BatchSaveChannelAsync": "BatchSaveChannel",

View File

@@ -294,7 +294,8 @@
"RetryCount": "失败重试次数"
},
"ThingsGateway.Gateway.Application.CollectPropertyRetryBase": {
"RetryCount": "失败重试次数"
"RetryCount": "失败重试次数",
"DutyCycle": "占空比"
},
"ThingsGateway.Gateway.Application.ControlController": {
"BatchSaveChannelAsync": "保存通道",

View File

@@ -54,7 +54,7 @@ public static class PluginServiceUtil
{
{ "title", classAttribute.Remark }
};
tc.ComponentParameters.AddItem(
tc.ComponentParameters = tc.ComponentParameters.AddItem(
new("title", classAttribute.Remark)
);
}

View File

@@ -286,7 +286,7 @@ public class OpcUaMaster : IDisposable
}
catch (Exception)
{
m_session.RemoveSubscription(m_subscription);
await m_session.RemoveSubscriptionAsync(m_subscription, cancellationToken).ConfigureAwait(false);
throw;
}
}
@@ -312,7 +312,7 @@ public class OpcUaMaster : IDisposable
else if (_subscriptionDicts.TryGetValue(subscriptionName, out var existingSubscription))
{
// remove
existingSubscription.Delete(true);
await existingSubscription.DeleteAsync(true, cancellationToken).ConfigureAwait(false);
await m_session.RemoveSubscriptionAsync(existingSubscription, cancellationToken).ConfigureAwait(false);
try { existingSubscription.Dispose(); } catch { }
_subscriptionDicts[subscriptionName] = m_subscription;
@@ -715,7 +715,7 @@ public class OpcUaMaster : IDisposable
{
foreach (var item in _subscriptionDicts)
{
item.Value.Delete(true);
await item.Value.DeleteAsync(true).ConfigureAwait(false);
await m_session.RemoveSubscriptionAsync(item.Value).ConfigureAwait(false);
try { item.Value.Dispose(); } catch { }
}
@@ -731,7 +731,7 @@ public class OpcUaMaster : IDisposable
if (_subscriptionDicts.TryGetValue(subscriptionName, out var subscription))
{
// remove
subscription.Delete(true);
await subscription.DeleteAsync(true).ConfigureAwait(false);
await m_session.RemoveSubscriptionAsync(subscription).ConfigureAwait(false);
try { subscription.Dispose(); } catch { }
_subscriptionDicts.TryRemove(subscriptionName, out _);
@@ -758,7 +758,7 @@ public class OpcUaMaster : IDisposable
var dataValue = JsonUtils.Decode(
m_session.MessageContext,
variableNode.DataType,
TypeInfo.GetBuiltInType(variableNode.DataType, m_session.SystemContext.TypeTable),
await TypeInfo.GetBuiltInTypeAsync(variableNode.DataType, m_session.SystemContext.TypeTable, cancellationToken).ConfigureAwait(false),
item.Value.CalculateActualValueRank(),
item.Value
);
@@ -983,7 +983,7 @@ public class OpcUaMaster : IDisposable
for (int i = 0; i < results.Count; i++)
{
var variableNode = await ReadNodeAsync(nodeIds[i].ToString(), false, StatusCode.IsGood(results[i].StatusCode), cancellationToken).ConfigureAwait(false);
var type = TypeInfo.GetBuiltInType(variableNode.DataType, m_session.SystemContext.TypeTable);
var type = await TypeInfo.GetBuiltInTypeAsync(variableNode.DataType, m_session.SystemContext.TypeTable, cancellationToken).ConfigureAwait(false);
var jToken = JsonUtils.Encode(m_session.MessageContext, type, results[i].Value);
jTokens.Add((variableNode.NodeId.ToString(), results[i], jToken));
}
@@ -1078,7 +1078,7 @@ public class OpcUaMaster : IDisposable
var responseHeader = readResponse.ResponseHeader;
VariableNode variableNode = GetVariableNodes(itemsToRead, values, diagnosticInfos, responseHeader).FirstOrDefault();
if (OpcUaProperty.LoadType && variableNode.DataType != NodeId.Null && TypeInfo.GetBuiltInType(variableNode.DataType, m_session.SystemContext.TypeTable) == BuiltInType.ExtensionObject)
if (OpcUaProperty.LoadType && variableNode.DataType != NodeId.Null && (await TypeInfo.GetBuiltInTypeAsync(variableNode.DataType, m_session.SystemContext.TypeTable, cancellationToken).ConfigureAwait(false)) == BuiltInType.ExtensionObject)
await typeSystem.LoadType(variableNode.DataType, ct: cancellationToken).ConfigureAwait(false);
if (cache)
@@ -1178,7 +1178,7 @@ public class OpcUaMaster : IDisposable
{
if (cache)
_variableDicts.AddOrUpdate(nodeIdStrs[i], a => node, (a, b) => node);
if (node.DataType != NodeId.Null && TypeInfo.GetBuiltInType(node.DataType, m_session.SystemContext.TypeTable) == BuiltInType.ExtensionObject)
if (node.DataType != NodeId.Null && (await TypeInfo.GetBuiltInTypeAsync(node.DataType, m_session.SystemContext.TypeTable, cancellationToken).ConfigureAwait(false)) == BuiltInType.ExtensionObject)
{
await typeSystem.LoadType(node.DataType, ct: cancellationToken).ConfigureAwait(false);
}

View File

@@ -4,7 +4,7 @@
<Import Project="..\..\PackNuget.props" />
<Import Project="..\..\Version.props" />
<PropertyGroup>
<TargetFrameworks>net48;netstandard2.1;net6.0;</TargetFrameworks>
<TargetFrameworks>net48;netstandard2.1;net8.0;</TargetFrameworks>
<Description>工业设备通讯协议-OpcUa协议</Description>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<DocumentationFile></DocumentationFile>
@@ -12,7 +12,7 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client.ComplexTypes" Version="1.5.376.235" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client.ComplexTypes" Version="1.5.376.244" />
</ItemGroup>
<ItemGroup>

View File

@@ -10,6 +10,7 @@
"IsWriteMemory": "IsWriteMemory",
"ModbusType": "ModbusType",
"MulStation": "MultipleStations",
"SendDelayTime": "SendDelayTime",
"Station": "DefaultStation"
},
"ThingsGateway.Plugin.Modbus.ModbusSlaveVariableProperty": {

View File

@@ -10,6 +10,7 @@
"IsWriteMemory": "立即写入内存",
"ModbusType": "协议类型",
"MulStation": "多站点",
"SendDelayTime": "发送延时",
"Station": "默认站号"
},
"ThingsGateway.Plugin.Modbus.ModbusSlaveVariableProperty": {

View File

@@ -100,6 +100,7 @@ public class ModbusSlave : BusinessBase
_plc.IsWriteMemory = _driverPropertys.IsWriteMemory;
_plc.MulStation = _driverPropertys.MulStation;
_plc.ModbusType = _driverPropertys.ModbusType;
_plc.SendDelayTime = _driverPropertys.SendDelayTime;
_plc.InitChannel(channel, LogMessage);
await base.InitChannelAsync(channel, cancellationToken).ConfigureAwait(false);

View File

@@ -53,4 +53,7 @@ public class ModbusSlaveProperty : BusinessPropertyBase
/// </summary>
[DynamicProperty]
public bool IsWriteMemory { get; set; } = true;
[DynamicProperty]
public int SendDelayTime { get; set; }
}

View File

@@ -154,7 +154,7 @@ public class OpcDaMaster : CollectBase
/// <inheritdoc/>
protected override async ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
{
using var writeLock = ReadWriteLock.WriterLock();
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
await ValueTask.CompletedTask.ConfigureAwait(false);
var result = _plc.WriteItem(writeInfoLists.ToDictionary(a => a.Key.RegisterAddress!, a => a.Value.GetObjectFromJToken()!));
var results = new ConcurrentDictionary<string, OperResult>(result.ToDictionary<KeyValuePair<string, Tuple<bool, string>>, string, OperResult>(a => writeInfoLists.Keys.FirstOrDefault(b => b.RegisterAddress == a.Key).Name, a =>

View File

@@ -275,7 +275,7 @@ public class OpcUaMaster : CollectBase
/// <inheritdoc/>
protected override async ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
{
using var writeLock = ReadWriteLock.WriterLock();
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
var result = await _plc.WriteNodeAsync(writeInfoLists.ToDictionary(a => a.Key.RegisterAddress!, a => a.Value), cancellationToken).ConfigureAwait(false);
var results = new ConcurrentDictionary<string, OperResult>(result.ToDictionary<KeyValuePair<string, Tuple<bool, string>>, string, OperResult>(a => writeInfoLists.Keys.FirstOrDefault(b => b.RegisterAddress == a.Key)?.Name!
, a =>

View File

@@ -22,27 +22,27 @@
</ProjectReference>
<ProjectReference Include="..\ThingsGateway.Foundation.OpcUa\ThingsGateway.Foundation.OpcUa.csproj" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.5.376.235" GeneratePathProperty="true">
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.5.376.244" GeneratePathProperty="true">
<PrivateAssets>contentFiles;compile;build;buildMultitargeting;buildTransitive;analyzers;</PrivateAssets>
</PackageReference>
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client.ComplexTypes" Version="1.5.376.235" GeneratePathProperty="true">
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client.ComplexTypes" Version="1.5.376.244" GeneratePathProperty="true">
<PrivateAssets>contentFiles;compile;build;buildMultitargeting;buildTransitive;analyzers;</PrivateAssets>
</PackageReference>
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.5.376.235" GeneratePathProperty="true">
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.5.376.244" GeneratePathProperty="true">
<PrivateAssets>contentFiles;compile;build;buildMultitargeting;buildTransitive;analyzers;</PrivateAssets>
</PackageReference>
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Core" Version="1.5.376.235" GeneratePathProperty="true">
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Core" Version="1.5.376.244" GeneratePathProperty="true">
<PrivateAssets>contentFiles;compile;build;buildMultitargeting;buildTransitive;analyzers;</PrivateAssets>
</PackageReference>
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Configuration" Version="1.5.376.235" GeneratePathProperty="true">
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Configuration" Version="1.5.376.244" GeneratePathProperty="true">
<PrivateAssets>contentFiles;compile;build;buildMultitargeting;buildTransitive;analyzers;</PrivateAssets>
</PackageReference>
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Security.Certificates" Version="1.5.376.235" GeneratePathProperty="true">
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Security.Certificates" Version="1.5.376.244" GeneratePathProperty="true">
<PrivateAssets>contentFiles;compile;build;buildMultitargeting;buildTransitive;analyzers;</PrivateAssets>
</PackageReference>

View File

@@ -95,7 +95,7 @@ public class SiemensS7Master : CollectFoundationBase
protected override async ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
{
using var writeLock = ReadWriteLock.WriterLock();
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
// 检查协议是否为空,如果为空则抛出异常
if (FoundationDevice == null)

View File

@@ -215,7 +215,7 @@
// }
// /// <summary>
// /// 写入变量实现设备写入操作注意执行写锁using var writeLock = ReadWriteLock.WriterLock();
// /// 写入变量,实现设备写入操作,注意执行写锁, using var writeLock =await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
// /// </summary>
// protected override ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
// {