mirror of
https://gitee.com/ThingsGateway/ThingsGateway.git
synced 2025-10-20 10:50:48 +08:00
refactor: 冗余服务代码修正
This commit is contained in:
@@ -68,7 +68,7 @@ public static class LinqExtensions
|
||||
}
|
||||
return thisValue;
|
||||
}
|
||||
#if NET6_0_OR_GREATER
|
||||
|
||||
/// <inheritdoc/>
|
||||
public static void AddRange<TKey, TItem>(this Dictionary<TKey, TItem> @this, IEnumerable<KeyValuePair<TKey, TItem>> values)
|
||||
{
|
||||
@@ -77,7 +77,6 @@ public static class LinqExtensions
|
||||
@this.TryAdd(value.Key, value.Value);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
/// <inheritdoc/>
|
||||
public static void AddRange<T>(this ICollection<T> @this, IEnumerable<T> values)
|
||||
{
|
||||
|
@@ -56,7 +56,17 @@ internal sealed class RedundancyTask : IRpcDriver, IAsyncDisposable
|
||||
|
||||
private ScheduledAsyncTask scheduledTask;
|
||||
|
||||
private int GetBatchSize()
|
||||
{
|
||||
// 默认批量数量
|
||||
const int defaultSize = 10000;
|
||||
const int highMemorySize = 50000;
|
||||
const long memoryThreshold = 2L * 1024 * 1024; // 2GB,单位KB
|
||||
|
||||
return GlobalData.HardwareJob.HardwareInfo.MachineInfo.AvailableMemory > memoryThreshold
|
||||
? highMemorySize
|
||||
: defaultSize;
|
||||
}
|
||||
/// <summary>
|
||||
/// 主站
|
||||
/// </summary>
|
||||
@@ -80,11 +90,8 @@ internal sealed class RedundancyTask : IRpcDriver, IAsyncDisposable
|
||||
if (online)
|
||||
{
|
||||
|
||||
int batchSize = 10000;
|
||||
if (GlobalData.HardwareJob.HardwareInfo.MachineInfo.AvailableMemory > 2 * 1024 * 1024)
|
||||
{
|
||||
batchSize = 200000;
|
||||
}
|
||||
int batchSize = GetBatchSize();
|
||||
|
||||
var deviceRunTimes = GlobalData.ReadOnlyIdDevices.Where(a => a.Value.IsCollect == true).Select(a => a.Value).Batch(batchSize);
|
||||
|
||||
|
||||
@@ -347,7 +354,7 @@ internal sealed class RedundancyTask : IRpcDriver, IAsyncDisposable
|
||||
var tcpDmtpClient = new TcpDmtpClient();
|
||||
var config = new TouchSocketConfig()
|
||||
.SetRemoteIPHost(redundancy.MasterUri)
|
||||
.SetAdapterOption(new AdapterOption() { MaxPackageSize = 1024 * 1024 * 1024 })
|
||||
.SetAdapterOption(new AdapterOption() { MaxPackageSize = 0x20000000 })
|
||||
.SetDmtpOption(new DmtpOption() { VerifyToken = redundancy.VerifyToken })
|
||||
.ConfigureContainer(a =>
|
||||
{
|
||||
@@ -363,6 +370,30 @@ internal sealed class RedundancyTask : IRpcDriver, IAsyncDisposable
|
||||
a.UseDmtpHeartbeat()//使用Dmtp心跳
|
||||
.SetTick(TimeSpan.FromMilliseconds(redundancy.HeartbeatInterval))
|
||||
.SetMaxFailCount(redundancy.MaxErrorCount);
|
||||
|
||||
//使用重连
|
||||
a.UseDmtpReconnection<TcpDmtpClient>()
|
||||
.UsePolling(TimeSpan.FromSeconds(3))//使用轮询,每3秒检测一次
|
||||
.SetActionForCheck(async (c, i) =>//重新定义检活策略
|
||||
{
|
||||
|
||||
//方法2,直接ping,如果true,则客户端必在线。如果false,则客户端不一定不在线,原因是可能当前传输正在忙
|
||||
if (await c.PingAsync().ConfigureAwait(false))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
//返回false时可以判断,如果最近活动时间不超过3秒,则猜测客户端确实在忙,所以跳过本次重连
|
||||
else if (DateTime.Now - c.GetLastActiveTime() < TimeSpan.FromSeconds(3))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
//否则,直接重连。
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
await tcpDmtpClient.SetupAsync(config).ConfigureAwait(false);
|
||||
@@ -378,7 +409,7 @@ internal sealed class RedundancyTask : IRpcDriver, IAsyncDisposable
|
||||
var tcpDmtpService = new TcpDmtpService();
|
||||
var config = new TouchSocketConfig()
|
||||
.SetListenIPHosts(redundancy.MasterUri)
|
||||
.SetAdapterOption(new AdapterOption() { MaxPackageSize = 1024 * 1024 * 1024 })
|
||||
.SetAdapterOption(new AdapterOption() { MaxPackageSize = 0x20000000 })
|
||||
.SetDmtpOption(new DmtpOption() { VerifyToken = redundancy.VerifyToken })
|
||||
.ConfigureContainer(a =>
|
||||
{
|
||||
@@ -471,9 +502,45 @@ internal sealed class RedundancyTask : IRpcDriver, IAsyncDisposable
|
||||
|
||||
private async Task InvokeSyncDataAsync(IDmtpActorObject client, DmtpInvokeOption invokeOption, CancellationToken cancellationToken)
|
||||
{
|
||||
int maxBatchSize = GetBatchSize() / 10;
|
||||
|
||||
var groups = GlobalData.IdVariables.Select(a => a.Value).GroupBy(a => a.DeviceRuntime);
|
||||
|
||||
var channelBatch = new HashSet<Channel>();
|
||||
var deviceBatch = new HashSet<Device>();
|
||||
var variableBatch = new List<Variable>();
|
||||
foreach (var group in groups)
|
||||
{
|
||||
var channel = group.Key.ChannelRuntime.AdaptChannel();
|
||||
var device = group.Key.AdaptDevice();
|
||||
channelBatch.Add(channel);
|
||||
deviceBatch.Add(device);
|
||||
|
||||
foreach (var variable in group)
|
||||
{
|
||||
channelBatch.Add(channel);
|
||||
deviceBatch.Add(device);
|
||||
variableBatch.Add(variable.AdaptVariable());
|
||||
|
||||
if (variableBatch.Count >= maxBatchSize)
|
||||
{
|
||||
// 发送一批
|
||||
await client.GetDmtpRpcActor().InvokeAsync(nameof(ReverseCallbackServer.SyncData), null, invokeOption, channelBatch.ToList(), deviceBatch.ToList(), variableBatch).ConfigureAwait(false);
|
||||
|
||||
variableBatch.Clear();
|
||||
channelBatch.Remove(channel);
|
||||
deviceBatch.Remove(device);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// 发送最后剩余的一批
|
||||
if (variableBatch.Count > 0)
|
||||
{
|
||||
await client.GetDmtpRpcActor().InvokeAsync(nameof(ReverseCallbackServer.SyncData), null, invokeOption, channelBatch.ToList(), deviceBatch.ToList(), variableBatch).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
await client.GetDmtpRpcActor().InvokeAsync(nameof(ReverseCallbackServer.SyncData), null, invokeOption, GlobalData.Channels.Select(a => a.Value).ToList(), GlobalData.Devices.Select(a => a.Value).ToList(), GlobalData.IdVariables.Select(a => a.Value).ToList())
|
||||
.ConfigureAwait(false);
|
||||
LogMessage?.LogTrace($"ForcedSync data success");
|
||||
|
||||
}
|
||||
|
@@ -304,6 +304,7 @@ internal sealed class UpdateZipFileHostedService : BackgroundService, IUpdateZip
|
||||
var upgradeServerOptions = App.GetOptions<UpgradeServerOptions>();
|
||||
var config = new TouchSocketConfig()
|
||||
.SetRemoteIPHost(new IPHost($"{upgradeServerOptions.UpgradeServerIP}:{upgradeServerOptions.UpgradeServerPort}"))
|
||||
.SetAdapterOption(new AdapterOption() { MaxPackageSize = 0x20000000 })
|
||||
.SetDmtpOption(new DmtpOption()
|
||||
{
|
||||
VerifyToken = upgradeServerOptions.VerifyToken
|
||||
@@ -326,7 +327,7 @@ internal sealed class UpdateZipFileHostedService : BackgroundService, IUpdateZip
|
||||
|
||||
//使用重连
|
||||
a.UseDmtpReconnection<TcpDmtpClient>()
|
||||
.UsePolling(TimeSpan.FromSeconds(5))//使用轮询,每3秒检测一次
|
||||
.UsePolling(TimeSpan.FromSeconds(3))//使用轮询,每3秒检测一次
|
||||
.SetActionForCheck(async (c, i) =>//重新定义检活策略
|
||||
{
|
||||
if (!upgradeServerOptions.Enable) return true;
|
||||
|
@@ -91,7 +91,7 @@
|
||||
<Copy SourceFiles="@(PkgThingsGateway_Plugin_TS550PackageFiles)" DestinationFolder="$(PluginFolder)ThingsGateway.Plugin.TS550%(RecursiveDir)" />
|
||||
<Copy SourceFiles="@(PkgThingsGateway_Plugin_VigorPackageFiles)" DestinationFolder="$(PluginFolder)ThingsGateway.Plugin.Vigor%(RecursiveDir)" />
|
||||
|
||||
<Copy SourceFiles="@(PkgThingsGateway_Plugin_SynchronizationPackageFiles)" DestinationFolder="$(PluginFolder)%(RecursiveDir)" />
|
||||
<Copy SourceFiles="@(PkgThingsGateway_Plugin_VigorPackageFiles)" DestinationFolder="$(PluginFolder)ThingsGateway.Plugin.Synchronization%(RecursiveDir)" />
|
||||
</Target>
|
||||
|
||||
</Project>
|
||||
|
@@ -33,6 +33,7 @@ public class FileHostService : BackgroundService, IFileHostService
|
||||
var upgradeServerOptions = App.GetOptions<UpgradeServerOptions>();
|
||||
var service = new TcpDmtpService();
|
||||
var config = new TouchSocketConfig()//配置
|
||||
.SetAdapterOption(new AdapterOption() { MaxPackageSize = 0x20000000 })
|
||||
.SetListenIPHosts(new IPHost[] { new IPHost(upgradeServerOptions.UpgradeServerPort) })
|
||||
.ConfigureContainer(a =>
|
||||
{
|
||||
|
Reference in New Issue
Block a user