This commit is contained in:
2248356998 qq.com
2025-09-08 17:56:19 +08:00
parent 2e00e8c135
commit aa247422d2
20 changed files with 406 additions and 340 deletions

View File

@@ -64,7 +64,10 @@ public class ObjectPool<T> : DisposeBase, IPool<T> where T : notnull
// 启动定期清理的定时器
StartTimer();
}
~ObjectPool()
{
this.TryDispose();
}
/// <summary>销毁</summary>
/// <param name="disposing"></param>
protected override void Dispose(Boolean disposing)

View File

@@ -5,39 +5,45 @@ namespace ThingsGateway.NewLife;
public class ExpiringDictionary<TKey, TValue> : IDisposable
{
private readonly ConcurrentDictionary<TKey, TValue> _dict = new();
private ConcurrentDictionary<TKey, TValue> _dict = new();
private readonly TimerX _cleanupTimer;
public ExpiringDictionary(int cleanupInterval = 600000)
public ExpiringDictionary(int cleanupInterval = 60000)
{
_cleanupTimer = new TimerX(Clear, null, cleanupInterval, cleanupInterval) { Async = true };
}
public void TryAdd(TKey key, TValue value)
{
if (_cleanupTimer.Disposed) throw new ObjectDisposedException(nameof(ExpiringDictionary<TKey, TValue>));
_dict.TryAdd(key, value);
}
public bool TryGetValue(TKey key, out TValue value)
{
if (_cleanupTimer.Disposed) throw new ObjectDisposedException(nameof(ExpiringDictionary<TKey, TValue>));
return _dict.TryGetValue(key, out value);
}
public TValue GetOrAdd(TKey key, Func<TKey, TValue> func)
{
if (_cleanupTimer.Disposed) throw new ObjectDisposedException(nameof(ExpiringDictionary<TKey, TValue>));
return _dict.GetOrAdd(key, func);
}
public TValue GetOrAdd(TKey key, TValue value)
{
if (_cleanupTimer.Disposed) throw new ObjectDisposedException(nameof(ExpiringDictionary<TKey, TValue>));
return _dict.GetOrAdd(key, value);
}
public bool TryRemove(TKey key) => _dict.TryRemove(key, out _);
public void Clear() => _dict.Clear();
public void Clear() => Clear(null);
private void Clear(object? state)
{
_dict.Clear();
var data = _dict;
_dict = new();
data.Clear();
}
public void Dispose()

View File

@@ -8,6 +8,8 @@
// QQ群605534569
//------------------------------------------------------------------------------
using ThingsGateway.NewLife.Log;
namespace ThingsGateway.NewLife;
/// <summary>
@@ -50,16 +52,13 @@ public sealed class WaitLock : IDisposable
public int CurrentCount => _waiterLock.CurrentCount;
public bool Waitting => _waiterLock.CurrentCount < MaxCount;
private object m_lockObj = new();
/// <summary>
/// 离开锁
/// </summary>
public void Release()
{
if (DisposedValue) return;
lock (m_lockObj)
{
if (Waitting)
//if (Waitting)
{
try
{
@@ -67,7 +66,7 @@ public sealed class WaitLock : IDisposable
}
catch (SemaphoreFullException)
{
}
XTrace.WriteException(new Exception($"WaitLock {_name} 释放失败,当前信号量无需释放"));
}
}
}

View File

@@ -554,10 +554,7 @@ public static class Reflect
//}
private static class DelegateCache<TFunc>
{
public static readonly ExpiringDictionary<DelegateCacheKey, TFunc> Cache = new();
}
/// <summary>把一个方法转为泛型委托,便于快速反射调用</summary>
/// <typeparam name="TFunc"></typeparam>
@@ -580,8 +577,14 @@ public static class Reflect
return func;
}
private readonly struct DelegateCacheKey : IEquatable<DelegateCacheKey>
{
#endregion
}
public static class DelegateCache<TFunc>
{
public static readonly ExpiringDictionary<DelegateCacheKey, TFunc> Cache = new();
}
public readonly struct DelegateCacheKey : IEquatable<DelegateCacheKey>
{
public readonly MethodInfo Method;
public readonly Type FuncType;
public readonly object? Target;
@@ -612,6 +615,4 @@ public static class Reflect
return hash;
}
}
}
#endregion
}

View File

@@ -1,6 +1,7 @@
using System.Reflection;
using ThingsGateway.NewLife.Log;
using ThingsGateway.NewLife.Reflection;
namespace ThingsGateway.NewLife.Threading;
@@ -389,6 +390,13 @@ public class TimerX : ITimer, ITimerx, IDisposable
// 释放非托管资源
Scheduler?.Remove(this, disposing ? "Dispose" : "GC");
DelegateCache<TimerCallback>.Cache.Clear();
#if NET6_0_OR_GREATER
DelegateCache<Func<Object?, ValueTask>>.Cache.Clear();
#endif
DelegateCache<Func<Object?, Task>>.Cache.Clear();
}
#if NET6_0_OR_GREATER

View File

@@ -1,9 +1,9 @@
<Project>
<PropertyGroup>
<PluginVersion>10.11.29</PluginVersion>
<ProPluginVersion>10.11.29</ProPluginVersion>
<DefaultVersion>10.11.29</DefaultVersion>
<PluginVersion>10.11.31</PluginVersion>
<ProPluginVersion>10.11.31</ProPluginVersion>
<DefaultVersion>10.11.31</DefaultVersion>
<AuthenticationVersion>10.11.3</AuthenticationVersion>
<SourceGeneratorVersion>10.11.3</SourceGeneratorVersion>
<NET8Version>8.0.19</NET8Version>

View File

@@ -37,9 +37,7 @@ public static class ChannelOptionsExtensions
for (int i = 0; i < funcs.Count; i++)
{
var func = funcs[i];
var task = func.Invoke(clientChannel, e, i == funcs.Count - 1);
if (!task.IsCompleted)
await task.ConfigureAwait(false);
await func.Invoke(clientChannel, e, i == funcs.Count - 1).ConfigureAwait(false);
if (e.Handled)
{
break;

View File

@@ -192,16 +192,12 @@ public class SerialPortChannel : SerialPortClient, IClientChannel
/// <inheritdoc/>
protected override async Task OnSerialReceived(ReceivedDataEventArgs e)
{
var receivedTask = base.OnSerialReceived(e);
if (!receivedTask.IsCompleted)
await receivedTask.ConfigureAwait(false);
await base.OnSerialReceived(e).ConfigureAwait(false);
if (e.Handled)
return;
var channelReceivedTask = this.OnChannelReceivedEvent(e, ChannelReceived);
if (!channelReceivedTask.IsCompleted)
await channelReceivedTask.ConfigureAwait(false);
await this.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false);
}
/// <inheritdoc/>

View File

@@ -179,16 +179,12 @@ public class TcpClientChannel : TcpClient, IClientChannel
/// <inheritdoc/>
protected override async Task OnTcpReceived(ReceivedDataEventArgs e)
{
var receivedTask = base.OnTcpReceived(e);
if (!receivedTask.IsCompleted)
await receivedTask.ConfigureAwait(false);
await base.OnTcpReceived(e).ConfigureAwait(false);
if (e.Handled)
return;
var channelReceivedTask = this.OnChannelReceivedEvent(e, ChannelReceived);
if (!channelReceivedTask.IsCompleted)
await channelReceivedTask.ConfigureAwait(false);
await this.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false);
}
/// <inheritdoc/>

View File

@@ -241,16 +241,12 @@ public class TcpServiceChannel<TClient> : TcpServiceChannelBase<TClient>, IChann
/// <inheritdoc/>
protected override async Task OnTcpReceived(TClient socketClient, ReceivedDataEventArgs e)
{
var receivedTask = base.OnTcpReceived(socketClient, e);
if (!receivedTask.IsCompleted)
await receivedTask.ConfigureAwait(false);
await base.OnTcpReceived(socketClient, e).ConfigureAwait(false);
if (e.Handled)
return;
var channelReceivedTask = socketClient.OnChannelReceivedEvent(e, ChannelReceived);
if (!channelReceivedTask.IsCompleted)
await channelReceivedTask.ConfigureAwait(false);
await socketClient.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false);
}

View File

@@ -145,15 +145,11 @@ public class TcpSessionClientChannel : TcpSessionClient, IClientChannel
/// <inheritdoc/>
protected override async Task OnTcpReceived(ReceivedDataEventArgs e)
{
var receivedTask = base.OnTcpReceived(e);
if (!receivedTask.IsCompleted)
await receivedTask.ConfigureAwait(false);
await base.OnTcpReceived(e).ConfigureAwait(false);
if (e.Handled)
return;
var channelReceivedTask = this.OnChannelReceivedEvent(e, ChannelReceived);
if (!channelReceivedTask.IsCompleted)
await channelReceivedTask.ConfigureAwait(false);
await this.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false);
}
}

View File

@@ -196,16 +196,12 @@ public class UdpSessionChannel : UdpSession, IClientChannel
/// <inheritdoc/>
protected override async Task OnUdpReceived(UdpReceivedDataEventArgs e)
{
var receivedTask = base.OnUdpReceived(e);
if (!receivedTask.IsCompleted)
await receivedTask.ConfigureAwait(false);
await base.OnUdpReceived(e).ConfigureAwait(false);
if (e.Handled)
return;
var channelReceivedTask = this.OnChannelReceivedEvent(e, ChannelReceived);
if (!channelReceivedTask.IsCompleted)
await channelReceivedTask.ConfigureAwait(false);
await this.OnChannelReceivedEvent(e, ChannelReceived).ConfigureAwait(false);
}

View File

@@ -10,8 +10,6 @@
using System.Text;
using ThingsGateway.NewLife.Collections;
namespace ThingsGateway.Foundation;
/// <summary>
@@ -59,13 +57,6 @@ public class DeviceSingleStreamDataHandleAdapter<TRequest> : CustomDataHandlingA
/// <inheritdoc />
public void SetRequest(ISendMessage sendMessage)
{
if (IsSingleThread)
{
if (Request != null)
{
_requestPool.Return(Request);
}
}
var request = GetInstance();
request.Sign = sendMessage.Sign;
request.SendInfo(sendMessage);
@@ -165,26 +156,14 @@ public class DeviceSingleStreamDataHandleAdapter<TRequest> : CustomDataHandlingA
}
}
private static ObjectPool<TRequest> _requestPool { get; } = new ObjectPool<TRequest>();
/// <summary>
/// 获取泛型实例。
/// </summary>
/// <returns></returns>
protected virtual TRequest GetInstance()
{
if (IsSingleThread)
{
var request = _requestPool.Get();
request.OperCode = -1;
request.Sign = -1;
return request;
}
else
{
return new TRequest() { OperCode = -1, Sign = -1 };
}
}
public override void SendInput<TWriter>(ref TWriter writer, in ReadOnlyMemory<byte> memory)
{

View File

@@ -11,8 +11,6 @@
using System.Net;
using System.Text;
using ThingsGateway.NewLife.Collections;
namespace ThingsGateway.Foundation;
/// <summary>
@@ -52,13 +50,6 @@ public class DeviceUdpDataHandleAdapter<TRequest> : UdpDataHandlingAdapter, IDev
/// <inheritdoc />
public void SetRequest(ISendMessage sendMessage)
{
if (IsSingleThread)
{
if (Request != null)
{
_requestPool.Return(Request);
}
}
var request = GetInstance();
request.Sign = sendMessage.Sign;
request.SendInfo(sendMessage);
@@ -71,27 +62,15 @@ public class DeviceUdpDataHandleAdapter<TRequest> : UdpDataHandlingAdapter, IDev
return Owner.ToString();
}
private static ObjectPool<TRequest> _requestPool { get; } = new ObjectPool<TRequest>();
/// <summary>
/// 获取泛型实例。
/// </summary>
/// <returns></returns>
protected virtual TRequest GetInstance()
{
if (IsSingleThread)
{
var request = _requestPool.Get();
request.OperCode = -1;
request.Sign = -1;
return request;
}
else
{
return new TRequest() { OperCode = -1, Sign = -1 };
}
}
#region ParseRequest

View File

@@ -15,6 +15,7 @@ using System.Net;
using ThingsGateway.Foundation.Extension.Generic;
using ThingsGateway.Foundation.Extension.String;
using ThingsGateway.NewLife;
using ThingsGateway.NewLife.Collections;
using ThingsGateway.NewLife.Extension;
using TouchSocket.SerialPorts;
@@ -330,15 +331,10 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
}
public bool AutoConnect { get; protected set; } = true;
/// <inheritdoc/>
private async ValueTask<OperResult> SendAsync(ISendMessage sendMessage, IClientChannel channel = default, EndPoint endPoint = default, CancellationToken token = default)
private async ValueTask<OperResult> SendAsync(ISendMessage sendMessage, IClientChannel channel, CancellationToken token = default)
{
try
{
if (channel == default)
{
if (Channel is not IClientChannel clientChannel) { throw new ArgumentNullException(nameof(channel)); }
channel = clientChannel;
}
if (SendDelayTime != 0)
await Task.Delay(SendDelayTime, token).ConfigureAwait(false);
@@ -348,19 +344,13 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
if (channel is IDtuUdpSessionChannel udpSession)
{
var sendTask = udpSession.SendAsync(endPoint, sendMessage, token);
if (!sendTask.IsCompleted)
{
await sendTask.ConfigureAwait(false);
}
EndPoint? endPoint = GetUdpEndpoint();
await udpSession.SendAsync(endPoint, sendMessage, token).ConfigureAwait(false);
}
else
{
var sendTask = channel.SendAsync(sendMessage, token);
if (!sendTask.IsCompleted)
{
await sendTask.ConfigureAwait(false);
}
await channel.SendAsync(sendMessage, token).ConfigureAwait(false);
}
return OperResult.Success;
@@ -415,25 +405,19 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
{
try
{
var dtuId = this is IDtu dtu1 ? dtu1.DtuId : null;
var channelResult = GetChannel(dtuId);
var channelResult = GetChannel();
if (!channelResult.IsSuccess) return new OperResult<byte[]>(channelResult);
WaitLock? waitLock = GetWaitLock(channelResult.Content, dtuId);
WaitLock? waitLock = GetWaitLock(channelResult.Content);
try
{
var beforeSendTask = BeforeSendAsync(channelResult.Content, cancellationToken);
if (!beforeSendTask.IsCompleted)
{
await beforeSendTask.ConfigureAwait(false);
}
await BeforeSendAsync(channelResult.Content, cancellationToken).ConfigureAwait(false);
await waitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
channelResult.Content.SetDataHandlingAdapterLogger(Logger);
EndPoint? endPoint = GetUdpEndpoint(dtuId);
return await SendAsync(sendMessage, channelResult.Content, endPoint, cancellationToken).ConfigureAwait(false);
return await SendAsync(sendMessage, channelResult.Content, cancellationToken).ConfigureAwait(false);
}
finally
{
@@ -449,8 +433,13 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
}
/// <inheritdoc/>
public virtual OperResult<IClientChannel> GetChannel(string socketId)
public virtual OperResult<IClientChannel> GetChannel()
{
if (Channel is IClientChannel clientChannel1)
return new OperResult<IClientChannel>() { Content = clientChannel1 };
var socketId = this is IDtu dtu1 ? dtu1.DtuId : null;
if (string.IsNullOrWhiteSpace(socketId))
{
if (Channel is IClientChannel clientChannel)
@@ -485,10 +474,11 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
}
/// <inheritdoc/>
public virtual EndPoint GetUdpEndpoint(string socketId)
public virtual EndPoint GetUdpEndpoint()
{
if (Channel is IDtuUdpSessionChannel udpSessionChannel)
{
var socketId = this is IDtu dtu1 ? dtu1.DtuId : null;
if (string.IsNullOrWhiteSpace(socketId))
return udpSessionChannel.DefaultEndpoint;
@@ -514,7 +504,7 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
/// <inheritdoc/>
public virtual ValueTask<OperResult<ReadOnlyMemory<byte>>> SendThenReturnAsync(ISendMessage sendMessage, CancellationToken cancellationToken = default)
{
var channelResult = GetChannel(this is IDtu dtu ? dtu.DtuId : null);
var channelResult = GetChannel();
if (!channelResult.IsSuccess) return EasyValueTask.FromResult(new OperResult<ReadOnlyMemory<byte>>(channelResult));
return SendThenReturnAsync(sendMessage, channelResult.Content, cancellationToken);
}
@@ -524,19 +514,9 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
{
try
{
var sendTask = SendThenReturnMessageAsync(sendMessage, channel, cancellationToken);
if (!sendTask.IsCompleted)
{
var result = await sendTask.ConfigureAwait(false);
var result = await SendThenReturnMessageAsync(sendMessage, channel, cancellationToken).ConfigureAwait(false);
return new OperResult<ReadOnlyMemory<byte>>(result) { Content = result.Content };
}
else
{
var result = sendTask.Result;
return new OperResult<ReadOnlyMemory<byte>>(result) { Content = result.Content };
}
}
catch (Exception ex)
{
return new(ex);
@@ -546,7 +526,7 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
/// <inheritdoc/>
protected virtual ValueTask<MessageBase> SendThenReturnMessageAsync(ISendMessage sendMessage, CancellationToken cancellationToken = default)
{
var channelResult = GetChannel(this is IDtu dtu ? dtu.DtuId : null);
var channelResult = GetChannel();
if (!channelResult.IsSuccess) return EasyValueTask.FromResult(new MessageBase(channelResult));
return SendThenReturnMessageAsync(sendMessage, channelResult.Content, cancellationToken);
}
@@ -557,88 +537,69 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
return GetResponsedDataAsync(command, clientChannel, Timeout, cancellationToken);
}
private ObjectPool<ReusableCancellationTokenSource> _reusableTimeouts = new();
/// <summary>
/// 发送并等待数据
/// </summary>
protected async ValueTask<MessageBase> GetResponsedDataAsync(ISendMessage command, IClientChannel clientChannel, int timeout = 3000, CancellationToken cancellationToken = default)
protected async ValueTask<MessageBase> GetResponsedDataAsync(
ISendMessage command,
IClientChannel clientChannel,
int timeout = 3000,
CancellationToken cancellationToken = default)
{
var waitData = clientChannel.WaitHandlePool.GetWaitDataAsync(out var sign);
command.Sign = sign;
WaitLock? waitLock = null;
try
{
var beforeSendTask = BeforeSendAsync(clientChannel, cancellationToken);
if (!beforeSendTask.IsCompleted)
{
await beforeSendTask.ConfigureAwait(false);
}
var dtuId = this is IDtu dtu1 ? dtu1.DtuId : null;
waitLock = GetWaitLock(clientChannel, dtuId);
await BeforeSendAsync(clientChannel, cancellationToken).ConfigureAwait(false);
waitLock = GetWaitLock(clientChannel);
await waitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
EndPoint? endPoint = GetUdpEndpoint(dtuId);
if (cancellationToken.IsCancellationRequested)
return new MessageBase(new OperationCanceledException());
clientChannel.SetDataHandlingAdapterLogger(Logger);
var sendResult = await SendAsync(command, clientChannel, cancellationToken).ConfigureAwait(false);
if (!sendResult.IsSuccess)
return new MessageBase(sendResult);
if (cancellationToken.IsCancellationRequested)
return new MessageBase(new OperationCanceledException());
if (waitData.Status == WaitDataStatus.Success)
return waitData.CompletedData;
OperResult sendOperResult = default;
var sendTask = SendAsync(command, clientChannel, endPoint, cancellationToken);
if (!sendTask.IsCompleted)
{
sendOperResult = await sendTask.ConfigureAwait(false);
}
else
{
sendOperResult = sendTask.Result;
}
bool timeoutStatus = false;
if (!sendOperResult.IsSuccess)
return new MessageBase(sendOperResult);
using var ctsTime = new CancellationTokenSource(timeout);
var reusableTimeout = _reusableTimeouts.Get();
try
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ctsTime.Token, Channel.ClosedToken);
var waitDataTask = waitData.WaitAsync(cts.Token);
if (!waitDataTask.IsCompleted)
{
await waitDataTask.ConfigureAwait(false);
}
var cts = reusableTimeout.GetTokenSource(timeout, cancellationToken, Channel.ClosedToken);
await waitData.WaitAsync(cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
if (ctsTime.IsCancellationRequested)
{
return new MessageBase(new TimeoutException());
}
timeoutStatus = reusableTimeout.TimeoutStatus;
return timeoutStatus
? new MessageBase(new TimeoutException())
: new MessageBase(new OperationCanceledException());
}
catch (Exception ex)
{
return new MessageBase(ex);
}
var result = waitData.Check(ctsTime.Token);
if (result.IsSuccess)
finally
{
return waitData.CompletedData;
}
else
{
return new MessageBase(result);
reusableTimeout.Set();
timeoutStatus = reusableTimeout.TimeoutStatus;
_reusableTimeouts.Return(reusableTimeout);
}
return waitData.Status == WaitDataStatus.Success
? waitData.CompletedData
: new MessageBase(waitData.Check(timeoutStatus));
}
catch (Exception ex)
{
if (!cancellationToken.IsCancellationRequested)
await Task.Delay(1000, cancellationToken).ConfigureAwait(false);
return new MessageBase(ex);
}
finally
@@ -648,12 +609,13 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
}
}
private static WaitLock GetWaitLock(IClientChannel clientChannel, string dtuId)
private WaitLock GetWaitLock(IClientChannel clientChannel)
{
WaitLock? waitLock = null;
if (clientChannel is IDtuUdpSessionChannel udpSessionChannel)
{
waitLock = udpSessionChannel.GetLock(dtuId);
waitLock = udpSessionChannel.GetLock(this is IDtu dtu1 ? dtu1.DtuId : null);
}
waitLock ??= clientChannel.GetLock(null);
return waitLock;
@@ -1068,7 +1030,7 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
Channel.Collects.Remove(this);
}
}
_reusableTimeouts?.SafeDispose();
_deviceLogger?.TryDispose();
base.Dispose(disposing);
}
@@ -1120,6 +1082,7 @@ public abstract class DeviceBase : AsyncAndSyncDisposableObject, IDevice
Channel.Collects.Remove(this);
}
_reusableTimeouts?.SafeDispose();
_deviceLogger?.TryDispose();
base.Dispose(disposing);
}

View File

@@ -134,7 +134,7 @@ public static partial class DeviceExtension
/// <summary>
/// 当状态不是<see cref="WaitDataStatus.Success"/>时返回异常。
/// </summary>
public static OperResult Check(this AsyncWaitData<MessageBase> waitDataAsync, CancellationToken cancellationToken)
public static OperResult Check(this AsyncWaitData<MessageBase> waitDataAsync, bool timeout)
{
switch (waitDataAsync.Status)
{
@@ -142,7 +142,7 @@ public static partial class DeviceExtension
return new();
case WaitDataStatus.Canceled:
if (cancellationToken.IsCancellationRequested)
if (timeout)
{
if (waitDataAsync.CompletedData != null)
{

View File

@@ -426,9 +426,8 @@ public interface IDevice : IDisposable, IDisposableObject, IAsyncDisposable
/// <summary>
/// 获取通道
/// </summary>
/// <param name="socketId"></param>
/// <returns></returns>
OperResult<IClientChannel> GetChannel(string socketId);
OperResult<IClientChannel> GetChannel();
/// <summary>
/// 发送会经过适配器可传入socketId如果为空则默认通道必须为<see cref="IClientChannel"/>类型

View File

@@ -0,0 +1,117 @@
//------------------------------------------------------------------------------
// 此代码版权除特别声明或在XREF结尾的命名空间的代码归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议若本仓库没有设置则按MIT开源协议授权
// CSDN博客https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频https://space.bilibili.com/94253567
// Gitee源代码仓库https://gitee.com/RRQM_Home
// Github源代码仓库https://github.com/RRQM
// API首页https://touchsocket.net/
// 交流QQ群234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
namespace ThingsGateway.Foundation;
using System;
using System.Threading;
public sealed class ReusableCancellationTokenSource : IDisposable
{
private readonly Timer _timer;
private readonly object _lock = new();
private CancellationTokenSource? _cts;
public ReusableCancellationTokenSource()
{
_timer = new Timer(OnTimeout, null, Timeout.Infinite, Timeout.Infinite);
}
public bool TimeoutStatus = false;
private void OnTimeout(object? state)
{
lock (_lock)
{
TimeoutStatus = true;
if (_cts?.IsCancellationRequested == false)
_cts?.Cancel();
}
}
/// <summary>
/// 获取一个 CTS并启动超时
/// </summary>
public CancellationTokenSource GetTokenSource(long timeout, CancellationToken external1 = default)
{
lock (_lock)
{
TimeoutStatus = false;
// 如果已有 CTS先 Dispose
_cts?.SafeCancel();
_cts?.SafeDispose();
// 创建新的 CTS
_cts = CancellationTokenSource.CreateLinkedTokenSource(external1);
// 启动 Timer
_timer.Change(timeout, Timeout.Infinite);
return _cts;
}
}
/// <summary>
/// 获取一个 CTS并启动超时
/// </summary>
public CancellationTokenSource GetTokenSource(long timeout, CancellationToken external1 = default, CancellationToken external2 = default, CancellationToken external3 = default)
{
lock (_lock)
{
TimeoutStatus = false;
// 如果已有 CTS先 Dispose
_cts?.SafeCancel();
_cts?.SafeDispose();
// 创建新的 CTS
_cts = CancellationTokenSource.CreateLinkedTokenSource(external1, external2, external3);
// 启动 Timer
_timer.Change(timeout, Timeout.Infinite);
return _cts;
}
}
public void Set()
{
_timer?.Change(Timeout.Infinite, Timeout.Infinite);
}
/// <summary>
/// 手动取消
/// </summary>
public void Cancel()
{
lock (_lock)
{
_cts?.SafeCancel();
}
}
public void Dispose()
{
lock (_lock)
{
_cts?.SafeCancel();
_cts?.SafeDispose();
_timer.SafeDispose();
}
}
}

View File

@@ -54,4 +54,3 @@ public class LinkedCancellationTokenSourceCache : IDisposable
}

View File

@@ -139,47 +139,63 @@ public partial class SiemensS7Master : DeviceBase
/// <summary>
/// 此方法并不会智能分组以最大化效率减少传输次数因为返回值是byte[],所以一切都按地址数组的顺序执行,最后合并数组
/// </summary>
public async ValueTask<OperResult<ReadOnlyMemory<byte>>> S7ReadAsync(SiemensS7Address[] sAddresss, CancellationToken cancellationToken = default)
public async ValueTask<OperResult<ReadOnlyMemory<byte>>> S7ReadAsync(
SiemensS7Address[] addresses,
CancellationToken cancellationToken = default)
{
{
var byteBlock = new ValueByteBlock(2048);
var byteBuffer = new ValueByteBlock(512);
try
{
foreach (var sAddress in sAddresss)
foreach (var address in addresses)
{
int num = 0;
var addressLen = sAddress.Length == 0 ? 1 : sAddress.Length;
var start = sAddress.AddressStart;
int readCount = 0;
int totalLength = address.Length == 0 ? 1 : address.Length;
int originalStart = address.AddressStart;
try
{
while (num < addressLen)
while (readCount < totalLength)
{
//pdu长度重复生成报文直至全部生成
int len = Math.Min(addressLen - num, PduLength);
sAddress.Length = len;
var result = await SendThenReturnAsync(new S7Send([sAddress], true), cancellationToken: cancellationToken).ConfigureAwait(false);
if (!result.IsSuccess) return result;
// 每次读取的 PDU 长度,循环直到读取完整
int chunkLength = Math.Min(totalLength - readCount, PduLength);
address.Length = chunkLength;
byteBlock.Write(result.Content.Span);
num += len;
var result = await SendThenReturnAsync(
new S7Send([address], true),
cancellationToken: cancellationToken
).ConfigureAwait(false);
if (sAddress.DataCode == S7Area.TM || sAddress.DataCode == S7Area.CT)
if (!result.IsSuccess)
return result;
byteBuffer.Write(result.Content.Span);
if (readCount + chunkLength >= totalLength)
{
sAddress.AddressStart += len / 2;
if (addresses.Length == 1)
{
return result;
}
break;
}
readCount += chunkLength;
// 更新地址起点
if (address.DataCode == S7Area.TM || address.DataCode == S7Area.CT)
address.AddressStart += chunkLength / 2;
else
{
sAddress.AddressStart += len * 8;
}
address.AddressStart += chunkLength * 8;
}
}
finally
{
sAddress.AddressStart = start;
address.AddressStart = originalStart;
}
}
return new OperResult<ReadOnlyMemory<byte>>() { Content = byteBlock.ToArray() };
return new OperResult<ReadOnlyMemory<byte>> { Content = byteBuffer.ToArray() };
}
catch (Exception ex)
{
@@ -187,34 +203,42 @@ public partial class SiemensS7Master : DeviceBase
}
finally
{
byteBlock.SafeDispose();
}
byteBuffer.SafeDispose();
}
}
/// <summary>
/// 此方法并不会智能分组以最大化效率减少传输次数因为返回值是byte[],所以一切都按地址数组的顺序执行,最后合并数组
/// </summary>
public async ValueTask<Dictionary<SiemensS7Address, OperResult>> S7WriteAsync(SiemensS7Address[] sAddresss, CancellationToken cancellationToken = default)
public async ValueTask<Dictionary<SiemensS7Address, OperResult>> S7WriteAsync(
SiemensS7Address[] addresses,
CancellationToken cancellationToken = default)
{
var dictOperResult = new Dictionary<SiemensS7Address, OperResult>();
void SetFailOperResult(OperResult operResult)
{
foreach (var item in sAddresss)
foreach (var address in addresses)
{
dictOperResult.TryAdd(item, operResult);
dictOperResult.TryAdd(address, operResult);
}
}
var firstAddress = addresses[0];
// 单位写入(位写入)
if (addresses.Length <= 1 && firstAddress.IsBit)
{
var sAddress = sAddresss[0];
if (sAddresss.Length <= 1 && sAddress.IsBit)
{
var byteBlock = new ValueByteBlock(2048);
var byteBuffer = new ValueByteBlock(512);
try
{
var wresult = await SendThenReturnAsync(new S7Send([sAddress], false), cancellationToken: cancellationToken).ConfigureAwait(false);
dictOperResult.TryAdd(sAddress, wresult);
var writeResult = await SendThenReturnAsync(
new S7Send([firstAddress], false),
cancellationToken: cancellationToken
).ConfigureAwait(false);
dictOperResult.TryAdd(firstAddress, writeResult);
return dictOperResult;
}
catch (Exception ex)
@@ -224,42 +248,48 @@ public partial class SiemensS7Master : DeviceBase
}
finally
{
byteBlock.SafeDispose();
byteBuffer.SafeDispose();
}
}
else
{
//多写
List<List<SiemensS7Address>> siemensS7Addresses = new();
ushort dataLen = 0;
ushort itemLen = 1;
List<SiemensS7Address> addresses = new();
for (int i = 0; i < sAddresss.Length; i++)
// 多写
var addressChunks = new List<List<SiemensS7Address>>();
ushort dataLength = 0;
ushort itemCount = 1;
var currentChunk = new List<SiemensS7Address>();
for (int i = 0; i < addresses.Length; i++)
{
var item = sAddresss[i];
dataLen = (ushort)(dataLen + item.Data.Length + 4);
ushort telegramLen = (ushort)(itemLen * 12 + 19 + dataLen);
if (telegramLen < PduLength)
var address = addresses[i];
dataLength += (ushort)(address.Data.Length + 4);
ushort telegramLength = (ushort)(itemCount * 12 + 19 + dataLength);
if (telegramLength < PduLength)
{
addresses.Add(item);
itemLen++;
if (i == sAddresss.Length - 1)
siemensS7Addresses.Add(addresses);
currentChunk.Add(address);
itemCount++;
if (i == addresses.Length - 1)
addressChunks.Add(currentChunk);
}
else
{
siemensS7Addresses.Add(addresses);
addresses = new();
dataLen = 0;
itemLen = 1;
dataLen = (ushort)(dataLen + item.Data.Length + 4);
telegramLen = (ushort)(itemLen * 12 + 19 + dataLen);
if (telegramLen < PduLength)
addressChunks.Add(currentChunk);
currentChunk = new List<SiemensS7Address>();
dataLength = 0;
itemCount = 1;
dataLength += (ushort)(address.Data.Length + 4);
telegramLength = (ushort)(itemCount * 12 + 19 + dataLength);
if (telegramLength < PduLength)
{
addresses.Add(item);
itemLen++;
if (i == sAddresss.Length - 1)
siemensS7Addresses.Add(addresses);
currentChunk.Add(address);
itemCount++;
if (i == addresses.Length - 1)
addressChunks.Add(currentChunk);
}
else
{
@@ -269,14 +299,18 @@ public partial class SiemensS7Master : DeviceBase
}
}
foreach (var item in siemensS7Addresses)
foreach (var chunk in addressChunks)
{
try
{
var result = await SendThenReturnAsync(new S7Send(item.ToArray(), false), cancellationToken: cancellationToken).ConfigureAwait(false);
foreach (var i1 in item)
var result = await SendThenReturnAsync(
new S7Send(chunk.ToArray(), false),
cancellationToken: cancellationToken
).ConfigureAwait(false);
foreach (var addr in chunk)
{
dictOperResult.TryAdd(i1, result);
dictOperResult.TryAdd(addr, result);
}
}
catch (Exception ex)
@@ -285,10 +319,11 @@ public partial class SiemensS7Master : DeviceBase
return dictOperResult;
}
}
return dictOperResult;
}
}
}
#region