mirror of
https://gitee.com/ThingsGateway/ThingsGateway.git
synced 2025-10-20 10:50:48 +08:00
恢复restart标记
This commit is contained in:
@@ -17,6 +17,7 @@ using System.Diagnostics;
|
||||
using System.Net;
|
||||
|
||||
using ThingsGateway.HttpRemote.Extensions;
|
||||
using ThingsGateway.NewLife.Log;
|
||||
using ThingsGateway.Utilities;
|
||||
|
||||
namespace ThingsGateway.HttpRemote;
|
||||
@@ -254,7 +255,7 @@ public sealed class ProfilerDelegatingHandler(ILogger<Logging> logger, IOptions<
|
||||
}
|
||||
else
|
||||
{
|
||||
Console.WriteLine(message);
|
||||
XTrace.WriteLine(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -16,14 +16,16 @@ namespace ThingsGateway.NewLife;
|
||||
public sealed class WaitLock : IDisposable
|
||||
{
|
||||
private readonly SemaphoreSlim _waiterLock;
|
||||
|
||||
private readonly string _name;
|
||||
/// <summary>
|
||||
/// 构造方法
|
||||
/// </summary>
|
||||
/// <param name="name">名称</param>
|
||||
/// <param name="maxCount">最大并发数</param>
|
||||
/// <param name="initialZeroState">初始无信号量</param>
|
||||
public WaitLock(int maxCount = 1, bool initialZeroState = false)
|
||||
public WaitLock(string name, int maxCount = 1, bool initialZeroState = false)
|
||||
{
|
||||
_name = name;
|
||||
if (initialZeroState)
|
||||
_waiterLock = new SemaphoreSlim(0, maxCount);
|
||||
else
|
||||
@@ -48,19 +50,27 @@ public sealed class WaitLock : IDisposable
|
||||
public int CurrentCount => _waiterLock.CurrentCount;
|
||||
public bool Waitting => _waiterLock.CurrentCount < MaxCount;
|
||||
|
||||
private object m_lockObj = new();
|
||||
/// <summary>
|
||||
/// 离开锁
|
||||
/// </summary>
|
||||
public void Release()
|
||||
{
|
||||
if (DisposedValue) return;
|
||||
try
|
||||
{
|
||||
_waiterLock.Release();
|
||||
}
|
||||
catch (SemaphoreFullException)
|
||||
lock (m_lockObj)
|
||||
{
|
||||
if (Waitting)
|
||||
{
|
||||
try
|
||||
{
|
||||
_waiterLock.Release();
|
||||
}
|
||||
catch (SemaphoreFullException)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@@ -2,6 +2,7 @@
|
||||
|
||||
using ThingsGateway.NewLife;
|
||||
using ThingsGateway.NewLife.Compression;
|
||||
using ThingsGateway.NewLife.Log;
|
||||
|
||||
#if NET8_0_OR_GREATER
|
||||
using System.Formats.Tar;
|
||||
@@ -578,7 +579,7 @@ public static class PathHelper
|
||||
});
|
||||
if (rs?.Length > 0) list.AddRange(rs);
|
||||
}
|
||||
catch (Exception ex) { Console.WriteLine(" " + ex.Message); }
|
||||
catch (Exception ex) { XTrace.WriteLine(ex.Message); }
|
||||
}
|
||||
|
||||
return list.ToArray();
|
||||
|
@@ -1,11 +1,11 @@
|
||||
<Project>
|
||||
|
||||
<PropertyGroup>
|
||||
<PluginVersion>10.9.41</PluginVersion>
|
||||
<ProPluginVersion>10.9.41</ProPluginVersion>
|
||||
<DefaultVersion>10.9.41</DefaultVersion>
|
||||
<AuthenticationVersion>2.9.18</AuthenticationVersion>
|
||||
<SourceGeneratorVersion>10.9.18</SourceGeneratorVersion>
|
||||
<PluginVersion>10.9.44</PluginVersion>
|
||||
<ProPluginVersion>10.9.44</ProPluginVersion>
|
||||
<DefaultVersion>10.9.44</DefaultVersion>
|
||||
<AuthenticationVersion>2.9.20</AuthenticationVersion>
|
||||
<SourceGeneratorVersion>10.9.20</SourceGeneratorVersion>
|
||||
<NET8Version>8.0.18</NET8Version>
|
||||
<NET9Version>9.0.7</NET9Version>
|
||||
<SatelliteResourceLanguages>zh-Hans;en-US</SatelliteResourceLanguages>
|
||||
|
@@ -84,7 +84,7 @@ public partial class LogConsole : IDisposable
|
||||
Disposed = true;
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
private WaitLock WaitLock = new();
|
||||
private WaitLock WaitLock = new(nameof(LogConsole));
|
||||
protected async Task ExecuteAsync()
|
||||
{
|
||||
if (WaitLock.Waited) return;
|
||||
|
@@ -15,7 +15,7 @@ namespace ThingsGateway.Foundation;
|
||||
/// <inheritdoc/>
|
||||
public class ChannelOptions : ChannelOptionsBase, IChannelOptions, IDisposable
|
||||
{
|
||||
public WaitLock WaitLock { get; private set; } = new WaitLock();
|
||||
public WaitLock WaitLock { get; private set; } = new WaitLock(nameof(ChannelOptions));
|
||||
/// <inheritdoc/>
|
||||
public override int MaxConcurrentCount
|
||||
{
|
||||
@@ -31,7 +31,7 @@ public class ChannelOptions : ChannelOptionsBase, IChannelOptions, IDisposable
|
||||
if (WaitLock?.MaxCount != MaxConcurrentCount)
|
||||
{
|
||||
var _lock = WaitLock;
|
||||
WaitLock = new WaitLock(_maxConcurrentCount);
|
||||
WaitLock = new WaitLock(nameof(ChannelOptions), _maxConcurrentCount);
|
||||
_lock?.SafeDispose();
|
||||
}
|
||||
}
|
||||
|
@@ -65,7 +65,7 @@ public class DDPTcpSessionClientChannel : TcpSessionClientChannel
|
||||
|
||||
|
||||
private DeviceSingleStreamDataHandleAdapter<DDPTcpMessage> DDPAdapter = new();
|
||||
private WaitLock _waitLock = new();
|
||||
private WaitLock _waitLock = new(nameof(DDPTcpSessionClientChannel));
|
||||
protected override async ValueTask<bool> OnTcpReceiving(ByteBlock byteBlock)
|
||||
{
|
||||
DDPMessage? message = null;
|
||||
|
@@ -91,7 +91,7 @@ public class DDPUdpSessionChannel : UdpSessionChannel, IClientChannel, IDtuUdpSe
|
||||
public override WaitLock GetLock(string key)
|
||||
{
|
||||
if (key.IsNullOrEmpty()) return WaitLock;
|
||||
return WaitLocks.GetOrAdd(key, (a) => new WaitLock(WaitLock.MaxCount));
|
||||
return WaitLocks.GetOrAdd(key, (a) => new WaitLock(nameof(DDPUdpSessionChannel), WaitLock.MaxCount));
|
||||
}
|
||||
|
||||
public override Task<Result> StopAsync(CancellationToken token)
|
||||
@@ -108,7 +108,7 @@ public class DDPUdpSessionChannel : UdpSessionChannel, IClientChannel, IDtuUdpSe
|
||||
var byteBlock = e.ByteBlock;
|
||||
var endPoint = e.EndPoint;
|
||||
DDPMessage? message = null;
|
||||
var waitLock = _waitLocks.GetOrAdd(endPoint, new WaitLock());
|
||||
var waitLock = _waitLocks.GetOrAdd(endPoint, new WaitLock(nameof(DDPUdpSessionChannel)));
|
||||
try
|
||||
{
|
||||
await waitLock.WaitAsync().ConfigureAwait(false);
|
||||
|
@@ -61,7 +61,7 @@ public abstract class TcpServiceChannelBase<TClient> : TcpService<TClient>, ITcp
|
||||
}
|
||||
}
|
||||
|
||||
private readonly WaitLock _connectLock = new WaitLock();
|
||||
private readonly WaitLock _connectLock = new WaitLock(nameof(TcpServiceChannelBase<TClient>));
|
||||
/// <inheritdoc/>
|
||||
public override async Task StartAsync()
|
||||
{
|
||||
@@ -280,7 +280,7 @@ public class TcpServiceChannel<TClient> : TcpServiceChannelBase<TClient>, IChann
|
||||
{
|
||||
client.ChannelOptions = ChannelOptions;
|
||||
|
||||
client.WaitLock = new NewLife.WaitLock(ChannelOptions.WaitLock.MaxCount);
|
||||
client.WaitLock = new NewLife.WaitLock(nameof(TcpServiceChannelBase<TClient>), ChannelOptions.WaitLock.MaxCount);
|
||||
|
||||
|
||||
base.ClientInitialized(client);
|
||||
|
@@ -60,7 +60,7 @@ public class TcpSessionClientChannel : TcpSessionClient, IClientChannel
|
||||
public WaitHandlePool<MessageBase> WaitHandlePool { get; private set; } = new();
|
||||
|
||||
/// <inheritdoc/>
|
||||
public WaitLock WaitLock { get; internal set; } = new();
|
||||
public WaitLock WaitLock { get; internal set; } = new(nameof(TcpSessionClientChannel));
|
||||
public virtual WaitLock GetLock(string key) => WaitLock;
|
||||
|
||||
/// <inheritdoc/>
|
||||
|
@@ -19,7 +19,7 @@ namespace ThingsGateway.Foundation;
|
||||
/// </summary>
|
||||
public class UdpSessionChannel : UdpSession, IClientChannel
|
||||
{
|
||||
private readonly WaitLock _connectLock = new WaitLock();
|
||||
private readonly WaitLock _connectLock = new WaitLock(nameof(UdpSessionChannel));
|
||||
|
||||
/// <inheritdoc/>
|
||||
public UdpSessionChannel(IChannelOptions channelOptions)
|
||||
|
@@ -350,6 +350,9 @@ public abstract class DeviceBase : DisposableObject, IDevice
|
||||
if (SendDelayTime != 0)
|
||||
await Task.Delay(SendDelayTime, token).ConfigureAwait(false);
|
||||
|
||||
if (token.IsCancellationRequested)
|
||||
return new OperResult(new OperationCanceledException());
|
||||
|
||||
if (channel is IDtuUdpSessionChannel udpSession)
|
||||
{
|
||||
await udpSession.SendAsync(endPoint, sendMessage).ConfigureAwait(false);
|
||||
@@ -386,7 +389,7 @@ public abstract class DeviceBase : DisposableObject, IDevice
|
||||
|
||||
}
|
||||
|
||||
private WaitLock connectWaitLock = new();
|
||||
private WaitLock connectWaitLock = new(nameof(DeviceBase));
|
||||
|
||||
public async Task ConnectAsync(CancellationToken token)
|
||||
{
|
||||
@@ -418,9 +421,7 @@ public abstract class DeviceBase : DisposableObject, IDevice
|
||||
var dtuId = this is IDtu dtu1 ? dtu1.DtuId : null;
|
||||
var channelResult = GetChannel(dtuId);
|
||||
if (!channelResult.IsSuccess) return new OperResult<byte[]>(channelResult);
|
||||
WaitLock? waitLock = null;
|
||||
EndPoint? endPoint = GetUdpEndpoint(dtuId);
|
||||
waitLock = GetWaitLock(channelResult.Content, waitLock, dtuId);
|
||||
WaitLock? waitLock = GetWaitLock(channelResult.Content, dtuId);
|
||||
|
||||
try
|
||||
{
|
||||
@@ -431,6 +432,8 @@ public abstract class DeviceBase : DisposableObject, IDevice
|
||||
if (channelResult.Content.ReadOnlyDataHandlingAdapter != null)
|
||||
channelResult.Content.ReadOnlyDataHandlingAdapter.Logger = Logger;
|
||||
|
||||
EndPoint? endPoint = GetUdpEndpoint(dtuId);
|
||||
|
||||
return await SendAsync(sendMessage, channelResult.Content, endPoint, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
@@ -545,25 +548,37 @@ public abstract class DeviceBase : DisposableObject, IDevice
|
||||
var waitData = clientChannel.WaitHandlePool.GetWaitDataAsync(out var sign);
|
||||
command.Sign = sign;
|
||||
WaitLock? waitLock = null;
|
||||
var dtuId = this is IDtu dtu1 ? dtu1.DtuId : null;
|
||||
EndPoint? endPoint = GetUdpEndpoint(dtuId);
|
||||
try
|
||||
{
|
||||
waitLock = GetWaitLock(clientChannel, waitLock, dtuId);
|
||||
|
||||
var dtuId = this is IDtu dtu1 ? dtu1.DtuId : null;
|
||||
waitLock = GetWaitLock(clientChannel, dtuId);
|
||||
|
||||
await BefortSendAsync(clientChannel, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await waitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
|
||||
EndPoint? endPoint = GetUdpEndpoint(dtuId);
|
||||
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return new MessageBase(new OperationCanceledException());
|
||||
|
||||
if (clientChannel.ReadOnlyDataHandlingAdapter != null)
|
||||
clientChannel.ReadOnlyDataHandlingAdapter.Logger = Logger;
|
||||
|
||||
waitData.SetCancellationToken(cancellationToken);
|
||||
|
||||
Channel.ChannelReceivedWaitDict.TryAdd(sign, ChannelReceived);
|
||||
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return new MessageBase(new OperationCanceledException());
|
||||
|
||||
var sendOperResult = await SendAsync(command, clientChannel, endPoint, cancellationToken).ConfigureAwait(false);
|
||||
if (!sendOperResult.IsSuccess)
|
||||
throw sendOperResult.Exception ?? new(sendOperResult.ErrorMessage ?? "unknown error");
|
||||
|
||||
waitData.SetCancellationToken(cancellationToken);
|
||||
|
||||
await waitData.WaitAsync(timeout).ConfigureAwait(false);
|
||||
|
||||
var result = waitData.Check();
|
||||
@@ -573,20 +588,39 @@ public abstract class DeviceBase : DisposableObject, IDevice
|
||||
}
|
||||
else
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
if (!this.DisposedValue)
|
||||
{
|
||||
await Task.Delay(timeout, CancellationToken.None).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
return new MessageBase(result);
|
||||
}
|
||||
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
if (!this.DisposedValue)
|
||||
{
|
||||
await Task.Delay(timeout, CancellationToken.None).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
return new MessageBase(ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
waitLock.Release();
|
||||
clientChannel.WaitHandlePool.Destroy(sign);
|
||||
Channel.ChannelReceivedWaitDict.TryRemove(sign, out _);
|
||||
waitLock?.Release();
|
||||
clientChannel.WaitHandlePool.Destroy(sign);
|
||||
}
|
||||
}
|
||||
|
||||
private static WaitLock GetWaitLock(IClientChannel clientChannel, WaitLock? waitLock, string dtuId)
|
||||
private static WaitLock GetWaitLock(IClientChannel clientChannel, string dtuId)
|
||||
{
|
||||
WaitLock? waitLock = null;
|
||||
if (clientChannel is IDtuUdpSessionChannel udpSessionChannel)
|
||||
{
|
||||
waitLock = udpSessionChannel.GetLock(dtuId);
|
||||
|
@@ -16,17 +16,18 @@ public class AsyncReadWriteLock
|
||||
{
|
||||
private AsyncAutoResetEvent _readerLock = new AsyncAutoResetEvent(false); // 控制读计数
|
||||
private long _writerCount = 0; // 当前活跃的写线程数
|
||||
|
||||
private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
|
||||
/// <summary>
|
||||
/// 获取读锁,支持多个线程并发读取,但写入时会阻止所有读取。
|
||||
/// </summary>
|
||||
public async Task ReaderLockAsync(CancellationToken cancellationToken)
|
||||
public async Task<CancellationToken> ReaderLockAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (Interlocked.Read(ref _writerCount) > 0)
|
||||
{
|
||||
// 第一个读者需要获取写入锁,防止写操作
|
||||
await _readerLock.WaitOneAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
return _cancellationTokenSource.Token;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -34,7 +35,14 @@ public class AsyncReadWriteLock
|
||||
/// </summary>
|
||||
public IDisposable WriterLock()
|
||||
{
|
||||
Interlocked.Increment(ref _writerCount);
|
||||
if (Interlocked.Increment(ref _writerCount) == 1)
|
||||
{
|
||||
var cancellationTokenSource = _cancellationTokenSource;
|
||||
_cancellationTokenSource = new();
|
||||
cancellationTokenSource.Cancel();//取消读取
|
||||
cancellationTokenSource.SafeDispose();
|
||||
}
|
||||
|
||||
return new Writer(this);
|
||||
}
|
||||
|
||||
|
@@ -148,7 +148,7 @@ public class ControlController : ControllerBase
|
||||
/// </summary>
|
||||
[HttpPost("batchSaveChannel")]
|
||||
[DisplayName("保存通道")]
|
||||
public Task<bool> BatchSaveChannelAsync([FromBody] List<ChannelInput> channels, ItemChangedType type, bool restart)
|
||||
public Task<bool> BatchSaveChannelAsync([FromBody] List<ChannelInput> channels, ItemChangedType type, bool restart = true)
|
||||
{
|
||||
return GlobalData.ChannelRuntimeService.BatchSaveChannelAsync(channels.AdaptListChannel(), type, restart);
|
||||
}
|
||||
@@ -158,7 +158,7 @@ public class ControlController : ControllerBase
|
||||
/// </summary>
|
||||
[HttpPost("batchSaveDevice")]
|
||||
[DisplayName("保存设备")]
|
||||
public Task<bool> BatchSaveDeviceAsync([FromBody] List<DeviceInput> devices, ItemChangedType type, bool restart)
|
||||
public Task<bool> BatchSaveDeviceAsync([FromBody] List<DeviceInput> devices, ItemChangedType type, bool restart = true)
|
||||
{
|
||||
return GlobalData.DeviceRuntimeService.BatchSaveDeviceAsync(devices.AdaptListDevice(), type, restart);
|
||||
}
|
||||
@@ -168,7 +168,7 @@ public class ControlController : ControllerBase
|
||||
/// </summary>
|
||||
[HttpPost("batchSaveVariable")]
|
||||
[DisplayName("保存变量")]
|
||||
public Task<bool> BatchSaveVariableAsync([FromBody] List<VariableInput> variables, ItemChangedType type, bool restart)
|
||||
public Task<bool> BatchSaveVariableAsync([FromBody] List<VariableInput> variables, ItemChangedType type, bool restart = true)
|
||||
{
|
||||
return GlobalData.VariableRuntimeService.BatchSaveVariableAsync(variables.AdaptListVariable(), type, restart, default);
|
||||
}
|
||||
@@ -178,7 +178,7 @@ public class ControlController : ControllerBase
|
||||
/// </summary>
|
||||
[HttpPost("deleteChannel")]
|
||||
[DisplayName("删除通道")]
|
||||
public Task<bool> DeleteChannelAsync([FromBody] List<long> ids, bool restart)
|
||||
public Task<bool> DeleteChannelAsync([FromBody] List<long> ids, bool restart = true)
|
||||
{
|
||||
if (ids == null || ids.Count == 0) ids = GlobalData.IdChannels.Keys.ToList();
|
||||
return GlobalData.ChannelRuntimeService.DeleteChannelAsync(ids, restart, default);
|
||||
@@ -190,7 +190,7 @@ public class ControlController : ControllerBase
|
||||
/// </summary>
|
||||
[HttpPost("deleteDevice")]
|
||||
[DisplayName("删除设备")]
|
||||
public Task<bool> DeleteDeviceAsync([FromBody] List<long> ids, bool restart)
|
||||
public Task<bool> DeleteDeviceAsync([FromBody] List<long> ids, bool restart = true)
|
||||
{
|
||||
if (ids == null || ids.Count == 0) ids = GlobalData.IdDevices.Keys.ToList();
|
||||
return GlobalData.DeviceRuntimeService.DeleteDeviceAsync(ids, restart, default);
|
||||
@@ -201,7 +201,7 @@ public class ControlController : ControllerBase
|
||||
/// </summary>
|
||||
[HttpPost("deleteVariable")]
|
||||
[DisplayName("删除变量")]
|
||||
public Task<bool> DeleteVariableAsync([FromBody] List<long> ids, bool restart)
|
||||
public Task<bool> DeleteVariableAsync([FromBody] List<long> ids, bool restart = true)
|
||||
{
|
||||
if (ids == null || ids.Count == 0) ids = GlobalData.IdVariables.Keys.ToList();
|
||||
return GlobalData.VariableRuntimeService.DeleteVariableAsync(ids, restart, default);
|
||||
@@ -213,7 +213,7 @@ public class ControlController : ControllerBase
|
||||
/// </summary>
|
||||
[HttpPost("insertTestData")]
|
||||
[DisplayName("增加测试数据")]
|
||||
public Task InsertTestDataAsync(int testVariableCount, int testDeviceCount, string slaveUrl, bool businessEnable, bool restart)
|
||||
public Task InsertTestDataAsync(int testVariableCount, int testDeviceCount, string slaveUrl, bool businessEnable, bool restart = true)
|
||||
{
|
||||
return GlobalData.VariableRuntimeService.InsertTestDataAsync(testVariableCount, testDeviceCount, slaveUrl, businessEnable, restart, default);
|
||||
}
|
||||
|
@@ -344,18 +344,25 @@ public abstract class CollectBase : DriverBase, IRpcDriver
|
||||
{
|
||||
if (state is not VariableSourceRead variableSourceRead) return;
|
||||
|
||||
if (Pause)
|
||||
return;
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
if (Pause) return;
|
||||
if (cancellationToken.IsCancellationRequested) return;
|
||||
|
||||
var readErrorCount = 0;
|
||||
|
||||
await ReadWriteLock.ReaderLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
var readToken = await ReadWriteLock.ReaderLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (readToken.IsCancellationRequested)
|
||||
{
|
||||
await ReadVariableSource(state, cancellationToken).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
using var allTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, readToken);
|
||||
var allToken = allTokenSource.Token;
|
||||
|
||||
//if (LogMessage?.LogLevel <= TouchSocket.Core.LogLevel.Trace)
|
||||
// LogMessage?.Trace(string.Format("{0} - Collecting [{1} - {2}]", DeviceName, variableSourceRead?.RegisterAddress, variableSourceRead?.Length));
|
||||
var readResult = await ReadSourceAsync(variableSourceRead, cancellationToken).ConfigureAwait(false);
|
||||
var readResult = await ReadSourceAsync(variableSourceRead, allToken).ConfigureAwait(false);
|
||||
|
||||
// 读取失败时重试一定次数
|
||||
while (!readResult.IsSuccess && readErrorCount < CollectProperties.RetryCount)
|
||||
@@ -365,6 +372,12 @@ public abstract class CollectBase : DriverBase, IRpcDriver
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
if (readToken.IsCancellationRequested)
|
||||
{
|
||||
await ReadVariableSource(state, cancellationToken).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
readErrorCount++;
|
||||
if (LogMessage?.LogLevel <= TouchSocket.Core.LogLevel.Trace)
|
||||
LogMessage?.Trace(string.Format("{0} - Collection [{1} - {2}] failed - {3}", DeviceName, variableSourceRead?.RegisterAddress, variableSourceRead?.Length, readResult.ErrorMessage));
|
||||
@@ -372,7 +385,7 @@ public abstract class CollectBase : DriverBase, IRpcDriver
|
||||
|
||||
//if (LogMessage?.LogLevel <= TouchSocket.Core.LogLevel.Trace)
|
||||
// LogMessage?.Trace(string.Format("{0} - Collecting [{1} - {2}]", DeviceName, variableSourceRead?.RegisterAddress, variableSourceRead?.Length));
|
||||
readResult = await ReadSourceAsync(variableSourceRead, cancellationToken).ConfigureAwait(false);
|
||||
readResult = await ReadSourceAsync(variableSourceRead, allToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (readResult.IsSuccess)
|
||||
@@ -387,6 +400,12 @@ public abstract class CollectBase : DriverBase, IRpcDriver
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
if (readToken.IsCancellationRequested)
|
||||
{
|
||||
await ReadVariableSource(state, cancellationToken).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取失败时记录日志并增加失败计数器,更新错误信息并清除变量状态
|
||||
if (variableSourceRead.LastErrorMessage != readResult.ErrorMessage)
|
||||
{
|
||||
@@ -552,20 +571,20 @@ public abstract class CollectBase : DriverBase, IRpcDriver
|
||||
.ToDictionary(item => item.Key, item => item.Value).ToArray();
|
||||
// 使用并发方式遍历写入信息列表,并进行异步写入操作
|
||||
await list.ParallelForEachAsync(async (writeInfo, cancellationToken) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
// 调用协议的写入方法,将写入信息中的数据写入到对应的寄存器地址,并获取操作结果
|
||||
var result = await InvokeMethodAsync(writeInfo.Key.VariableMethod, writeInfo.Value?.ToString(), false, cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
// 调用协议的写入方法,将写入信息中的数据写入到对应的寄存器地址,并获取操作结果
|
||||
var result = await InvokeMethodAsync(writeInfo.Key.VariableMethod, writeInfo.Value?.ToString(), false, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// 将操作结果添加到结果字典中,使用变量名称作为键
|
||||
operResults.TryAdd(writeInfo.Key.Name, result);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
operResults.TryAdd(writeInfo.Key.Name, new(ex));
|
||||
}
|
||||
}, CollectProperties.MaxConcurrentCount, cancellationToken).ConfigureAwait(false);
|
||||
// 将操作结果添加到结果字典中,使用变量名称作为键
|
||||
operResults.TryAdd(writeInfo.Key.Name, result);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
operResults.TryAdd(writeInfo.Key.Name, new(ex));
|
||||
}
|
||||
}, CollectProperties.MaxConcurrentCount, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
@@ -147,7 +147,7 @@ public abstract class DriverBase : DisposableObject, IDriver
|
||||
|
||||
#region 日志
|
||||
|
||||
private WaitLock SetLogLock = new();
|
||||
private WaitLock SetLogLock = new(nameof(DriverBase));
|
||||
public async Task SetLogAsync(LogLevel? logLevel = null, bool upDataBase = true)
|
||||
{
|
||||
try
|
||||
@@ -354,6 +354,7 @@ public abstract class DriverBase : DisposableObject, IDriver
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
TaskSchedulerLoop?.Stop();
|
||||
TextLogger?.Dispose();
|
||||
_logger?.TryDispose();
|
||||
IdVariableRuntimes?.Clear();
|
||||
@@ -362,6 +363,7 @@ public abstract class DriverBase : DisposableObject, IDriver
|
||||
if (device != null)
|
||||
device.Driver = null;
|
||||
|
||||
|
||||
LogMessage?.Logs?.ForEach(a => a.TryDispose());
|
||||
LogMessage = null;
|
||||
pluginPropertyEditorItems?.Clear();
|
||||
|
@@ -25,7 +25,12 @@ public class RpcLog : PrimaryIdEntity
|
||||
[SugarColumn(ColumnDescription = "日志时间", IsNullable = false)]
|
||||
[AutoGenerateColumn(Visible = true, DefaultSort = true, Sortable = true, DefaultSortOrder = SortOrder.Desc)]
|
||||
public DateTime LogTime { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 执行时间
|
||||
/// </summary>
|
||||
[SugarColumn(ColumnDescription = "执行时间", IsNullable = true)]
|
||||
[AutoGenerateColumn(Visible = true, DefaultSort = true, Sortable = true, DefaultSortOrder = SortOrder.Desc)]
|
||||
public int ExecutionTime { get; set; }
|
||||
/// <summary>
|
||||
/// 操作源
|
||||
///</summary>
|
||||
|
@@ -404,6 +404,7 @@
|
||||
"OperateMethod": "RPCMethod",
|
||||
"OperateObject": "OperationObject",
|
||||
"OperateSource": "OperationSource",
|
||||
"ExecutionTime": "ExecutionTime",
|
||||
"ParamJson": "RequestParameters",
|
||||
"ResultJson": "ReturnResults"
|
||||
},
|
||||
|
@@ -405,6 +405,7 @@
|
||||
"OperateMethod": "RPC方法",
|
||||
"OperateObject": "操作对象",
|
||||
"OperateSource": "操作源",
|
||||
"ExecutionTime": "执行时间",
|
||||
"ParamJson": "请求参数",
|
||||
"ResultJson": "返回结果"
|
||||
},
|
||||
|
@@ -49,7 +49,7 @@ public class ChannelRuntime : Channel, IChannelOptions, IDisposable
|
||||
[Newtonsoft.Json.JsonIgnore]
|
||||
[MapperIgnore]
|
||||
[AutoGenerateColumn(Ignore = true)]
|
||||
public WaitLock WaitLock { get; private set; } = new WaitLock();
|
||||
public WaitLock WaitLock { get; private set; } = new WaitLock(nameof(ChannelRuntime));
|
||||
|
||||
/// <inheritdoc/>
|
||||
[MinValue(1)]
|
||||
@@ -68,7 +68,7 @@ public class ChannelRuntime : Channel, IChannelOptions, IDisposable
|
||||
if (WaitLock?.MaxCount != MaxConcurrentCount)
|
||||
{
|
||||
var _lock = WaitLock;
|
||||
WaitLock = new WaitLock(_maxConcurrentCount);
|
||||
WaitLock = new WaitLock(nameof(ChannelRuntime), _maxConcurrentCount);
|
||||
_lock?.SafeDispose();
|
||||
}
|
||||
}
|
||||
|
@@ -24,7 +24,7 @@ public class ChannelRuntimeService : IChannelRuntimeService
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
private WaitLock WaitLock { get; set; } = new WaitLock();
|
||||
private WaitLock WaitLock { get; set; } = new WaitLock(nameof(ChannelRuntimeService));
|
||||
|
||||
public async Task<bool> CopyAsync(List<Channel> models, Dictionary<Device, List<Variable>> devices, bool restart, CancellationToken cancellationToken)
|
||||
{
|
||||
@@ -125,7 +125,7 @@ public class ChannelRuntimeService : IChannelRuntimeService
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> BatchEditAsync(IEnumerable<Channel> models, Channel oldModel, Channel model, bool restart = true)
|
||||
public async Task<bool> BatchEditAsync(IEnumerable<Channel> models, Channel oldModel, Channel model, bool restart)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -186,7 +186,7 @@ public class ChannelRuntimeService : IChannelRuntimeService
|
||||
public Task<MemoryStream> ExportMemoryStream(IEnumerable<Channel> data) =>
|
||||
GlobalData.ChannelService.ExportMemoryStream(data);
|
||||
|
||||
public async Task ImportChannelAsync(Dictionary<string, ImportPreviewOutputBase> input, bool restart = true)
|
||||
public async Task ImportChannelAsync(Dictionary<string, ImportPreviewOutputBase> input, bool restart)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -209,7 +209,7 @@ public class ChannelRuntimeService : IChannelRuntimeService
|
||||
WaitLock.Release();
|
||||
}
|
||||
}
|
||||
public async Task<bool> SaveChannelAsync(Channel input, ItemChangedType type, bool restart = true)
|
||||
public async Task<bool> SaveChannelAsync(Channel input, ItemChangedType type, bool restart)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@@ -27,7 +27,7 @@ public class DeviceRuntimeService : IDeviceRuntimeService
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
private WaitLock WaitLock { get; set; } = new WaitLock();
|
||||
private WaitLock WaitLock { get; set; } = new WaitLock(nameof(DeviceRuntimeService));
|
||||
|
||||
|
||||
public async Task<bool> CopyAsync(Dictionary<Device, List<Variable>> devices, bool restart, CancellationToken cancellationToken)
|
||||
@@ -60,7 +60,7 @@ public class DeviceRuntimeService : IDeviceRuntimeService
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> BatchEditAsync(IEnumerable<Device> models, Device oldModel, Device model, bool restart = true)
|
||||
public async Task<bool> BatchEditAsync(IEnumerable<Device> models, Device oldModel, Device model, bool restart)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -136,7 +136,7 @@ public class DeviceRuntimeService : IDeviceRuntimeService
|
||||
GlobalData.DeviceService.ExportMemoryStream(data, channelName, plugin);
|
||||
|
||||
|
||||
public async Task ImportDeviceAsync(Dictionary<string, ImportPreviewOutputBase> input, bool restart = true)
|
||||
public async Task ImportDeviceAsync(Dictionary<string, ImportPreviewOutputBase> input, bool restart)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -174,7 +174,7 @@ public class DeviceRuntimeService : IDeviceRuntimeService
|
||||
|
||||
}
|
||||
|
||||
public async Task<bool> SaveDeviceAsync(Device input, ItemChangedType type, bool restart = true)
|
||||
public async Task<bool> SaveDeviceAsync(Device input, ItemChangedType type, bool restart)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@@ -30,7 +30,7 @@ internal sealed class ChannelThreadManage : IChannelThreadManage
|
||||
|
||||
#region 设备管理
|
||||
|
||||
private WaitLock NewChannelLock = new();
|
||||
private WaitLock NewChannelLock = new(nameof(ChannelThreadManage));
|
||||
/// <summary>
|
||||
/// 移除指定通道
|
||||
/// </summary>
|
||||
|
@@ -76,7 +76,7 @@ internal sealed class DeviceThreadManage : IAsyncDisposable, IDeviceThreadManage
|
||||
|
||||
#region 日志
|
||||
|
||||
private WaitLock SetLogLock = new();
|
||||
private WaitLock SetLogLock = new(nameof(DeviceThreadManage));
|
||||
public async Task SetLogAsync(LogLevel? logLevel = null, bool upDataBase = true)
|
||||
{
|
||||
try
|
||||
@@ -171,7 +171,7 @@ internal sealed class DeviceThreadManage : IAsyncDisposable, IDeviceThreadManage
|
||||
|
||||
#region 设备管理
|
||||
|
||||
private WaitLock NewDeviceLock = new();
|
||||
private WaitLock NewDeviceLock = new(nameof(DeviceThreadManage));
|
||||
|
||||
/// <summary>
|
||||
/// 向当前通道添加设备
|
||||
@@ -346,8 +346,6 @@ internal sealed class DeviceThreadManage : IAsyncDisposable, IDeviceThreadManage
|
||||
|
||||
CancellationTokenSources.TryAdd(driver.DeviceId, cts);
|
||||
|
||||
token.Register(driver.Stop);
|
||||
|
||||
_ = Task.Factory.StartNew((state) => DriverStart(state, token), driver, token);
|
||||
|
||||
}).ConfigureAwait(false);
|
||||
@@ -427,23 +425,25 @@ internal sealed class DeviceThreadManage : IAsyncDisposable, IDeviceThreadManage
|
||||
}
|
||||
|
||||
|
||||
|
||||
// 取消驱动程序的操作
|
||||
if (CancellationTokenSources.TryRemove(deviceId, out var token))
|
||||
{
|
||||
if (token != null)
|
||||
{
|
||||
driver.Stop();
|
||||
token.Cancel();
|
||||
token.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (DriverTasks.TryRemove(deviceId, out var task))
|
||||
{
|
||||
task.Stop();
|
||||
}
|
||||
{
|
||||
task.Stop();
|
||||
}
|
||||
|
||||
// 取消驱动程序的操作
|
||||
if (CancellationTokenSources.TryRemove(deviceId, out var token))
|
||||
{
|
||||
if (token != null)
|
||||
{
|
||||
token.Cancel();
|
||||
token.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
await Task.Delay(100).ConfigureAwait(false);
|
||||
|
@@ -199,7 +199,7 @@ internal sealed class RedundancyTask : IRpcDriver, IAsyncDisposable
|
||||
}
|
||||
|
||||
|
||||
private WaitLock _switchLock = new();
|
||||
private WaitLock _switchLock = new(nameof(RedundancyTask));
|
||||
|
||||
|
||||
|
||||
@@ -424,7 +424,7 @@ internal sealed class RedundancyTask : IRpcDriver, IAsyncDisposable
|
||||
|
||||
#region ForcedSync
|
||||
|
||||
WaitLock ForcedSyncWaitLock = new WaitLock();
|
||||
WaitLock ForcedSyncWaitLock = new WaitLock(nameof(RedundancyTask));
|
||||
public async Task ForcedSync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
await ForcedSyncWaitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
@@ -154,8 +154,8 @@ internal sealed class UpdateZipFileHostedService : BackgroundService, IUpdateZip
|
||||
return updateZipFiles.OrderByDescending(a => a.Version).ToList();
|
||||
}
|
||||
|
||||
private readonly WaitLock WaitLock = new();
|
||||
private readonly WaitLock UpdateWaitLock = new();
|
||||
private readonly WaitLock WaitLock = new(nameof(UpdateZipFileHostedService));
|
||||
private readonly WaitLock UpdateWaitLock = new(nameof(UpdateZipFileHostedService));
|
||||
public async Task Update(UpdateZipFile updateZipFile, Func<Task<bool>> check = null)
|
||||
{
|
||||
try
|
||||
|
@@ -40,7 +40,7 @@ internal sealed class PluginService : IPluginService
|
||||
private const string DelEx = ".del";
|
||||
|
||||
private readonly IDispatchService<PluginInfo> _dispatchService;
|
||||
private readonly WaitLock _locker = new();
|
||||
private readonly WaitLock _locker = new(nameof(PluginService));
|
||||
private readonly ILogger _logger;
|
||||
|
||||
public PluginService(ILogger<PluginService> logger, IDispatchService<PluginInfo> dispatchService)
|
||||
|
@@ -130,9 +130,10 @@ internal sealed class RpcService : IRpcService
|
||||
{
|
||||
try
|
||||
{
|
||||
var start = DateTime.Now;
|
||||
// 调用设备的写入方法
|
||||
var result = await driverData.Key.InVokeWriteAsync(driverData.Value, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var end = DateTime.Now;
|
||||
// 写入日志
|
||||
foreach (var resultItem in result)
|
||||
{
|
||||
@@ -149,7 +150,8 @@ internal sealed class RpcService : IRpcService
|
||||
_logQueues.Enqueue(
|
||||
new RpcLog()
|
||||
{
|
||||
LogTime = DateTime.Now,
|
||||
LogTime = start,
|
||||
ExecutionTime = (int)(end - start).TotalMilliseconds,
|
||||
OperateMessage = variableResult.Value.IsSuccess ? null : variableResult.Value.ToString(),
|
||||
IsSuccess = variableResult.Value.IsSuccess,
|
||||
OperateMethod = AppResource.WriteVariable,
|
||||
@@ -190,10 +192,12 @@ internal sealed class RpcService : IRpcService
|
||||
{
|
||||
try
|
||||
{
|
||||
var start = DateTime.Now;
|
||||
// 调用设备的写入方法
|
||||
var result = await driverData.Key.InvokeMethodAsync(driverData.Value, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
Dictionary<string, string> operateMethods = driverData.Value.Select(a => a.Key).ToDictionary(a => a.Name, a => a.OtherMethod!);
|
||||
var end = DateTime.Now;
|
||||
|
||||
// 写入日志
|
||||
foreach (var resultItem in result)
|
||||
@@ -210,7 +214,8 @@ internal sealed class RpcService : IRpcService
|
||||
_logQueues.Enqueue(
|
||||
new RpcLog()
|
||||
{
|
||||
LogTime = DateTime.Now,
|
||||
LogTime = start,
|
||||
ExecutionTime = (int)(end - start).TotalMilliseconds,
|
||||
OperateMessage = variableResult.Value.IsSuccess ? null : variableResult.Value.ToString(),
|
||||
IsSuccess = variableResult.Value.IsSuccess,
|
||||
OperateMethod = operateMethods[variableResult.Key],
|
||||
|
@@ -52,7 +52,7 @@ internal sealed class RulesEngineHostedService : BackgroundService, IRulesEngine
|
||||
/// <summary>
|
||||
/// 重启锁
|
||||
/// </summary>
|
||||
private WaitLock RestartLock { get; } = new();
|
||||
private WaitLock RestartLock { get; } = new(nameof(RulesEngineHostedService));
|
||||
private List<Rules> Rules { get; set; } = new();
|
||||
public Dictionary<RulesLog, Diagram> Diagrams { get; private set; } = new();
|
||||
|
||||
|
@@ -377,7 +377,7 @@ public partial class SiemensS7Master : DeviceBase
|
||||
#endregion 读写
|
||||
|
||||
#region 初始握手
|
||||
private WaitLock ChannelStartedWaitLock = new();
|
||||
private WaitLock ChannelStartedWaitLock = new(nameof(SiemensS7Master));
|
||||
private SiemensTypeEnum siemensS7Type;
|
||||
|
||||
/// <inheritdoc/>
|
||||
|
@@ -47,7 +47,7 @@ public partial class MqttClient : BusinessBaseWithCacheIntervalScriptAll
|
||||
|
||||
private MqttClientSubscribeOptions _mqttSubscribeOptions;
|
||||
|
||||
private WaitLock ConnectLock = new();
|
||||
private WaitLock ConnectLock = new(nameof(MqttClient));
|
||||
|
||||
protected override void AlarmChange(AlarmVariable alarmVariable)
|
||||
{
|
||||
|
@@ -35,7 +35,7 @@ public partial class MqttCollect : CollectBase
|
||||
|
||||
private MqttClientSubscribeOptions _mqttSubscribeOptions;
|
||||
|
||||
private WaitLock ConnectLock = new();
|
||||
private WaitLock ConnectLock = new(nameof(MqttCollect));
|
||||
|
||||
|
||||
#region mqtt方法
|
||||
|
@@ -148,17 +148,16 @@ public class OpcDaMaster : CollectBase
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
protected override async ValueTask<OperResult<byte[]>> ReadSourceAsync(VariableSourceRead deviceVariableSourceRead, CancellationToken cancellationToken)
|
||||
protected override ValueTask<OperResult<byte[]>> ReadSourceAsync(VariableSourceRead deviceVariableSourceRead, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await ReadWriteLock.ReaderLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
_plc.ReadItemsWithGroup(deviceVariableSourceRead.RegisterAddress);
|
||||
return OperResult.CreateSuccessResult(Array.Empty<byte>());
|
||||
return ValueTask.FromResult(OperResult.CreateSuccessResult(Array.Empty<byte>()));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new OperResult<byte[]>($"ReadSourceAsync Error:{Environment.NewLine}{ex}");
|
||||
return ValueTask.FromResult(new OperResult<byte[]>($"ReadSourceAsync Error:{Environment.NewLine}{ex}"));
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -247,7 +247,6 @@ public class OpcUaMaster : CollectBase
|
||||
var addresss = deviceVariableSourceRead.VariableRuntimes.Where(a => !a.RegisterAddress.IsNullOrEmpty()).Select(a => a.RegisterAddress!).ToArray();
|
||||
try
|
||||
{
|
||||
await ReadWriteLock.ReaderLockAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var result = await _plc.ReadJTokenValueAsync(addresss, cancellationToken).ConfigureAwait(false);
|
||||
foreach (var data in result)
|
||||
|
Reference in New Issue
Block a user