refactor: 删除不必要的锁操作
This commit is contained in:
@@ -88,10 +88,9 @@ public abstract class DeviceHostedService : BackgroundService
|
||||
/// <returns>通道线程管理器</returns>
|
||||
protected ChannelThread GetChannelThread(DriverBase driverBase)
|
||||
{
|
||||
//lock (this)
|
||||
var channelId = driverBase.CurrentDevice.ChannelId;
|
||||
lock (ChannelThreads)
|
||||
{
|
||||
long channelId = driverBase.CurrentDevice.ChannelId;
|
||||
|
||||
// 尝试从现有的通道线程管理器列表中查找匹配的通道线程
|
||||
var channelThread = ChannelThreads.FirstOrDefault(t => t.ChannelId == channelId);
|
||||
if (channelThread != null)
|
||||
@@ -103,43 +102,43 @@ public abstract class DeviceHostedService : BackgroundService
|
||||
|
||||
// 如果未找到匹配的通道线程,则创建一个新的通道线程
|
||||
return NewChannelThread(driverBase, channelId);
|
||||
}
|
||||
|
||||
// 创建新的通道线程的内部方法
|
||||
ChannelThread NewChannelThread(DriverBase driverBase, long channelId)
|
||||
// 创建新的通道线程的内部方法
|
||||
ChannelThread NewChannelThread(DriverBase driverBase, long channelId)
|
||||
{
|
||||
// 根据通道ID获取通道信息
|
||||
var channel = ChannelService.GetChannelById(channelId);
|
||||
if (channel == null)
|
||||
{
|
||||
// 根据通道ID获取通道信息
|
||||
var channel = ChannelService.GetChannelById(channelId);
|
||||
if (channel == null)
|
||||
{
|
||||
_logger.LogWarning(Localizer["ChannelNotNull", driverBase.CurrentDevice.Name, channelId]);
|
||||
return null;
|
||||
}
|
||||
// 检查通道是否启用
|
||||
if (!channel.Enable)
|
||||
{
|
||||
_logger.LogWarning(Localizer["ChannelNotEnable", driverBase.CurrentDevice.Name, channel.Name]);
|
||||
return null;
|
||||
}
|
||||
// 确保通道不为 null
|
||||
ArgumentNullException.ThrowIfNull(channel);
|
||||
if (ChannelThreads.Count > ChannelThread.MaxCount)
|
||||
{
|
||||
throw new Exception($"Exceeded maximum number of channels:{ChannelThread.MaxCount}");
|
||||
}
|
||||
if (DriverBases.Select(a => a.CurrentDevice.VariableRunTimes.Count).Sum() > ChannelThread.MaxVariableCount)
|
||||
{
|
||||
throw new Exception($"Exceeded maximum number of variables:{ChannelThread.MaxVariableCount}");
|
||||
}
|
||||
|
||||
// 创建新的通道线程,并将驱动程序添加到其中
|
||||
ChannelThread channelThread = new ChannelThread(channel, (a =>
|
||||
{
|
||||
return ChannelService.GetChannel(channel, a);
|
||||
}));
|
||||
channelThread.AddDriver(driverBase);
|
||||
ChannelThreads.Add(channelThread);
|
||||
return channelThread;
|
||||
_logger.LogWarning(Localizer["ChannelNotNull", driverBase.CurrentDevice.Name, channelId]);
|
||||
return null;
|
||||
}
|
||||
// 检查通道是否启用
|
||||
if (!channel.Enable)
|
||||
{
|
||||
_logger.LogWarning(Localizer["ChannelNotEnable", driverBase.CurrentDevice.Name, channel.Name]);
|
||||
return null;
|
||||
}
|
||||
// 确保通道不为 null
|
||||
ArgumentNullException.ThrowIfNull(channel);
|
||||
if (ChannelThreads.Count > ChannelThread.MaxCount)
|
||||
{
|
||||
throw new Exception($"Exceeded maximum number of channels:{ChannelThread.MaxCount}");
|
||||
}
|
||||
if (DriverBases.Select(a => a.CurrentDevice.VariableRunTimes.Count).Sum() > ChannelThread.MaxVariableCount)
|
||||
{
|
||||
throw new Exception($"Exceeded maximum number of variables:{ChannelThread.MaxVariableCount}");
|
||||
}
|
||||
|
||||
// 创建新的通道线程,并将驱动程序添加到其中
|
||||
ChannelThread channelThread = new ChannelThread(channel, (a =>
|
||||
{
|
||||
return ChannelService.GetChannel(channel, a);
|
||||
}));
|
||||
channelThread.AddDriver(driverBase);
|
||||
ChannelThreads.Add(channelThread);
|
||||
return channelThread;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,18 +179,15 @@ public abstract class DeviceHostedService : BackgroundService
|
||||
// 遍历通道线程列表,并在每个通道线程上执行 BeforeStopThread 方法
|
||||
ChannelThreads.ParallelForEach((channelThread) =>
|
||||
{
|
||||
_ = Task.Run(() =>
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
channelThread.BeforeStopThread();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// 记录执行 BeforeStopThread 方法时的异常信息
|
||||
_logger?.LogError(ex, channelThread.ToString());
|
||||
}
|
||||
});
|
||||
channelThread.BeforeStopThread();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// 记录执行 BeforeStopThread 方法时的异常信息
|
||||
_logger?.LogError(ex, channelThread.ToString());
|
||||
}
|
||||
});
|
||||
|
||||
// 等待一小段时间,以确保 BeforeStopThread 方法有足够的时间执行
|
||||
|
@@ -422,24 +422,6 @@ public class ChannelThread
|
||||
}
|
||||
}
|
||||
}
|
||||
// foreach (var cancellationToken in CancellationTokenSources)
|
||||
//{
|
||||
// _ = Task.Run(() =>
|
||||
// {
|
||||
// try
|
||||
// {
|
||||
// if (!cancellationToken.Value.IsCancellationRequested)// 检查是否已请求取消,若未请求取消则尝试取消操作
|
||||
// {
|
||||
// cancellationToken.Value?.Cancel();
|
||||
// cancellationToken.Value?.Dispose();
|
||||
// }
|
||||
// }
|
||||
// catch
|
||||
// {
|
||||
// // 捕获异常以确保不会影响其他令牌的取消操作
|
||||
// }
|
||||
// });
|
||||
//}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@@ -89,8 +89,8 @@ public abstract class CollectBase : DriverBase
|
||||
{
|
||||
try
|
||||
{
|
||||
if (IsSingleThread)
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
//if (IsSingleThread)
|
||||
// await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
throw new OperationCanceledException();
|
||||
@@ -111,8 +111,8 @@ public abstract class CollectBase : DriverBase
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (IsSingleThread)
|
||||
WriteLock.Release();
|
||||
//if (IsSingleThread)
|
||||
// WriteLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,8 +125,8 @@ public abstract class CollectBase : DriverBase
|
||||
try
|
||||
{
|
||||
// 如果是单线程模式,则等待写入锁
|
||||
if (IsSingleThread)
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
//if (IsSingleThread)
|
||||
// await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// 检查协议是否为空,如果为空则抛出异常
|
||||
if (Protocol == null)
|
||||
@@ -151,8 +151,8 @@ public abstract class CollectBase : DriverBase
|
||||
finally
|
||||
{
|
||||
// 如果是单线程模式,则释放写入锁
|
||||
if (IsSingleThread)
|
||||
WriteLock.Release();
|
||||
//if (IsSingleThread)
|
||||
// WriteLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -263,7 +263,7 @@ public abstract class CollectBase : DriverBase
|
||||
if (await TestOnline(cancellationToken))
|
||||
return;
|
||||
|
||||
if (CollectProperties.ConcurrentCount > 1 && !IsSingleThread)
|
||||
if (CollectProperties.ConcurrentCount > 1)
|
||||
{
|
||||
// 并行处理每个变量读取
|
||||
await CurrentDevice.VariableSourceReads.ParallelForEachAsync(async (variableSourceRead, cancellationToken) =>
|
||||
@@ -287,7 +287,7 @@ public abstract class CollectBase : DriverBase
|
||||
}
|
||||
}
|
||||
|
||||
if (CollectProperties.ConcurrentCount > 1 && !IsSingleThread)
|
||||
if (CollectProperties.ConcurrentCount > 1)
|
||||
{
|
||||
// 并行处理每个方法调用
|
||||
await CurrentDevice.ReadVariableMethods.ParallelForEachAsync(async (readVariableMethods, cancellationToken) =>
|
||||
@@ -532,8 +532,8 @@ public abstract class CollectBase : DriverBase
|
||||
try
|
||||
{
|
||||
// 如果配置为单线程模式,则获取写入锁定
|
||||
if (IsSingleThread)
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
//if (IsSingleThread)
|
||||
// await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// 初始化操作结果
|
||||
OperResult<object> result = new OperResult<object>();
|
||||
@@ -583,8 +583,8 @@ public abstract class CollectBase : DriverBase
|
||||
finally
|
||||
{
|
||||
// 如果配置为单线程模式,则释放写入锁定
|
||||
if (IsSingleThread)
|
||||
WriteLock.Release();
|
||||
//if (IsSingleThread)
|
||||
// WriteLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -147,13 +147,6 @@ public abstract class DriverBase : DisposableObject
|
||||
/// </summary>
|
||||
public virtual bool IsCollectDevice => CurrentDevice.PluginType == PluginTypeEnum.Collect;
|
||||
|
||||
/// <summary>
|
||||
/// 读写锁,通常对于主从协议来说都需要,返回false时,需要在底层实现读写锁
|
||||
/// 并且读取或者写入会并发进行,需要另外在底层实现锁
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
protected virtual bool IsSingleThread => Protocol?.IsSingleThread ?? true;
|
||||
|
||||
/// <summary>
|
||||
/// 全局插件服务
|
||||
/// </summary>
|
||||
|
@@ -122,8 +122,7 @@ public class OpcDaMaster : CollectBase
|
||||
{
|
||||
try
|
||||
{
|
||||
if (IsSingleThread)
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
_plc.ReadItemsWithGroup(deviceVariableSourceRead.RegisterAddress);
|
||||
return OperResult.CreateSuccessResult(Array.Empty<byte>());
|
||||
}
|
||||
@@ -133,8 +132,7 @@ public class OpcDaMaster : CollectBase
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (IsSingleThread)
|
||||
WriteLock.Release();
|
||||
WriteLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,8 +141,7 @@ public class OpcDaMaster : CollectBase
|
||||
{
|
||||
try
|
||||
{
|
||||
if (IsSingleThread)
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
var result = _plc.WriteItem(writeInfoLists.ToDictionary(a => a.Key.RegisterAddress!, a => a.Value.GetObjectFromJToken()!));
|
||||
return result.ToDictionary<KeyValuePair<string, Tuple<bool, string>>, string, OperResult>(a =>
|
||||
{
|
||||
@@ -160,8 +157,7 @@ public class OpcDaMaster : CollectBase
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (IsSingleThread)
|
||||
WriteLock.Release();
|
||||
WriteLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -51,14 +51,6 @@ public class OpcUaMaster : CollectBase
|
||||
return $"{_driverProperties.OpcUrl}";
|
||||
}
|
||||
|
||||
protected override bool IsSingleThread
|
||||
{
|
||||
get
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public override void Init(IChannel? channel = null)
|
||||
{
|
||||
@@ -146,8 +138,7 @@ public class OpcUaMaster : CollectBase
|
||||
{
|
||||
try
|
||||
{
|
||||
if (IsSingleThread)
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
var result = await _plc.ReadJTokenValueAsync(deviceVariableSourceRead.VariableRunTimes.Where(a => !a.RegisterAddress.IsNullOrEmpty()).Select(a => a.RegisterAddress!).ToArray(), cancellationToken).ConfigureAwait(false);
|
||||
foreach (var data in result)
|
||||
{
|
||||
@@ -197,8 +188,7 @@ public class OpcUaMaster : CollectBase
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (IsSingleThread)
|
||||
WriteLock.Release();
|
||||
WriteLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,8 +197,7 @@ public class OpcUaMaster : CollectBase
|
||||
{
|
||||
try
|
||||
{
|
||||
if (IsSingleThread)
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
await WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
var result = await _plc.WriteNodeAsync(writeInfoLists.ToDictionary(a => a.Key.RegisterAddress!, a => a.Value), cancellationToken).ConfigureAwait(false);
|
||||
return result.ToDictionary<KeyValuePair<string, Tuple<bool, string>>, string, OperResult>(a =>
|
||||
{
|
||||
@@ -224,8 +213,7 @@ public class OpcUaMaster : CollectBase
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (IsSingleThread)
|
||||
WriteLock.Release();
|
||||
WriteLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user