update touchsocket

This commit is contained in:
Kimdiego2098
2023-10-17 23:06:21 +08:00
parent 854c5d4ade
commit 0cac2062d3
12 changed files with 142 additions and 181 deletions

View File

@@ -50,13 +50,16 @@ namespace ThingsGateway.Foundation.Dmtp
#region
private ClientWebSocket m_client;
private Func<string, IDmtpActor> m_findDmtpActor;
private ValueCounter m_receiveCounter;
private ValueCounter m_sendCounter;
private SealedDmtpActor m_dmtpActor;
private TcpDmtpAdapter m_smtpAdapter;
private readonly SemaphoreSlim m_semaphore = new SemaphoreSlim(1, 1);
private ClientWebSocket m_client;
private SealedDmtpActor m_dmtpActor;
private Func<string, IDmtpActor> m_findDmtpActor;
private int m_receiveBufferSize = 1024 * 10;
private ValueCounter m_receiveCounter;
private int m_sendBufferSize = 1024 * 10;
private ValueCounter m_sendCounter;
private TcpDmtpAdapter m_smtpAdapter;
#endregion
/// <inheritdoc/>
@@ -90,25 +93,21 @@ namespace ThingsGateway.Foundation.Dmtp
/// <inheritdoc/>
public DateTime LastSendTime => this.m_sendCounter.LastIncrement;
/// <summary>
/// 未实现
/// </summary>
public Func<ByteBlock, bool> OnHandleRawBuffer { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
/// <summary>
/// 未实现
/// </summary>
public Func<ByteBlock, IRequestInfo, bool> OnHandleReceivedData { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
/// <inheritdoc/>
public IPluginsManager PluginsManager { get; private set; }
/// <inheritdoc/>
public Protocol Protocol { get; set; } = DmtpUtility.DmtpProtocol;
/// <inheritdoc/>
public override int ReceiveBufferSize => this.m_receiveBufferSize;
/// <inheritdoc/>
public IPHost RemoteIPHost { get; private set; }
/// <inheritdoc/>
public override int SendBufferSize => this.m_sendBufferSize;
/// <summary>
/// 发送<see cref="IDmtpActor"/>关闭消息。
/// </summary>
@@ -388,12 +387,12 @@ namespace ThingsGateway.Foundation.Dmtp
private void OnReceivePeriod(long value)
{
this.ReceiveBufferSize = TouchSocketUtility.HitBufferLength(value);
this.m_receiveBufferSize = TouchSocketUtility.HitBufferLength(value);
}
private void OnSendPeriod(long value)
{
this.SendBufferSize = TouchSocketUtility.HitBufferLength(value);
this.m_sendBufferSize = TouchSocketUtility.HitBufferLength(value);
}
private void PrivateClose(string msg)
@@ -473,8 +472,7 @@ namespace ThingsGateway.Foundation.Dmtp
{
task = this.m_client.SendAsync(transferBytes[i], WebSocketMessageType.Binary, false, CancellationToken.None);
}
task.ConfigureAwait(false);
task.GetAwaiter().GetResult();
task.GetFalseAwaitResult();
this.m_sendCounter.Increment(transferBytes[i].Count);
}
}
@@ -522,9 +520,8 @@ namespace ThingsGateway.Foundation.Dmtp
/// <summary>
/// 不支持该功能
/// </summary>
/// <returns></returns>
/// <exception cref="NotSupportedException"></exception>
public IReceiver CreateReceiver()
public void ClearReceiver()
{
throw new NotSupportedException("不支持该功能");
}
@@ -532,12 +529,13 @@ namespace ThingsGateway.Foundation.Dmtp
/// <summary>
/// 不支持该功能
/// </summary>
/// <returns></returns>
/// <exception cref="NotSupportedException"></exception>
public void ClearReceiver()
public IReceiver CreateReceiver()
{
throw new NotSupportedException("不支持该功能");
}
#endregion
#endregion Receiver
}
}

View File

@@ -21,22 +21,12 @@ public abstract class BaseSerial : DependencyObject, ISerial
/// 同步根。
/// </summary>
protected readonly object SyncRoot = new object();
private int m_receiveBufferSize = 1024 * 10;
private int m_sendBufferSize = 1024 * 10;
/// <inheritdoc/>
public virtual int SendBufferSize
{
get => this.m_sendBufferSize;
set => this.m_sendBufferSize = value < 1024 ? 1024 : value;
}
public abstract int SendBufferSize { get; }
/// <inheritdoc/>
public virtual int ReceiveBufferSize
{
get => this.m_receiveBufferSize;
set => this.m_receiveBufferSize = value < 1024 ? 1024 : value;
}
public abstract int ReceiveBufferSize { get; }
/// <inheritdoc/>
public ILog Logger { get; set; }

View File

@@ -24,9 +24,14 @@ internal sealed class InternalSerialCore : SerialCore
public class SerialCore : IDisposable, ISender
{
/// <summary>
/// 初始缓存大小
/// 最小缓存尺寸
/// </summary>
public const int BufferSize = 1024 * 10;
public int MinBufferSize { get; set; } = 1024 * 10;
/// <summary>
/// 最大缓存尺寸
/// </summary>
public int MaxBufferSize { get; set; } = 1024 * 1024 * 10;
#region
/// <summary>
@@ -35,14 +40,15 @@ public class SerialCore : IDisposable, ISender
public readonly object SyncRoot = new object();
private long m_bufferRate;
private bool m_disposedValue;
private SpinLock m_lock;
private bool m_online => MainSerialPort?.IsOpen == true;
private int m_receiveBufferSize = BufferSize;
private bool m_online => m_serialPort?.IsOpen == true;
private int m_receiveBufferSize = 1024 * 10;
private ValueCounter m_receiveCounter;
private int m_sendBufferSize = BufferSize;
private int m_sendBufferSize = 1024 * 10;
private ValueCounter m_sendCounter;
private readonly EasyLock m_semaphore = new EasyLock();
private SerialPort m_serialPort;
#endregion
/// <summary>
@@ -69,7 +75,7 @@ public class SerialCore : IDisposable, ISender
/// </summary>
~SerialCore()
{
this.Dispose(disposing: false);
this.SafeDispose();
}
/// <inheritdoc/>
@@ -100,19 +106,11 @@ public class SerialCore : IDisposable, ISender
public Action<SerialCore, ByteBlock> OnReceived { get; set; }
/// <summary>
/// 接收缓存池(可以设定初始值,运行时的值会根据流速自动调整
/// 接收缓存池,运行时的值会根据流速自动调整
/// </summary>
public int ReceiveBufferSize
{
get => this.m_receiveBufferSize;
set
{
this.m_receiveBufferSize = value;
if (this.MainSerialPort != null && !MainSerialPort.IsOpen)
{
this.MainSerialPort.ReadBufferSize = value;
}
}
}
/// <summary>
@@ -121,19 +119,11 @@ public class SerialCore : IDisposable, ISender
public ValueCounter ReceiveCounter { get => this.m_receiveCounter; }
/// <summary>
/// 发送缓存池(可以设定初始值,运行时的值会根据流速自动调整
/// 发送缓存池,运行时的值会根据流速自动调整
/// </summary>
public int SendBufferSize
{
get => this.m_sendBufferSize;
set
{
this.m_sendBufferSize = value;
if (this.MainSerialPort != null && !MainSerialPort.IsOpen)
{
this.MainSerialPort.WriteBufferSize = value;
}
}
}
/// <summary>
@@ -144,7 +134,7 @@ public class SerialCore : IDisposable, ISender
/// <summary>
/// SerialPort
/// </summary>
public SerialPort MainSerialPort { get; private set; }
public SerialPort MainSerialPort { get => this.m_serialPort; }
/// <summary>
@@ -155,11 +145,12 @@ public class SerialCore : IDisposable, ISender
var byteBlock = BytePool.Default.GetByteBlock(this.ReceiveBufferSize);
this.UserToken = byteBlock;
byteBlock.SetLength(0);
if (this.MainSerialPort.BytesToRead > 0)
if (this.m_serialPort.BytesToRead > 0)
{
this.m_bufferRate += 2;
this.ProcessReceived();
}
MainSerialPort.DataReceived += MainSerialPort_DataReceived;
m_serialPort.DataReceived += MainSerialPort_DataReceived;
}
private void MainSerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e)
@@ -189,12 +180,12 @@ public class SerialCore : IDisposable, ISender
/// </summary>
public void Dispose()
{
this.Dispose(disposing: true);
GC.SuppressFinalize(this);
UserToken.SafeDispose();
}
/// <summary>
/// 重置环境,并设置新的<see cref="MainSerialPort"/>。
/// 重置环境,并设置新的<see cref="m_serialPort"/>。
/// </summary>
/// <param name="socket"></param>
public virtual void Reset(SerialPort socket)
@@ -209,7 +200,7 @@ public class SerialCore : IDisposable, ISender
throw new Exception("新的SerialPort必须在连接状态。");
}
this.Reset();
this.MainSerialPort = socket;
this.m_serialPort = socket;
}
/// <summary>
@@ -219,14 +210,14 @@ public class SerialCore : IDisposable, ISender
{
this.m_receiveCounter.Reset();
this.m_sendCounter.Reset();
this.MainSerialPort = null;
this.m_serialPort = null;
this.OnReceived = null;
this.OnBreakOut = null;
this.UserToken = null;
this.m_bufferRate = 1;
this.m_lock = new SpinLock();
this.m_receiveBufferSize = BufferSize;
this.m_sendBufferSize = BufferSize;
this.m_receiveBufferSize = this.MinBufferSize;
this.m_sendBufferSize = this.MinBufferSize;
}
/// <summary>
@@ -244,7 +235,7 @@ public class SerialCore : IDisposable, ISender
try
{
this.m_lock.Enter(ref lockTaken);
this.MainSerialPort.Write(buffer, offset, length);
this.m_serialPort.Write(buffer, offset, length);
}
finally
{
@@ -267,7 +258,7 @@ public class SerialCore : IDisposable, ISender
{
await this.m_semaphore.WaitAsync();
this.MainSerialPort.Write(buffer, offset, length);
this.m_serialPort.Write(buffer, offset, length);
}
finally
{
@@ -287,22 +278,7 @@ public class SerialCore : IDisposable, ISender
this.OnBreakOut?.Invoke(this, manual, msg);
}
/// <summary>
/// 释放对象
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (!this.m_disposedValue)
{
if (disposing)
{
}
this.m_disposedValue = true;
}
UserToken.SafeDispose();
}
/// <summary>
/// 当发生异常的时候
@@ -341,12 +317,20 @@ public class SerialCore : IDisposable, ISender
private void OnReceivePeriod(long value)
{
this.ReceiveBufferSize = TouchSocketUtility.HitBufferLength(value);
this.m_receiveBufferSize = Math.Max(TouchSocketUtility.HitBufferLength(value), this.MinBufferSize);
if (this.MainSerialPort != null && !MainSerialPort.IsOpen)
{
this.MainSerialPort.ReadBufferSize = this.m_receiveBufferSize;
}
}
private void OnSendPeriod(long value)
{
this.SendBufferSize = TouchSocketUtility.HitBufferLength(value);
this.m_sendBufferSize = Math.Max(TouchSocketUtility.HitBufferLength(value), this.MinBufferSize);
if (this.MainSerialPort != null && !MainSerialPort.IsOpen)
{
this.MainSerialPort.WriteBufferSize = this.m_sendBufferSize;
}
}
private void PrivateBreakOut(bool manual, string msg)
@@ -367,21 +351,21 @@ public class SerialCore : IDisposable, ISender
UserToken?.SafeDispose();
return;
}
if (MainSerialPort.BytesToRead > 0)
if (m_serialPort.BytesToRead > 0)
{
var byteBlock = UserToken;
byte[] buffer = BytePool.Default.Rent(MainSerialPort.BytesToRead);
int num = MainSerialPort.Read(buffer, 0, MainSerialPort.BytesToRead);
byte[] buffer = BytePool.Default.Rent(m_serialPort.BytesToRead);
int num = m_serialPort.Read(buffer, 0, m_serialPort.BytesToRead);
byteBlock.Write(buffer, 0, num);
byteBlock.SetLength(num);
this.HandleBuffer(byteBlock);
try
{
var newByteBlock = BytePool.Default.GetByteBlock((int)Math.Min(this.ReceiveBufferSize * this.m_bufferRate, TouchSocketUtility.MaxBufferLength));
var newByteBlock = BytePool.Default.GetByteBlock((int)Math.Min(this.ReceiveBufferSize * this.m_bufferRate, this.MaxBufferSize));
newByteBlock.SetLength(0);
UserToken = newByteBlock;
if (MainSerialPort.BytesToRead > 0)
if (m_serialPort.BytesToRead > 0)
{
this.m_bufferRate += 2;
this.ProcessReceived();

View File

@@ -204,10 +204,10 @@ public class SerialSessionBase : BaseSerial, ISerialSession
#region
/// <inheritdoc/>
public DateTime LastReceivedTime => this.GetTcpCore().ReceiveCounter.LastIncrement;
public DateTime LastReceivedTime => this.GetSerialCore().ReceiveCounter.LastIncrement;
/// <inheritdoc/>
public DateTime LastSendTime => this.GetTcpCore().SendCounter.LastIncrement;
public DateTime LastSendTime => this.GetSerialCore().SendCounter.LastIncrement;
/// <inheritdoc/>
public IContainer Container { get; private set; }
@@ -321,7 +321,7 @@ public class SerialSessionBase : BaseSerial, ISerialSession
private void BeginReceive()
{
this.GetTcpCore().BeginIocpReceive();
this.GetSerialCore().BeginIocpReceive();
}
@@ -375,32 +375,22 @@ public class SerialSessionBase : BaseSerial, ISerialSession
}
}
private SerialCore GetTcpCore()
private SerialCore GetSerialCore()
{
this.ThrowIfDisposed();
return this.m_serialCore ?? throw new ObjectDisposedException(this.GetType().Name);
}
/// <inheritdoc/>
public override int ReceiveBufferSize
{
get => this.GetTcpCore().ReceiveBufferSize;
set
{
this.GetTcpCore().ReceiveBufferSize = value;
}
get => this.GetSerialCore().ReceiveBufferSize;
}
/// <inheritdoc/>
public override int SendBufferSize
{
get => this.GetTcpCore().SendBufferSize;
set
{
this.GetTcpCore().SendBufferSize = value;
}
get => this.GetSerialCore().SendBufferSize;
}
/// <inheritdoc/>
@@ -728,7 +718,7 @@ public class SerialSessionBase : BaseSerial, ISerialSession
{
if (this.SendingData(buffer, offset, length).GetFalseAwaitResult())
{
this.GetTcpCore().Send(buffer, offset, length);
this.GetSerialCore().Send(buffer, offset, length);
}
}
@@ -737,7 +727,7 @@ public class SerialSessionBase : BaseSerial, ISerialSession
{
if (await this.SendingData(buffer, offset, length))
{
await this.GetTcpCore().SendAsync(buffer, offset, length);
await this.GetSerialCore().SendAsync(buffer, offset, length);
}
}

View File

@@ -35,22 +35,11 @@ namespace ThingsGateway.Foundation.Sockets
/// </summary>
protected readonly object SyncRoot = new object();
private int m_receiveBufferSize = 1024 * 10;
private int m_sendBufferSize = 1024 * 10;
/// <inheritdoc/>
public abstract int SendBufferSize { get; }
/// <inheritdoc/>
public virtual int SendBufferSize
{
get => this.m_sendBufferSize;
set => this.m_sendBufferSize = value < 1024 ? 1024 : value;
}
/// <inheritdoc/>
public virtual int ReceiveBufferSize
{
get => this.m_receiveBufferSize;
set => this.m_receiveBufferSize = value < 1024 ? 1024 : value;
}
public abstract int ReceiveBufferSize { get; }
/// <inheritdoc/>
public ILog Logger { get; set; }

View File

@@ -108,10 +108,6 @@ namespace ThingsGateway.Foundation.Sockets
}
}
/// <summary>
/// 最大BufferLength
/// </summary>
public static int MaxBufferLength { get; set; } = 1024 * 1024 * 10;
/// <summary>
/// 命中BufferLength

View File

@@ -24,9 +24,15 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
private const string m_msg1 = "远程终端主动关闭";
/// <summary>
/// 初始缓存大小
/// 最小缓存尺寸
/// </summary>
public const int BufferSize = 1024 * 10;
public int MinBufferSize { get; set; } = 1024 * 10;
/// <summary>
/// 最大缓存尺寸
/// </summary>
public int MaxBufferSize { get; set; } = 1024 * 1024 * 10;
#region
/// <summary>
@@ -37,10 +43,11 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
private long m_bufferRate;
private SpinLock m_lock;
private volatile bool m_online;
private int m_receiveBufferSize = BufferSize;
private int m_receiveBufferSize = 1024 * 10;
private ValueCounter m_receiveCounter;
private int m_sendBufferSize = BufferSize;
private int m_sendBufferSize = 1024 * 10;
private ValueCounter m_sendCounter;
private Socket m_socket;
private readonly EasyLock m_semaphore = new();
#endregion
@@ -87,15 +94,11 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
public Action<TcpCore, ByteBlock> OnReceived { get; set; }
/// <summary>
/// 接收缓存池(可以设定初始值,运行时的值会根据流速自动调整
/// 接收缓存池,运行时的值会根据流速自动调整
/// </summary>
public int ReceiveBufferSize
{
get => this.m_receiveBufferSize;
set
{
this.m_receiveBufferSize = value;
}
}
/// <summary>
@@ -104,15 +107,11 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
public ValueCounter ReceiveCounter { get => this.m_receiveCounter; }
/// <summary>
/// 发送缓存池(可以设定初始值,运行时的值会根据流速自动调整
/// 发送缓存池,运行时的值会根据流速自动调整
/// </summary>
public int SendBufferSize
{
get => this.m_sendBufferSize;
set
{
this.m_sendBufferSize = value;
}
}
/// <summary>
@@ -123,7 +122,7 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
/// <summary>
/// Socket
/// </summary>
public Socket Socket { get; private set; }
public Socket Socket { get => this.m_socket; }
/// <summary>
/// 提供一个用于客户端-服务器通信的流,该流使用安全套接字层 (SSL) 安全协议对服务器和(可选)客户端进行身份验证。
@@ -141,7 +140,7 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
/// <param name="sslOption"></param>
public virtual void Authenticate(ServiceSslOption sslOption)
{
var sslStream = (sslOption.CertificateValidationCallback != null) ? new SslStream(new NetworkStream(this.Socket, false), false, sslOption.CertificateValidationCallback) : new SslStream(new NetworkStream(this.Socket, false), false);
var sslStream = (sslOption.CertificateValidationCallback != null) ? new SslStream(new NetworkStream(this.m_socket, false), false, sslOption.CertificateValidationCallback) : new SslStream(new NetworkStream(this.m_socket, false), false);
sslStream.AuthenticateAsServer(sslOption.Certificate);
this.SslStream = sslStream;
@@ -154,7 +153,7 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
/// <param name="sslOption"></param>
public virtual void Authenticate(ClientSslOption sslOption)
{
var sslStream = (sslOption.CertificateValidationCallback != null) ? new SslStream(new NetworkStream(this.Socket, false), false, sslOption.CertificateValidationCallback) : new SslStream(new NetworkStream(this.Socket, false), false);
var sslStream = (sslOption.CertificateValidationCallback != null) ? new SslStream(new NetworkStream(this.m_socket, false), false, sslOption.CertificateValidationCallback) : new SslStream(new NetworkStream(this.m_socket, false), false);
if (sslOption.ClientCertificates == null)
{
sslStream.AuthenticateAsClient(sslOption.TargetHost);
@@ -174,7 +173,7 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
/// <returns></returns>
public virtual async Task AuthenticateAsync(ServiceSslOption sslOption)
{
var sslStream = (sslOption.CertificateValidationCallback != null) ? new SslStream(new NetworkStream(this.Socket, false), false, sslOption.CertificateValidationCallback) : new SslStream(new NetworkStream(this.Socket, false), false);
var sslStream = (sslOption.CertificateValidationCallback != null) ? new SslStream(new NetworkStream(this.m_socket, false), false, sslOption.CertificateValidationCallback) : new SslStream(new NetworkStream(this.m_socket, false), false);
await sslStream.AuthenticateAsServerAsync(sslOption.Certificate);
this.SslStream = sslStream;
@@ -188,7 +187,7 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
/// <returns></returns>
public virtual async Task AuthenticateAsync(ClientSslOption sslOption)
{
var sslStream = (sslOption.CertificateValidationCallback != null) ? new SslStream(new NetworkStream(this.Socket, false), false, sslOption.CertificateValidationCallback) : new SslStream(new NetworkStream(this.Socket, false), false);
var sslStream = (sslOption.CertificateValidationCallback != null) ? new SslStream(new NetworkStream(this.m_socket, false), false, sslOption.CertificateValidationCallback) : new SslStream(new NetworkStream(this.m_socket, false), false);
if (sslOption.ClientCertificates == null)
{
await sslStream.AuthenticateAsClientAsync(sslOption.TargetHost);
@@ -209,8 +208,9 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
var byteBlock = BytePool.Default.GetByteBlock(this.ReceiveBufferSize);
this.UserToken = byteBlock;
this.SetBuffer(byteBlock.Buffer, 0, byteBlock.Capacity);
if (!this.Socket.ReceiveAsync(this))
if (!this.m_socket.ReceiveAsync(this))
{
this.m_bufferRate += 2;
this.ProcessReceived(this);
}
}
@@ -278,7 +278,7 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
}
this.Reset();
this.m_online = true;
this.Socket = socket;
this.m_socket = socket;
}
/// <summary>
@@ -290,14 +290,14 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
this.m_sendCounter.Reset();
this.SslStream?.Dispose();
this.SslStream = null;
this.Socket = null;
this.m_socket = null;
this.OnReceived = null;
this.OnBreakOut = null;
this.UserToken = null;
this.m_bufferRate = 1;
this.m_lock = new SpinLock();
this.m_receiveBufferSize = BufferSize;
this.m_sendBufferSize = BufferSize;
this.m_receiveBufferSize = this.MinBufferSize;
this.m_sendBufferSize = this.MinBufferSize;
this.m_online = false;
}
@@ -324,7 +324,7 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
this.m_lock.Enter(ref lockTaken);
while (length > 0)
{
var r = this.Socket.Send(buffer, offset, length, SocketFlags.None);
var r = this.m_socket.Send(buffer, offset, length, SocketFlags.None);
if (r == 0 && length > 0)
{
throw new Exception("发送数据不完全");
@@ -367,7 +367,7 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
while (length > 0)
{
var r = await this.Socket.SendAsync(new ArraySegment<byte>(buffer, offset, length), SocketFlags.None, CancellationToken.None);
var r = await this.m_socket.SendAsync(new ArraySegment<byte>(buffer, offset, length), SocketFlags.None, CancellationToken.None);
if (r == 0 && length > 0)
{
throw new Exception("发送数据不完全");
@@ -394,7 +394,7 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
while (length > 0)
{
var r = this.Socket.Send(buffer, offset, length, SocketFlags.None);
var r = this.m_socket.Send(buffer, offset, length, SocketFlags.None);
if (r == 0 && length > 0)
{
throw new Exception("发送数据不完全");
@@ -477,12 +477,20 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
private void OnReceivePeriod(long value)
{
this.ReceiveBufferSize = TouchSocketUtility.HitBufferLength(value);
this.m_receiveBufferSize = Math.Max(TouchSocketUtility.HitBufferLength(value), this.MinBufferSize);
if (this.m_socket != null)
{
this.m_socket.ReceiveBufferSize = this.m_receiveBufferSize;
}
}
private void OnSendPeriod(long value)
{
this.SendBufferSize = TouchSocketUtility.HitBufferLength(value);
this.m_sendBufferSize = Math.Max(TouchSocketUtility.HitBufferLength(value), this.MinBufferSize);
if (this.m_socket != null)
{
this.m_socket.SendBufferSize = this.m_sendBufferSize;
}
}
private void PrivateBreakOut(bool manual, string msg)
@@ -511,11 +519,11 @@ public class TcpCore : SocketAsyncEventArgs, IDisposable, ISender
this.HandleBuffer(byteBlock);
try
{
var newByteBlock = BytePool.Default.GetByteBlock((int)Math.Min(this.ReceiveBufferSize * this.m_bufferRate, TouchSocketUtility.MaxBufferLength));
var newByteBlock = BytePool.Default.GetByteBlock((int)Math.Min(this.ReceiveBufferSize * this.m_bufferRate, this.MaxBufferSize));
e.UserToken = newByteBlock;
e.SetBuffer(newByteBlock.Buffer, 0, newByteBlock.Capacity);
if (!this.Socket.ReceiveAsync(e))
if (!this.m_socket.ReceiveAsync(e))
{
this.m_bufferRate += 2;
this.ProcessReceived(e);

View File

@@ -398,20 +398,12 @@ namespace ThingsGateway.Foundation.Sockets
public override int ReceiveBufferSize
{
get => this.GetTcpCore().ReceiveBufferSize;
set
{
this.GetTcpCore().ReceiveBufferSize = value;
}
}
/// <inheritdoc/>
public override int SendBufferSize
{
get => this.GetTcpCore().SendBufferSize;
set
{
this.GetTcpCore().SendBufferSize = value;
}
}
/// <inheritdoc/>

View File

@@ -523,20 +523,12 @@ namespace ThingsGateway.Foundation.Sockets
public override int ReceiveBufferSize
{
get => this.GetTcpCore().ReceiveBufferSize;
set
{
this.GetTcpCore().ReceiveBufferSize = value;
}
}
/// <inheritdoc/>
public override int SendBufferSize
{
get => this.GetTcpCore().SendBufferSize;
set
{
this.GetTcpCore().SendBufferSize = value;
}
}
/// <inheritdoc/>

View File

@@ -69,10 +69,14 @@ namespace ThingsGateway.Foundation.Sockets
this.Protocol = Protocol.Udp;
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this.Monitor = new UdpNetworkMonitor(null, socket);
this.ReceiveBufferSize = 1024 * 64;
this.SendBufferSize = 1024 * 64;
}
/// <inheritdoc/>
public override int ReceiveBufferSize => 64 * 1024;
/// <inheritdoc/>
public override int SendBufferSize => 64 * 1024;
/// <summary>
/// <inheritdoc/>
/// </summary>

View File

@@ -33,12 +33,12 @@ namespace ThingsGateway.Foundation.Sockets
/// <summary>
/// 发送缓存区大小。最小值=1024。
/// </summary>
int SendBufferSize { get; set; }
int SendBufferSize { get; }
/// <summary>
/// 接收缓存区大小。最小值=1024。
/// </summary>
int ReceiveBufferSize { get; set; }
int ReceiveBufferSize { get; }
/// <summary>
/// 日志记录器

View File

@@ -31,6 +31,24 @@ namespace ThingsGateway.Foundation.Sockets
[PluginOption(Singleton = true, NotRegister = true)]
public sealed class ReconnectionPlugin<TClient> : PluginBase where TClient : class, ITcpClient
{
/// <summary>
/// 重连插件
/// </summary>
public ReconnectionPlugin()
{
this.ActionForConnect = async (c) =>
{
try
{
await c.ConnectAsync();
return true;
}
catch
{
return false;
}
};
}
/// <inheritdoc/>
protected override void Loaded(IPluginsManager pluginsManager)
{