diff --git a/src/Admin/ThingsGateway.NewLife.X/Collections/ObjectPool.cs b/src/Admin/ThingsGateway.NewLife.X/Collections/ObjectPool.cs index 63b1e864f..44e197592 100644 --- a/src/Admin/ThingsGateway.NewLife.X/Collections/ObjectPool.cs +++ b/src/Admin/ThingsGateway.NewLife.X/Collections/ObjectPool.cs @@ -64,7 +64,10 @@ public class ObjectPool : DisposeBase, IPool where T : notnull // 启动定期清理的定时器 StartTimer(); } - + ~ObjectPool() + { + this.TryDispose(); + } /// 销毁 /// protected override void Dispose(Boolean disposing) diff --git a/src/Admin/ThingsGateway.NewLife.X/Common/ExpiringDictionary.cs b/src/Admin/ThingsGateway.NewLife.X/Common/ExpiringDictionary.cs index 4efc321e3..3b09eec94 100644 --- a/src/Admin/ThingsGateway.NewLife.X/Common/ExpiringDictionary.cs +++ b/src/Admin/ThingsGateway.NewLife.X/Common/ExpiringDictionary.cs @@ -5,39 +5,45 @@ namespace ThingsGateway.NewLife; public class ExpiringDictionary : IDisposable { - private readonly ConcurrentDictionary _dict = new(); + private ConcurrentDictionary _dict = new(); private readonly TimerX _cleanupTimer; - public ExpiringDictionary(int cleanupInterval = 600000) + public ExpiringDictionary(int cleanupInterval = 60000) { _cleanupTimer = new TimerX(Clear, null, cleanupInterval, cleanupInterval) { Async = true }; } public void TryAdd(TKey key, TValue value) { + if (_cleanupTimer.Disposed) throw new ObjectDisposedException(nameof(ExpiringDictionary)); _dict.TryAdd(key, value); } public bool TryGetValue(TKey key, out TValue value) { + if (_cleanupTimer.Disposed) throw new ObjectDisposedException(nameof(ExpiringDictionary)); return _dict.TryGetValue(key, out value); } public TValue GetOrAdd(TKey key, Func func) { + if (_cleanupTimer.Disposed) throw new ObjectDisposedException(nameof(ExpiringDictionary)); return _dict.GetOrAdd(key, func); } public TValue GetOrAdd(TKey key, TValue value) { + if (_cleanupTimer.Disposed) throw new ObjectDisposedException(nameof(ExpiringDictionary)); return _dict.GetOrAdd(key, value); } public bool TryRemove(TKey key) => _dict.TryRemove(key, out _); - public void Clear() => _dict.Clear(); + public void Clear() => Clear(null); private void Clear(object? state) { - _dict.Clear(); + var data = _dict; + _dict = new(); + data.Clear(); } public void Dispose() diff --git a/src/Admin/ThingsGateway.NewLife.X/Common/WaitLock.cs b/src/Admin/ThingsGateway.NewLife.X/Common/WaitLock.cs index b5014b9f9..943aa5626 100644 --- a/src/Admin/ThingsGateway.NewLife.X/Common/WaitLock.cs +++ b/src/Admin/ThingsGateway.NewLife.X/Common/WaitLock.cs @@ -8,6 +8,8 @@ // QQ群:605534569 //------------------------------------------------------------------------------ +using ThingsGateway.NewLife.Log; + namespace ThingsGateway.NewLife; /// @@ -50,24 +52,21 @@ public sealed class WaitLock : IDisposable public int CurrentCount => _waiterLock.CurrentCount; public bool Waitting => _waiterLock.CurrentCount < MaxCount; - private object m_lockObj = new(); /// /// 离开锁 /// public void Release() { if (DisposedValue) return; - lock (m_lockObj) + //if (Waitting) { - if (Waitting) + try { - try - { - _waiterLock.Release(); - } - catch (SemaphoreFullException) - { - } + _waiterLock.Release(); + } + catch (SemaphoreFullException) + { + XTrace.WriteException(new Exception($"WaitLock {_name} 释放失败,当前信号量无需释放")); } } } diff --git a/src/Admin/ThingsGateway.NewLife.X/Reflection/Reflect.cs b/src/Admin/ThingsGateway.NewLife.X/Reflection/Reflect.cs index 68e002106..895d30a1a 100644 --- a/src/Admin/ThingsGateway.NewLife.X/Reflection/Reflect.cs +++ b/src/Admin/ThingsGateway.NewLife.X/Reflection/Reflect.cs @@ -554,10 +554,7 @@ public static class Reflect //} - private static class DelegateCache - { - public static readonly ExpiringDictionary Cache = new(); - } + /// 把一个方法转为泛型委托,便于快速反射调用 /// @@ -580,38 +577,42 @@ public static class Reflect return func; } - private readonly struct DelegateCacheKey : IEquatable + #endregion +} +public static class DelegateCache +{ + public static readonly ExpiringDictionary Cache = new(); +} +public readonly struct DelegateCacheKey : IEquatable +{ + public readonly MethodInfo Method; + public readonly Type FuncType; + public readonly object? Target; + + public DelegateCacheKey(MethodInfo method, Type funcType, object? target) { - public readonly MethodInfo Method; - public readonly Type FuncType; - public readonly object? Target; + Method = method; + FuncType = funcType; + Target = target; + } - public DelegateCacheKey(MethodInfo method, Type funcType, object? target) + public bool Equals(DelegateCacheKey other) => + Method.Equals(other.Method) + && FuncType.Equals(other.FuncType) + && ReferenceEquals(Target, other.Target); + + public override bool Equals(object? obj) => + obj is DelegateCacheKey other && Equals(other); + + public override int GetHashCode() + { + unchecked { - Method = method; - FuncType = funcType; - Target = target; - } - - public bool Equals(DelegateCacheKey other) => - Method.Equals(other.Method) - && FuncType.Equals(other.FuncType) - && ReferenceEquals(Target, other.Target); - - public override bool Equals(object? obj) => - obj is DelegateCacheKey other && Equals(other); - - public override int GetHashCode() - { - unchecked - { - int hash = Method.GetHashCode(); - hash = (hash * 397) ^ FuncType.GetHashCode(); - if (Target != null) - hash = (hash * 397) ^ RuntimeHelpers.GetHashCode(Target); // 不受对象重写 GetHashCode 影响 - return hash; - } + int hash = Method.GetHashCode(); + hash = (hash * 397) ^ FuncType.GetHashCode(); + if (Target != null) + hash = (hash * 397) ^ RuntimeHelpers.GetHashCode(Target); // 不受对象重写 GetHashCode 影响 + return hash; } } - #endregion -} \ No newline at end of file +} diff --git a/src/Admin/ThingsGateway.NewLife.X/Threading/TimerX.cs b/src/Admin/ThingsGateway.NewLife.X/Threading/TimerX.cs index 4db3d3b12..b4144e9e7 100644 --- a/src/Admin/ThingsGateway.NewLife.X/Threading/TimerX.cs +++ b/src/Admin/ThingsGateway.NewLife.X/Threading/TimerX.cs @@ -1,6 +1,7 @@ using System.Reflection; using ThingsGateway.NewLife.Log; +using ThingsGateway.NewLife.Reflection; namespace ThingsGateway.NewLife.Threading; @@ -389,6 +390,13 @@ public class TimerX : ITimer, ITimerx, IDisposable // 释放非托管资源 Scheduler?.Remove(this, disposing ? "Dispose" : "GC"); + + DelegateCache.Cache.Clear(); +#if NET6_0_OR_GREATER + DelegateCache>.Cache.Clear(); +#endif + DelegateCache>.Cache.Clear(); + } #if NET6_0_OR_GREATER diff --git a/src/Directory.Build.props b/src/Directory.Build.props index afeb08682..89bdf1dc1 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -1,9 +1,9 @@ - 10.11.29 - 10.11.29 - 10.11.29 + 10.11.31 + 10.11.31 + 10.11.31 10.11.3 10.11.3 8.0.19 diff --git a/src/Foundation/ThingsGateway.Foundation/Channel/Extension/ChannelOptionsExtensions.cs b/src/Foundation/ThingsGateway.Foundation/Channel/Extension/ChannelOptionsExtensions.cs index e61f010fc..b96aed5ed 100644 --- a/src/Foundation/ThingsGateway.Foundation/Channel/Extension/ChannelOptionsExtensions.cs +++ b/src/Foundation/ThingsGateway.Foundation/Channel/Extension/ChannelOptionsExtensions.cs @@ -37,9 +37,7 @@ public static class ChannelOptionsExtensions for (int i = 0; i < funcs.Count; i++) { var func = funcs[i]; - var task = func.Invoke(clientChannel, e, i == funcs.Count - 1); - if (!task.IsCompleted) - await task.ConfigureAwait(false); + await func.Invoke(clientChannel, e, i == funcs.Count - 1).ConfigureAwait(false); if (e.Handled) { break; diff --git a/src/Foundation/ThingsGateway.Foundation/Channel/SerialPortChannel.cs b/src/Foundation/ThingsGateway.Foundation/Channel/SerialPortChannel.cs index 481f1ae45..a0e419322 100644 --- a/src/Foundation/ThingsGateway.Foundation/Channel/SerialPortChannel.cs +++ b/src/Foundation/ThingsGateway.Foundation/Channel/SerialPortChannel.cs @@ -192,16 +192,12 @@ public class SerialPortChannel : SerialPortClient, IClientChannel /// protected override async Task OnSerialReceived(ReceivedDataEventArgs e) { - var receivedTask = base.OnSerialReceived(e); - if (!receivedTask.IsCompleted) - await receivedTask.ConfigureAwait(false); + await base.OnSerialReceived(e).ConfigureAwait(false); if (e.Handled) return; - var channelReceivedTask = this.OnChannelReceivedEvent(e, ChannelReceived); - if (!channelReceivedTask.IsCompleted) - await channelReceivedTask.ConfigureAwait(false); + await this.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false); } /// diff --git a/src/Foundation/ThingsGateway.Foundation/Channel/TcpClientChannel.cs b/src/Foundation/ThingsGateway.Foundation/Channel/TcpClientChannel.cs index a3909b867..e164f51af 100644 --- a/src/Foundation/ThingsGateway.Foundation/Channel/TcpClientChannel.cs +++ b/src/Foundation/ThingsGateway.Foundation/Channel/TcpClientChannel.cs @@ -179,16 +179,12 @@ public class TcpClientChannel : TcpClient, IClientChannel /// protected override async Task OnTcpReceived(ReceivedDataEventArgs e) { - var receivedTask = base.OnTcpReceived(e); - if (!receivedTask.IsCompleted) - await receivedTask.ConfigureAwait(false); + await base.OnTcpReceived(e).ConfigureAwait(false); if (e.Handled) return; - var channelReceivedTask = this.OnChannelReceivedEvent(e, ChannelReceived); - if (!channelReceivedTask.IsCompleted) - await channelReceivedTask.ConfigureAwait(false); + await this.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false); } /// diff --git a/src/Foundation/ThingsGateway.Foundation/Channel/TcpServiceChannel.cs b/src/Foundation/ThingsGateway.Foundation/Channel/TcpServiceChannel.cs index df6bfe4d4..3a25be53b 100644 --- a/src/Foundation/ThingsGateway.Foundation/Channel/TcpServiceChannel.cs +++ b/src/Foundation/ThingsGateway.Foundation/Channel/TcpServiceChannel.cs @@ -241,16 +241,12 @@ public class TcpServiceChannel : TcpServiceChannelBase, IChann /// protected override async Task OnTcpReceived(TClient socketClient, ReceivedDataEventArgs e) { - var receivedTask = base.OnTcpReceived(socketClient, e); - if (!receivedTask.IsCompleted) - await receivedTask.ConfigureAwait(false); + await base.OnTcpReceived(socketClient, e).ConfigureAwait(false); if (e.Handled) return; - var channelReceivedTask = socketClient.OnChannelReceivedEvent(e, ChannelReceived); - if (!channelReceivedTask.IsCompleted) - await channelReceivedTask.ConfigureAwait(false); + await socketClient.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false); } diff --git a/src/Foundation/ThingsGateway.Foundation/Channel/TcpSessionClientChannel.cs b/src/Foundation/ThingsGateway.Foundation/Channel/TcpSessionClientChannel.cs index 17bed0af3..1053c1905 100644 --- a/src/Foundation/ThingsGateway.Foundation/Channel/TcpSessionClientChannel.cs +++ b/src/Foundation/ThingsGateway.Foundation/Channel/TcpSessionClientChannel.cs @@ -145,15 +145,11 @@ public class TcpSessionClientChannel : TcpSessionClient, IClientChannel /// protected override async Task OnTcpReceived(ReceivedDataEventArgs e) { - var receivedTask = base.OnTcpReceived(e); - if (!receivedTask.IsCompleted) - await receivedTask.ConfigureAwait(false); + await base.OnTcpReceived(e).ConfigureAwait(false); if (e.Handled) return; - var channelReceivedTask = this.OnChannelReceivedEvent(e, ChannelReceived); - if (!channelReceivedTask.IsCompleted) - await channelReceivedTask.ConfigureAwait(false); + await this.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false); } } diff --git a/src/Foundation/ThingsGateway.Foundation/Channel/UdpSessionChannel.cs b/src/Foundation/ThingsGateway.Foundation/Channel/UdpSessionChannel.cs index 9d0839b7c..6f93143f3 100644 --- a/src/Foundation/ThingsGateway.Foundation/Channel/UdpSessionChannel.cs +++ b/src/Foundation/ThingsGateway.Foundation/Channel/UdpSessionChannel.cs @@ -196,16 +196,12 @@ public class UdpSessionChannel : UdpSession, IClientChannel /// protected override async Task OnUdpReceived(UdpReceivedDataEventArgs e) { - var receivedTask = base.OnUdpReceived(e); - if (!receivedTask.IsCompleted) - await receivedTask.ConfigureAwait(false); + await base.OnUdpReceived(e).ConfigureAwait(false); if (e.Handled) return; - var channelReceivedTask = this.OnChannelReceivedEvent(e, ChannelReceived); - if (!channelReceivedTask.IsCompleted) - await channelReceivedTask.ConfigureAwait(false); + await this.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false); } diff --git a/src/Foundation/ThingsGateway.Foundation/DataHandleAdapter/DeviceSingleStreamDataHandleAdapter.cs b/src/Foundation/ThingsGateway.Foundation/DataHandleAdapter/DeviceSingleStreamDataHandleAdapter.cs index 154264f0b..bf91b4bf2 100644 --- a/src/Foundation/ThingsGateway.Foundation/DataHandleAdapter/DeviceSingleStreamDataHandleAdapter.cs +++ b/src/Foundation/ThingsGateway.Foundation/DataHandleAdapter/DeviceSingleStreamDataHandleAdapter.cs @@ -10,8 +10,6 @@ using System.Text; -using ThingsGateway.NewLife.Collections; - namespace ThingsGateway.Foundation; /// @@ -59,13 +57,6 @@ public class DeviceSingleStreamDataHandleAdapter : CustomDataHandlingA /// public void SetRequest(ISendMessage sendMessage) { - if (IsSingleThread) - { - if (Request != null) - { - _requestPool.Return(Request); - } - } var request = GetInstance(); request.Sign = sendMessage.Sign; request.SendInfo(sendMessage); @@ -165,25 +156,13 @@ public class DeviceSingleStreamDataHandleAdapter : CustomDataHandlingA } } - private static ObjectPool _requestPool { get; } = new ObjectPool(); - /// /// 获取泛型实例。 /// /// protected virtual TRequest GetInstance() { - if (IsSingleThread) - { - var request = _requestPool.Get(); - request.OperCode = -1; - request.Sign = -1; - return request; - } - else - { - return new TRequest() { OperCode = -1, Sign = -1 }; - } + return new TRequest() { OperCode = -1, Sign = -1 }; } public override void SendInput(ref TWriter writer, in ReadOnlyMemory memory) diff --git a/src/Foundation/ThingsGateway.Foundation/DataHandleAdapter/DeviceUdpDataHandleAdapter.cs b/src/Foundation/ThingsGateway.Foundation/DataHandleAdapter/DeviceUdpDataHandleAdapter.cs index 5149aac40..eaf8d7d85 100644 --- a/src/Foundation/ThingsGateway.Foundation/DataHandleAdapter/DeviceUdpDataHandleAdapter.cs +++ b/src/Foundation/ThingsGateway.Foundation/DataHandleAdapter/DeviceUdpDataHandleAdapter.cs @@ -11,8 +11,6 @@ using System.Net; using System.Text; -using ThingsGateway.NewLife.Collections; - namespace ThingsGateway.Foundation; /// @@ -52,13 +50,6 @@ public class DeviceUdpDataHandleAdapter : UdpDataHandlingAdapter, IDev /// public void SetRequest(ISendMessage sendMessage) { - if (IsSingleThread) - { - if (Request != null) - { - _requestPool.Return(Request); - } - } var request = GetInstance(); request.Sign = sendMessage.Sign; request.SendInfo(sendMessage); @@ -71,26 +62,14 @@ public class DeviceUdpDataHandleAdapter : UdpDataHandlingAdapter, IDev return Owner.ToString(); } - private static ObjectPool _requestPool { get; } = new ObjectPool(); - - /// /// 获取泛型实例。 /// /// protected virtual TRequest GetInstance() { - if (IsSingleThread) - { - var request = _requestPool.Get(); - request.OperCode = -1; - request.Sign = -1; - return request; - } - else - { - return new TRequest() { OperCode = -1, Sign = -1 }; - } + + return new TRequest() { OperCode = -1, Sign = -1 }; } diff --git a/src/Foundation/ThingsGateway.Foundation/Device/DeviceBase.cs b/src/Foundation/ThingsGateway.Foundation/Device/DeviceBase.cs index b17a9e5eb..e5111ec54 100644 --- a/src/Foundation/ThingsGateway.Foundation/Device/DeviceBase.cs +++ b/src/Foundation/ThingsGateway.Foundation/Device/DeviceBase.cs @@ -15,6 +15,7 @@ using System.Net; using ThingsGateway.Foundation.Extension.Generic; using ThingsGateway.Foundation.Extension.String; using ThingsGateway.NewLife; +using ThingsGateway.NewLife.Collections; using ThingsGateway.NewLife.Extension; using TouchSocket.SerialPorts; @@ -330,15 +331,10 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice } public bool AutoConnect { get; protected set; } = true; /// - private async ValueTask SendAsync(ISendMessage sendMessage, IClientChannel channel = default, EndPoint endPoint = default, CancellationToken token = default) + private async ValueTask SendAsync(ISendMessage sendMessage, IClientChannel channel, CancellationToken token = default) { try { - if (channel == default) - { - if (Channel is not IClientChannel clientChannel) { throw new ArgumentNullException(nameof(channel)); } - channel = clientChannel; - } if (SendDelayTime != 0) await Task.Delay(SendDelayTime, token).ConfigureAwait(false); @@ -348,19 +344,13 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice if (channel is IDtuUdpSessionChannel udpSession) { - var sendTask = udpSession.SendAsync(endPoint, sendMessage, token); - if (!sendTask.IsCompleted) - { - await sendTask.ConfigureAwait(false); - } + EndPoint? endPoint = GetUdpEndpoint(); + await udpSession.SendAsync(endPoint, sendMessage, token).ConfigureAwait(false); + } else { - var sendTask = channel.SendAsync(sendMessage, token); - if (!sendTask.IsCompleted) - { - await sendTask.ConfigureAwait(false); - } + await channel.SendAsync(sendMessage, token).ConfigureAwait(false); } return OperResult.Success; @@ -415,25 +405,19 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice { try { - var dtuId = this is IDtu dtu1 ? dtu1.DtuId : null; - var channelResult = GetChannel(dtuId); + var channelResult = GetChannel(); if (!channelResult.IsSuccess) return new OperResult(channelResult); - WaitLock? waitLock = GetWaitLock(channelResult.Content, dtuId); + WaitLock? waitLock = GetWaitLock(channelResult.Content); try { - var beforeSendTask = BeforeSendAsync(channelResult.Content, cancellationToken); - if (!beforeSendTask.IsCompleted) - { - await beforeSendTask.ConfigureAwait(false); - } + await BeforeSendAsync(channelResult.Content, cancellationToken).ConfigureAwait(false); await waitLock.WaitAsync(cancellationToken).ConfigureAwait(false); channelResult.Content.SetDataHandlingAdapterLogger(Logger); - EndPoint? endPoint = GetUdpEndpoint(dtuId); - return await SendAsync(sendMessage, channelResult.Content, endPoint, cancellationToken).ConfigureAwait(false); + return await SendAsync(sendMessage, channelResult.Content, cancellationToken).ConfigureAwait(false); } finally { @@ -449,8 +433,13 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice } /// - public virtual OperResult GetChannel(string socketId) + public virtual OperResult GetChannel() { + if (Channel is IClientChannel clientChannel1) + return new OperResult() { Content = clientChannel1 }; + + var socketId = this is IDtu dtu1 ? dtu1.DtuId : null; + if (string.IsNullOrWhiteSpace(socketId)) { if (Channel is IClientChannel clientChannel) @@ -485,10 +474,11 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice } /// - public virtual EndPoint GetUdpEndpoint(string socketId) + public virtual EndPoint GetUdpEndpoint() { if (Channel is IDtuUdpSessionChannel udpSessionChannel) { + var socketId = this is IDtu dtu1 ? dtu1.DtuId : null; if (string.IsNullOrWhiteSpace(socketId)) return udpSessionChannel.DefaultEndpoint; @@ -514,7 +504,7 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice /// public virtual ValueTask>> SendThenReturnAsync(ISendMessage sendMessage, CancellationToken cancellationToken = default) { - var channelResult = GetChannel(this is IDtu dtu ? dtu.DtuId : null); + var channelResult = GetChannel(); if (!channelResult.IsSuccess) return EasyValueTask.FromResult(new OperResult>(channelResult)); return SendThenReturnAsync(sendMessage, channelResult.Content, cancellationToken); } @@ -524,18 +514,8 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice { try { - var sendTask = SendThenReturnMessageAsync(sendMessage, channel, cancellationToken); - if (!sendTask.IsCompleted) - { - var result = await sendTask.ConfigureAwait(false); - return new OperResult>(result) { Content = result.Content }; - } - else - { - var result = sendTask.Result; - return new OperResult>(result) { Content = result.Content }; - } - + var result = await SendThenReturnMessageAsync(sendMessage, channel, cancellationToken).ConfigureAwait(false); + return new OperResult>(result) { Content = result.Content }; } catch (Exception ex) { @@ -546,7 +526,7 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice /// protected virtual ValueTask SendThenReturnMessageAsync(ISendMessage sendMessage, CancellationToken cancellationToken = default) { - var channelResult = GetChannel(this is IDtu dtu ? dtu.DtuId : null); + var channelResult = GetChannel(); if (!channelResult.IsSuccess) return EasyValueTask.FromResult(new MessageBase(channelResult)); return SendThenReturnMessageAsync(sendMessage, channelResult.Content, cancellationToken); } @@ -557,88 +537,69 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice return GetResponsedDataAsync(command, clientChannel, Timeout, cancellationToken); } + private ObjectPool _reusableTimeouts = new(); + /// /// 发送并等待数据 /// - protected async ValueTask GetResponsedDataAsync(ISendMessage command, IClientChannel clientChannel, int timeout = 3000, CancellationToken cancellationToken = default) + protected async ValueTask GetResponsedDataAsync( + ISendMessage command, + IClientChannel clientChannel, + int timeout = 3000, + CancellationToken cancellationToken = default) { var waitData = clientChannel.WaitHandlePool.GetWaitDataAsync(out var sign); command.Sign = sign; WaitLock? waitLock = null; + try { - var beforeSendTask = BeforeSendAsync(clientChannel, cancellationToken); - if (!beforeSendTask.IsCompleted) - { - await beforeSendTask.ConfigureAwait(false); - } - var dtuId = this is IDtu dtu1 ? dtu1.DtuId : null; - waitLock = GetWaitLock(clientChannel, dtuId); + await BeforeSendAsync(clientChannel, cancellationToken).ConfigureAwait(false); + waitLock = GetWaitLock(clientChannel); await waitLock.WaitAsync(cancellationToken).ConfigureAwait(false); - EndPoint? endPoint = GetUdpEndpoint(dtuId); - - if (cancellationToken.IsCancellationRequested) - return new MessageBase(new OperationCanceledException()); - clientChannel.SetDataHandlingAdapterLogger(Logger); + var sendResult = await SendAsync(command, clientChannel, cancellationToken).ConfigureAwait(false); + if (!sendResult.IsSuccess) + return new MessageBase(sendResult); - if (cancellationToken.IsCancellationRequested) - return new MessageBase(new OperationCanceledException()); + if (waitData.Status == WaitDataStatus.Success) + return waitData.CompletedData; - OperResult sendOperResult = default; - var sendTask = SendAsync(command, clientChannel, endPoint, cancellationToken); - if (!sendTask.IsCompleted) - { - sendOperResult = await sendTask.ConfigureAwait(false); - } - else - { - sendOperResult = sendTask.Result; - } + bool timeoutStatus = false; - if (!sendOperResult.IsSuccess) - return new MessageBase(sendOperResult); - - using var ctsTime = new CancellationTokenSource(timeout); + var reusableTimeout = _reusableTimeouts.Get(); try { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(ctsTime.Token, Channel.ClosedToken); - var waitDataTask = waitData.WaitAsync(cts.Token); - if (!waitDataTask.IsCompleted) - { - await waitDataTask.ConfigureAwait(false); - } + var cts = reusableTimeout.GetTokenSource(timeout, cancellationToken, Channel.ClosedToken); + await waitData.WaitAsync(cts.Token).ConfigureAwait(false); } catch (OperationCanceledException) { - if (ctsTime.IsCancellationRequested) - { - return new MessageBase(new TimeoutException()); - } + timeoutStatus = reusableTimeout.TimeoutStatus; + return timeoutStatus + ? new MessageBase(new TimeoutException()) + : new MessageBase(new OperationCanceledException()); } catch (Exception ex) { return new MessageBase(ex); } - - var result = waitData.Check(ctsTime.Token); - - if (result.IsSuccess) + finally { - return waitData.CompletedData; - } - else - { - return new MessageBase(result); + reusableTimeout.Set(); + timeoutStatus = reusableTimeout.TimeoutStatus; + _reusableTimeouts.Return(reusableTimeout); } + + return waitData.Status == WaitDataStatus.Success + ? waitData.CompletedData + : new MessageBase(waitData.Check(timeoutStatus)); } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) - await Task.Delay(1000, cancellationToken).ConfigureAwait(false); return new MessageBase(ex); } finally @@ -648,12 +609,13 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice } } - private static WaitLock GetWaitLock(IClientChannel clientChannel, string dtuId) + + private WaitLock GetWaitLock(IClientChannel clientChannel) { WaitLock? waitLock = null; if (clientChannel is IDtuUdpSessionChannel udpSessionChannel) { - waitLock = udpSessionChannel.GetLock(dtuId); + waitLock = udpSessionChannel.GetLock(this is IDtu dtu1 ? dtu1.DtuId : null); } waitLock ??= clientChannel.GetLock(null); return waitLock; @@ -1068,7 +1030,7 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice Channel.Collects.Remove(this); } } - + _reusableTimeouts?.SafeDispose(); _deviceLogger?.TryDispose(); base.Dispose(disposing); } @@ -1120,6 +1082,7 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice Channel.Collects.Remove(this); } + _reusableTimeouts?.SafeDispose(); _deviceLogger?.TryDispose(); base.Dispose(disposing); } diff --git a/src/Foundation/ThingsGateway.Foundation/Device/DeviceExtension.cs b/src/Foundation/ThingsGateway.Foundation/Device/DeviceExtension.cs index 4eeddcd46..0812ef184 100644 --- a/src/Foundation/ThingsGateway.Foundation/Device/DeviceExtension.cs +++ b/src/Foundation/ThingsGateway.Foundation/Device/DeviceExtension.cs @@ -134,7 +134,7 @@ public static partial class DeviceExtension /// /// 当状态不是时返回异常。 /// - public static OperResult Check(this AsyncWaitData waitDataAsync, CancellationToken cancellationToken) + public static OperResult Check(this AsyncWaitData waitDataAsync, bool timeout) { switch (waitDataAsync.Status) { @@ -142,7 +142,7 @@ public static partial class DeviceExtension return new(); case WaitDataStatus.Canceled: - if (cancellationToken.IsCancellationRequested) + if (timeout) { if (waitDataAsync.CompletedData != null) { diff --git a/src/Foundation/ThingsGateway.Foundation/Device/IDevice.cs b/src/Foundation/ThingsGateway.Foundation/Device/IDevice.cs index e18ddfbb9..16f36a1c4 100644 --- a/src/Foundation/ThingsGateway.Foundation/Device/IDevice.cs +++ b/src/Foundation/ThingsGateway.Foundation/Device/IDevice.cs @@ -426,9 +426,8 @@ public interface IDevice : IDisposable, IDisposableObject, IAsyncDisposable /// /// 获取通道 /// - /// /// - OperResult GetChannel(string socketId); + OperResult GetChannel(); /// /// 发送,会经过适配器,可传入socketId,如果为空,则默认通道必须为类型 diff --git a/src/Foundation/ThingsGateway.Foundation/Utils/ReusableCancellationTokenSource.cs b/src/Foundation/ThingsGateway.Foundation/Utils/ReusableCancellationTokenSource.cs new file mode 100644 index 000000000..fe2e4c0c3 --- /dev/null +++ b/src/Foundation/ThingsGateway.Foundation/Utils/ReusableCancellationTokenSource.cs @@ -0,0 +1,117 @@ +//------------------------------------------------------------------------------ +// 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有 +// 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权 +// CSDN博客:https://blog.csdn.net/qq_40374647 +// 哔哩哔哩视频:https://space.bilibili.com/94253567 +// Gitee源代码仓库:https://gitee.com/RRQM_Home +// Github源代码仓库:https://github.com/RRQM +// API首页:https://touchsocket.net/ +// 交流QQ群:234762506 +// 感谢您的下载和使用 +//------------------------------------------------------------------------------ + +namespace ThingsGateway.Foundation; + +using System; +using System.Threading; + +public sealed class ReusableCancellationTokenSource : IDisposable +{ + private readonly Timer _timer; + private readonly object _lock = new(); + private CancellationTokenSource? _cts; + + public ReusableCancellationTokenSource() + { + _timer = new Timer(OnTimeout, null, Timeout.Infinite, Timeout.Infinite); + } + + public bool TimeoutStatus = false; + + private void OnTimeout(object? state) + { + lock (_lock) + { + TimeoutStatus = true; + + if (_cts?.IsCancellationRequested == false) + _cts?.Cancel(); + + } + } + /// + /// 获取一个 CTS,并启动超时 + /// + public CancellationTokenSource GetTokenSource(long timeout, CancellationToken external1 = default) + { + lock (_lock) + { + TimeoutStatus = false; + + // 如果已有 CTS,先 Dispose + _cts?.SafeCancel(); + _cts?.SafeDispose(); + + // 创建新的 CTS + _cts = CancellationTokenSource.CreateLinkedTokenSource(external1); + + // 启动 Timer + _timer.Change(timeout, Timeout.Infinite); + + return _cts; + } + } + + /// + /// 获取一个 CTS,并启动超时 + /// + public CancellationTokenSource GetTokenSource(long timeout, CancellationToken external1 = default, CancellationToken external2 = default, CancellationToken external3 = default) + { + lock (_lock) + { + TimeoutStatus = false; + + // 如果已有 CTS,先 Dispose + _cts?.SafeCancel(); + _cts?.SafeDispose(); + + // 创建新的 CTS + _cts = CancellationTokenSource.CreateLinkedTokenSource(external1, external2, external3); + + // 启动 Timer + _timer.Change(timeout, Timeout.Infinite); + + return _cts; + } + } + + + public void Set() + { + _timer?.Change(Timeout.Infinite, Timeout.Infinite); + } + + /// + /// 手动取消 + /// + public void Cancel() + { + lock (_lock) + { + _cts?.SafeCancel(); + } + } + + public void Dispose() + { + lock (_lock) + { + _cts?.SafeCancel(); + _cts?.SafeDispose(); + _timer.SafeDispose(); + } + } +} + + + diff --git a/src/Gateway/ThingsGateway.Gateway.Application/Common/LinkedCancellationTokenSourceCache.cs b/src/Gateway/ThingsGateway.Gateway.Application/Common/LinkedCancellationTokenSourceCache.cs index 2fc73de81..403d58b40 100644 --- a/src/Gateway/ThingsGateway.Gateway.Application/Common/LinkedCancellationTokenSourceCache.cs +++ b/src/Gateway/ThingsGateway.Gateway.Application/Common/LinkedCancellationTokenSourceCache.cs @@ -54,4 +54,3 @@ public class LinkedCancellationTokenSourceCache : IDisposable } - diff --git a/src/Plugin/ThingsGateway.Foundation.SiemensS7/S7/SiemensS7Master.cs b/src/Plugin/ThingsGateway.Foundation.SiemensS7/S7/SiemensS7Master.cs index f3ee8760f..7874cdfb6 100644 --- a/src/Plugin/ThingsGateway.Foundation.SiemensS7/S7/SiemensS7Master.cs +++ b/src/Plugin/ThingsGateway.Foundation.SiemensS7/S7/SiemensS7Master.cs @@ -139,157 +139,192 @@ public partial class SiemensS7Master : DeviceBase /// /// 此方法并不会智能分组以最大化效率,减少传输次数,因为返回值是byte[],所以一切都按地址数组的顺序执行,最后合并数组 /// - public async ValueTask>> S7ReadAsync(SiemensS7Address[] sAddresss, CancellationToken cancellationToken = default) + public async ValueTask>> S7ReadAsync( + SiemensS7Address[] addresses, + CancellationToken cancellationToken = default) { + var byteBuffer = new ValueByteBlock(512); + + try { - var byteBlock = new ValueByteBlock(2048); - try + foreach (var address in addresses) { - foreach (var sAddress in sAddresss) + int readCount = 0; + int totalLength = address.Length == 0 ? 1 : address.Length; + int originalStart = address.AddressStart; + + try { - int num = 0; - var addressLen = sAddress.Length == 0 ? 1 : sAddress.Length; - var start = sAddress.AddressStart; - try + while (readCount < totalLength) { - while (num < addressLen) + // 每次读取的 PDU 长度,循环直到读取完整 + int chunkLength = Math.Min(totalLength - readCount, PduLength); + address.Length = chunkLength; + + var result = await SendThenReturnAsync( + new S7Send([address], true), + cancellationToken: cancellationToken + ).ConfigureAwait(false); + + if (!result.IsSuccess) + return result; + + byteBuffer.Write(result.Content.Span); + + if (readCount + chunkLength >= totalLength) { - //pdu长度,重复生成报文,直至全部生成 - int len = Math.Min(addressLen - num, PduLength); - sAddress.Length = len; - var result = await SendThenReturnAsync(new S7Send([sAddress], true), cancellationToken: cancellationToken).ConfigureAwait(false); - if (!result.IsSuccess) return result; - - byteBlock.Write(result.Content.Span); - num += len; - - if (sAddress.DataCode == S7Area.TM || sAddress.DataCode == S7Area.CT) + if (addresses.Length == 1) { - sAddress.AddressStart += len / 2; - } - else - { - sAddress.AddressStart += len * 8; + return result; } + break; } - } - finally - { - sAddress.AddressStart = start; + + readCount += chunkLength; + + // 更新地址起点 + if (address.DataCode == S7Area.TM || address.DataCode == S7Area.CT) + address.AddressStart += chunkLength / 2; + else + address.AddressStart += chunkLength * 8; } } + finally + { + address.AddressStart = originalStart; + } + } - return new OperResult>() { Content = byteBlock.ToArray() }; - } - catch (Exception ex) - { - return new OperResult>(ex); - } - finally - { - byteBlock.SafeDispose(); - } + return new OperResult> { Content = byteBuffer.ToArray() }; + } + catch (Exception ex) + { + return new OperResult>(ex); + } + finally + { + byteBuffer.SafeDispose(); } } + + /// /// 此方法并不会智能分组以最大化效率,减少传输次数,因为返回值是byte[],所以一切都按地址数组的顺序执行,最后合并数组 /// - public async ValueTask> S7WriteAsync(SiemensS7Address[] sAddresss, CancellationToken cancellationToken = default) + public async ValueTask> S7WriteAsync( + SiemensS7Address[] addresses, + CancellationToken cancellationToken = default) { var dictOperResult = new Dictionary(); void SetFailOperResult(OperResult operResult) { - foreach (var item in sAddresss) + foreach (var address in addresses) { - dictOperResult.TryAdd(item, operResult); + dictOperResult.TryAdd(address, operResult); } } + var firstAddress = addresses[0]; + + // 单位写入(位写入) + if (addresses.Length <= 1 && firstAddress.IsBit) { - var sAddress = sAddresss[0]; - if (sAddresss.Length <= 1 && sAddress.IsBit) + var byteBuffer = new ValueByteBlock(512); + try + { + var writeResult = await SendThenReturnAsync( + new S7Send([firstAddress], false), + cancellationToken: cancellationToken + ).ConfigureAwait(false); + + dictOperResult.TryAdd(firstAddress, writeResult); + return dictOperResult; + } + catch (Exception ex) + { + SetFailOperResult(new OperResult(ex)); + return dictOperResult; + } + finally + { + byteBuffer.SafeDispose(); + } + } + else + { + // 多写入 + var addressChunks = new List>(); + ushort dataLength = 0; + ushort itemCount = 1; + var currentChunk = new List(); + + for (int i = 0; i < addresses.Length; i++) + { + var address = addresses[i]; + dataLength += (ushort)(address.Data.Length + 4); + ushort telegramLength = (ushort)(itemCount * 12 + 19 + dataLength); + + if (telegramLength < PduLength) + { + currentChunk.Add(address); + itemCount++; + + if (i == addresses.Length - 1) + addressChunks.Add(currentChunk); + } + else + { + addressChunks.Add(currentChunk); + currentChunk = new List(); + dataLength = 0; + itemCount = 1; + + dataLength += (ushort)(address.Data.Length + 4); + telegramLength = (ushort)(itemCount * 12 + 19 + dataLength); + + if (telegramLength < PduLength) + { + currentChunk.Add(address); + itemCount++; + + if (i == addresses.Length - 1) + addressChunks.Add(currentChunk); + } + else + { + SetFailOperResult(new OperResult("Write length exceeds limit")); + return dictOperResult; + } + } + } + + foreach (var chunk in addressChunks) { - var byteBlock = new ValueByteBlock(2048); try { - var wresult = await SendThenReturnAsync(new S7Send([sAddress], false), cancellationToken: cancellationToken).ConfigureAwait(false); - dictOperResult.TryAdd(sAddress, wresult); - return dictOperResult; + var result = await SendThenReturnAsync( + new S7Send(chunk.ToArray(), false), + cancellationToken: cancellationToken + ).ConfigureAwait(false); + + foreach (var addr in chunk) + { + dictOperResult.TryAdd(addr, result); + } } catch (Exception ex) { SetFailOperResult(new OperResult(ex)); return dictOperResult; } - finally - { - byteBlock.SafeDispose(); - } } - else - { - //多写 - List> siemensS7Addresses = new(); - ushort dataLen = 0; - ushort itemLen = 1; - List addresses = new(); - for (int i = 0; i < sAddresss.Length; i++) - { - var item = sAddresss[i]; - dataLen = (ushort)(dataLen + item.Data.Length + 4); - ushort telegramLen = (ushort)(itemLen * 12 + 19 + dataLen); - if (telegramLen < PduLength) - { - addresses.Add(item); - itemLen++; - if (i == sAddresss.Length - 1) - siemensS7Addresses.Add(addresses); - } - else - { - siemensS7Addresses.Add(addresses); - addresses = new(); - dataLen = 0; - itemLen = 1; - dataLen = (ushort)(dataLen + item.Data.Length + 4); - telegramLen = (ushort)(itemLen * 12 + 19 + dataLen); - if (telegramLen < PduLength) - { - addresses.Add(item); - itemLen++; - if (i == sAddresss.Length - 1) - siemensS7Addresses.Add(addresses); - } - else - { - SetFailOperResult(new OperResult("Write length exceeds limit")); - return dictOperResult; - } - } - } - foreach (var item in siemensS7Addresses) - { - try - { - var result = await SendThenReturnAsync(new S7Send(item.ToArray(), false), cancellationToken: cancellationToken).ConfigureAwait(false); - foreach (var i1 in item) - { - dictOperResult.TryAdd(i1, result); - } - } - catch (Exception ex) - { - SetFailOperResult(new OperResult(ex)); - return dictOperResult; - } - } - return dictOperResult; - } + return dictOperResult; } } + #region 读写 ///