mirror of
https://gitee.com/ThingsGateway/ThingsGateway.git
synced 2025-10-20 10:50:48 +08:00
添加 控制写操作与读操作的比率 的插件配置属性
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
<Project>
|
||||
|
||||
<PropertyGroup>
|
||||
<PluginVersion>10.9.94</PluginVersion>
|
||||
<ProPluginVersion>10.9.93</ProPluginVersion>
|
||||
<DefaultVersion>10.9.99</DefaultVersion>
|
||||
<PluginVersion>10.10.1</PluginVersion>
|
||||
<ProPluginVersion>10.10.1</ProPluginVersion>
|
||||
<DefaultVersion>10.10.1</DefaultVersion>
|
||||
<AuthenticationVersion>2.9.29</AuthenticationVersion>
|
||||
<SourceGeneratorVersion>10.9.29</SourceGeneratorVersion>
|
||||
<NET8Version>8.0.18</NET8Version>
|
||||
|
@@ -14,48 +14,88 @@ namespace ThingsGateway.Gateway.Application;
|
||||
|
||||
public class AsyncReadWriteLock
|
||||
{
|
||||
private readonly int _writeReadRatio = 3; // 写3次会允许1次读,但写入也不会被阻止,具体协议取决于插件协议实现
|
||||
public AsyncReadWriteLock(int writeReadRatio)
|
||||
{
|
||||
_writeReadRatio = writeReadRatio;
|
||||
}
|
||||
private AsyncAutoResetEvent _readerLock = new AsyncAutoResetEvent(false); // 控制读计数
|
||||
private long _writerCount = 0; // 当前活跃的写线程数
|
||||
private long _readerCount = 0; // 当前被阻塞的读线程数
|
||||
private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
|
||||
/// <summary>
|
||||
/// 获取读锁,支持多个线程并发读取,但写入时会阻止所有读取。
|
||||
/// </summary>
|
||||
public async Task<CancellationToken> ReaderLockAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
|
||||
if (Interlocked.Read(ref _writerCount) > 0)
|
||||
{
|
||||
Interlocked.Increment(ref _readerCount);
|
||||
|
||||
// 第一个读者需要获取写入锁,防止写操作
|
||||
await _readerLock.WaitOneAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
Interlocked.Decrement(ref _readerCount);
|
||||
|
||||
}
|
||||
return _cancellationTokenSource.Token;
|
||||
}
|
||||
|
||||
public bool WriteWaited => _writerCount > 0;
|
||||
|
||||
/// <summary>
|
||||
/// 获取写锁,阻止所有读取。
|
||||
/// </summary>
|
||||
public IDisposable WriterLock()
|
||||
public async Task<IDisposable> WriterLockAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
|
||||
if (Interlocked.Increment(ref _writerCount) == 1)
|
||||
{
|
||||
var cancellationTokenSource = _cancellationTokenSource;
|
||||
_cancellationTokenSource = new();
|
||||
cancellationTokenSource.Cancel();//取消读取
|
||||
await cancellationTokenSource.CancelAsync().ConfigureAwait(false); // 取消读取
|
||||
cancellationTokenSource.SafeDispose();
|
||||
}
|
||||
|
||||
return new Writer(this);
|
||||
}
|
||||
|
||||
private void ReleaseWriter()
|
||||
{
|
||||
if (Interlocked.Decrement(ref _writerCount) == 0)
|
||||
var writerCount = Interlocked.Decrement(ref _writerCount);
|
||||
if (writerCount == 0)
|
||||
{
|
||||
var resetEvent = _readerLock;
|
||||
_readerLock = new(false);
|
||||
Interlocked.Exchange(ref _writeSinceLastReadCount, 0);
|
||||
resetEvent.SetAll();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
// 读写占空比, 用于控制写操作与读操作的比率。该比率 n 次写入操作会执行一次读取操作。即使在应用程序执行大量的连续写入操作时,也必须确保足够的读取数据处理时间。相对于更加均衡的读写数据流而言,该特点使得外部写入可连续无顾忌操作
|
||||
|
||||
if (_writeReadRatio > 0)
|
||||
{
|
||||
if (Interlocked.Read(ref _readerCount) > 0)
|
||||
{
|
||||
var count = Interlocked.Increment(ref _writeSinceLastReadCount);
|
||||
if (count >= _writeReadRatio)
|
||||
{
|
||||
Interlocked.Exchange(ref _writeSinceLastReadCount, 0);
|
||||
_readerLock.Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
_readerLock.Set();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private int _writeSinceLastReadCount = 0;
|
||||
private struct Writer : IDisposable
|
||||
{
|
||||
private readonly AsyncReadWriteLock _lock;
|
||||
|
@@ -194,6 +194,8 @@ public abstract class CollectBase : DriverBase, IRpcDriver
|
||||
|
||||
// 从插件服务中获取当前设备关联的驱动方法信息列表
|
||||
DriverMethodInfos = GlobalData.PluginService.GetDriverMethodInfos(device.PluginName, this);
|
||||
|
||||
ReadWriteLock = new(CollectProperties.DutyCycle);
|
||||
}
|
||||
|
||||
public virtual string GetAddressDescription()
|
||||
@@ -474,7 +476,7 @@ public abstract class CollectBase : DriverBase, IRpcDriver
|
||||
/// <returns></returns>
|
||||
protected abstract Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables);
|
||||
|
||||
protected AsyncReadWriteLock ReadWriteLock = new();
|
||||
protected AsyncReadWriteLock ReadWriteLock;
|
||||
|
||||
/// <summary>
|
||||
/// 采集驱动读取,读取成功后直接赋值变量
|
||||
@@ -565,7 +567,8 @@ public abstract class CollectBase : DriverBase, IRpcDriver
|
||||
|
||||
ConcurrentDictionary<string, OperResult<object>> operResults = new();
|
||||
|
||||
using var writeLock = ReadWriteLock.WriterLock();
|
||||
|
||||
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
var list = writeInfoLists
|
||||
.Where(a => !results.Any(b => b.Key == a.Key.Name))
|
||||
.ToDictionary(item => item.Key, item => item.Value).ToArray();
|
||||
|
@@ -175,7 +175,7 @@ public abstract class CollectFoundationBase : CollectBase
|
||||
/// <returns></returns>
|
||||
protected override async ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
|
||||
{
|
||||
using var writeLock = ReadWriteLock.WriterLock();
|
||||
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
// 检查协议是否为空,如果为空则抛出异常
|
||||
if (FoundationDevice == null)
|
||||
throw new NotSupportedException();
|
||||
|
@@ -31,6 +31,11 @@ public abstract class CollectPropertyBase : DriverPropertyBase
|
||||
/// 失败重试次数,默认3
|
||||
/// </summary>
|
||||
public virtual int RetryCount { get; set; } = 3;
|
||||
|
||||
/// <summary>
|
||||
/// 读写占空比
|
||||
/// </summary>
|
||||
public virtual int DutyCycle { get; set; } = 3;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -45,4 +50,8 @@ public abstract class CollectPropertyRetryBase : CollectPropertyBase
|
||||
/// </summary>
|
||||
[DynamicProperty]
|
||||
public override int RetryCount { get; set; } = 3;
|
||||
|
||||
[DynamicProperty(Remark = "n 次写入操作会执行一次读取")]
|
||||
public override int DutyCycle { get; set; } = 3;
|
||||
|
||||
}
|
@@ -295,7 +295,8 @@
|
||||
"RetryCount": "RetryCount"
|
||||
},
|
||||
"ThingsGateway.Gateway.Application.CollectPropertyRetryBase": {
|
||||
"RetryCount": "RetryCount"
|
||||
"RetryCount": "RetryCount",
|
||||
"DutyCycle": "DutyCycle"
|
||||
},
|
||||
"ThingsGateway.Gateway.Application.ControlController": {
|
||||
"BatchSaveChannelAsync": "BatchSaveChannel",
|
||||
|
@@ -294,7 +294,8 @@
|
||||
"RetryCount": "失败重试次数"
|
||||
},
|
||||
"ThingsGateway.Gateway.Application.CollectPropertyRetryBase": {
|
||||
"RetryCount": "失败重试次数"
|
||||
"RetryCount": "失败重试次数",
|
||||
"DutyCycle": "占空比"
|
||||
},
|
||||
"ThingsGateway.Gateway.Application.ControlController": {
|
||||
"BatchSaveChannelAsync": "保存通道",
|
||||
|
@@ -54,7 +54,7 @@ public static class PluginServiceUtil
|
||||
{
|
||||
{ "title", classAttribute.Remark }
|
||||
};
|
||||
tc.ComponentParameters.AddItem(
|
||||
tc.ComponentParameters = tc.ComponentParameters.AddItem(
|
||||
new("title", classAttribute.Remark)
|
||||
);
|
||||
}
|
||||
|
@@ -10,6 +10,7 @@
|
||||
"IsWriteMemory": "IsWriteMemory",
|
||||
"ModbusType": "ModbusType",
|
||||
"MulStation": "MultipleStations",
|
||||
"SendDelayTime": "SendDelayTime",
|
||||
"Station": "DefaultStation"
|
||||
},
|
||||
"ThingsGateway.Plugin.Modbus.ModbusSlaveVariableProperty": {
|
||||
|
@@ -10,6 +10,7 @@
|
||||
"IsWriteMemory": "立即写入内存",
|
||||
"ModbusType": "协议类型",
|
||||
"MulStation": "多站点",
|
||||
"SendDelayTime": "发送延时",
|
||||
"Station": "默认站号"
|
||||
},
|
||||
"ThingsGateway.Plugin.Modbus.ModbusSlaveVariableProperty": {
|
||||
|
@@ -100,6 +100,7 @@ public class ModbusSlave : BusinessBase
|
||||
_plc.IsWriteMemory = _driverPropertys.IsWriteMemory;
|
||||
_plc.MulStation = _driverPropertys.MulStation;
|
||||
_plc.ModbusType = _driverPropertys.ModbusType;
|
||||
_plc.SendDelayTime = _driverPropertys.SendDelayTime;
|
||||
_plc.InitChannel(channel, LogMessage);
|
||||
await base.InitChannelAsync(channel, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
|
@@ -53,4 +53,7 @@ public class ModbusSlaveProperty : BusinessPropertyBase
|
||||
/// </summary>
|
||||
[DynamicProperty]
|
||||
public bool IsWriteMemory { get; set; } = true;
|
||||
|
||||
[DynamicProperty]
|
||||
public int SendDelayTime { get; set; }
|
||||
}
|
||||
|
@@ -154,7 +154,7 @@ public class OpcDaMaster : CollectBase
|
||||
/// <inheritdoc/>
|
||||
protected override async ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
|
||||
{
|
||||
using var writeLock = ReadWriteLock.WriterLock();
|
||||
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
await ValueTask.CompletedTask.ConfigureAwait(false);
|
||||
var result = _plc.WriteItem(writeInfoLists.ToDictionary(a => a.Key.RegisterAddress!, a => a.Value.GetObjectFromJToken()!));
|
||||
var results = new ConcurrentDictionary<string, OperResult>(result.ToDictionary<KeyValuePair<string, Tuple<bool, string>>, string, OperResult>(a => writeInfoLists.Keys.FirstOrDefault(b => b.RegisterAddress == a.Key).Name, a =>
|
||||
|
@@ -275,7 +275,7 @@ public class OpcUaMaster : CollectBase
|
||||
/// <inheritdoc/>
|
||||
protected override async ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
|
||||
{
|
||||
using var writeLock = ReadWriteLock.WriterLock();
|
||||
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
var result = await _plc.WriteNodeAsync(writeInfoLists.ToDictionary(a => a.Key.RegisterAddress!, a => a.Value), cancellationToken).ConfigureAwait(false);
|
||||
var results = new ConcurrentDictionary<string, OperResult>(result.ToDictionary<KeyValuePair<string, Tuple<bool, string>>, string, OperResult>(a => writeInfoLists.Keys.FirstOrDefault(b => b.RegisterAddress == a.Key)?.Name!
|
||||
, a =>
|
||||
|
@@ -95,7 +95,7 @@ public class SiemensS7Master : CollectFoundationBase
|
||||
|
||||
protected override async ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
|
||||
{
|
||||
using var writeLock = ReadWriteLock.WriterLock();
|
||||
using var writeLock = await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// 检查协议是否为空,如果为空则抛出异常
|
||||
if (FoundationDevice == null)
|
||||
|
@@ -215,7 +215,7 @@
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// 写入变量,实现设备写入操作,注意执行写锁,using var writeLock = ReadWriteLock.WriterLock();
|
||||
// /// 写入变量,实现设备写入操作,注意执行写锁, using var writeLock =await ReadWriteLock.WriterLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
// /// </summary>
|
||||
// protected override ValueTask<Dictionary<string, OperResult>> WriteValuesAsync(Dictionary<VariableRuntime, JToken> writeInfoLists, CancellationToken cancellationToken)
|
||||
// {
|
||||
|
Reference in New Issue
Block a user