双网关冗余(未完成)

This commit is contained in:
Kimdiego2098
2024-02-27 13:59:12 +08:00
parent d3b87179aa
commit 4c8e487dc9
9 changed files with 176 additions and 187 deletions

View File

@@ -6,13 +6,16 @@
"HeartbeatInterval": 3000, //心跳间隔
"MaxErrorCount": 3, //最大错误次数
"Redundancy": {
"Enable": false, //启用冗余
"Enable": true, //启用冗余
"IsPrimary": true, //是否主站
"IsHot": true
//默认主站优先,当主站恢复后,从站切换回主站,并重新开始向从站数据同步。
//备用站对外提供的数据都来自主站的数据同步
//热切换:主备站都完成对采集的初始化,并且都采集数据
//冷切换:主站完成对采集的初始化,并采集数据,只有当主站故障后,备用站才开始初始化并采集。
"SwitchMode": {
"IsHot": true, //热备
"IsStartBusinessDevice": true //是否启用备用站点的业务设备
}
//主从站的采集配置必须一致
//默认主站优先,当主站恢复后,从站切换回备用模式。
//热切换:主备站都完成对采集的初始化,并且都采集数据,但从站的数据都来自主站的数据同步
//冷切换:主站完成对采集的初始化,并采集数据,从站的数据都来自主站的数据同步,只有当主站故障后,从站才开始初始化并采集。
}
}
}

View File

@@ -1,15 +1,21 @@
{
"Management": {
"ServerUri": "127.0.0.1:7777", //IP
"ServerStandbyUri": "127.0.0.1:7777", //备用IP
"RemoteUri": "127.0.0.1:7778", //主(备)站IP
"Port": 7777, //监听端口
"VerifyToken": "ThingsGateway",
"VerifyToken": "ThingsGateway", //登录token双方一致
"HeartbeatInterval": 3000, //心跳间隔
"MaxErrorCount": 3, //最大错误次数
"Redundancy": {
"Enable": true, //启用冗余
"IsPrimary": true, //是否主站
"IsHot": true //是否热冗余
"SwitchMode": {
"IsHot": true, //热备
"IsStartBusinessDevice": true //是否启用备用站点的业务设备
}
//主从站的采集配置必须一致
//默认主站优先,当主站恢复后,从站切换回备用模式。
//热切换:主备站都完成对采集的初始化,并且都采集数据,但从站的数据都来自主站的数据同步
//冷切换:主站完成对采集的初始化,并采集数据,从站的数据都来自主站的数据同步,只有当主站故障后,从站才开始初始化并采集。
}
}
}

View File

@@ -47,7 +47,7 @@ public class Startup : AppStartup
services.AddHostedService<BusinessDeviceWorker>();
services.AddHostedService<AlarmWorker>();
services.AddConfigurableOptions<ManagementOptions>();
services.AddHostedService<ManagementWoker>();
//services.AddHostedService<ManagementWoker>();
TypeExtension.DefaultDisplayNameFuncs.Add(a => a.GetCustomAttribute<DynamicPropertyAttribute>()?.Description);
}

View File

@@ -446,6 +446,16 @@ public class AlarmWorker : BackgroundService
private EasyLock _easyLock = new(false);
private async Task CollectDeviceWorker_Starting()
{
await StartAsync();
}
private async Task CollectDeviceWorker_Stoping()
{
await StopAsync();
}
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
{
@@ -463,7 +473,9 @@ public class AlarmWorker : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
var collectDeviceWorker = WorkerUtil.GetWoker<CollectDeviceWorker>();
collectDeviceWorker.Starting += CollectDeviceWorker_Starting;
collectDeviceWorker.Stoping += CollectDeviceWorker_Stoping;
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
while (!stoppingToken.IsCancellationRequested)
{

View File

@@ -182,6 +182,22 @@ public class BusinessDeviceWorker : DeviceWorker
#region worker服务
private async Task CollectDeviceWorker_Starting()
{
await CreatThreadsAsync();
}
private async Task CollectDeviceWorker_Started()
{
await Task.Delay(1000);
await StartAsync();
}
private async Task CollectDeviceWorker_Stoping()
{
await StopAsync();
}
public override async Task StartAsync(CancellationToken cancellationToken)
{
await base.StartAsync(cancellationToken);
@@ -199,7 +215,10 @@ public class BusinessDeviceWorker : DeviceWorker
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
var collectDeviceWorker = WorkerUtil.GetWoker<CollectDeviceWorker>();
collectDeviceWorker.Starting += CollectDeviceWorker_Starting;
collectDeviceWorker.Started += CollectDeviceWorker_Started;
collectDeviceWorker.Stoping += CollectDeviceWorker_Stoping;
PluginService = _serviceScope.ServiceProvider.GetService<IPluginService>();
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
await WhileExecuteAsync(stoppingToken);

View File

@@ -129,6 +129,19 @@ public class CollectDeviceWorker : DeviceWorker
await RemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoped();
//清空内存列表
GlobalData.CollectDevices.Clear();
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
PluginService = _serviceScope.ServiceProvider.GetService<IPluginService>();
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
//重启采集线程,会启动其他后台服务
await RestartAsync();
await WhileExecuteAsync(stoppingToken);
}
#endregion worker服务

View File

@@ -29,7 +29,7 @@ namespace ThingsGateway.Gateway.Application;
public class ManagementOptions : IConfigurableOptions
{
public string ServerUri { get; set; }
public string RemoteUri { get; set; }
public string ServerStandbyUri { get; set; }
public int Port { get; set; }
public string VerifyToken { get; set; }
@@ -42,7 +42,8 @@ public class Redundancy
{
public bool Enable { get; set; }
public bool IsPrimary { get; set; }
public bool IsHot { get; set; }
public bool IsStartBusinessDevice { get; set; }
}
/// <summary>
@@ -64,35 +65,11 @@ public class ManagementWoker : BackgroundService
internal readonly EasyLock workerLock = new();
private async Task CollectDeviceWorker_Starting()
{
if (isStart)
{
await WorkerUtil.GetWoker<AlarmWorker>().StartAsync();
await WorkerUtil.GetWoker<BusinessDeviceWorker>().CreatThreadsAsync();
}
}
private async Task CollectDeviceWorker_Stoping()
{
await WorkerUtil.GetWoker<AlarmWorker>().StopAsync();
await WorkerUtil.GetWoker<BusinessDeviceWorker>().StopAsync();
}
private async Task CollectDeviceWorker_Started()
{
if (isStart)
{
await Task.Delay(1000);
await WorkerUtil.GetWoker<BusinessDeviceWorker>().StartAsync();
}
}
#region worker服务
private EasyLock _easyLock = new();
internal volatile bool isStart = false;
internal volatile bool IsStart = false;
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
@@ -110,73 +87,76 @@ public class ManagementWoker : BackgroundService
return base.StopAsync(cancellationToken);
}
internal ManagementOptions options;
internal ManagementOptions Options;
internal GlobalData GlobalData;
/// <summary>
/// 启动锁
/// </summary>
internal EasyLock StartLock = new(true);
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
var globalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
options = App.GetOptions<ManagementOptions>();
var collectDeviceWorker = WorkerUtil.GetWoker<CollectDeviceWorker>();
collectDeviceWorker.Starting += CollectDeviceWorker_Starting;
collectDeviceWorker.Started += CollectDeviceWorker_Started;
collectDeviceWorker.Stoping += CollectDeviceWorker_Stoping;
if (options.Redundancy.Enable)
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
Options = App.GetOptions<ManagementOptions>();
if (Options.Redundancy.Enable)
{
if (options.Redundancy.IsHot)
{
//热备冗余,直接启动采集服务
await collectDeviceWorker.RestartAsync();
}
var udpDmtp = GetUdpDmtp(options);
var udpDmtp = GetUdpDmtp(Options);
await udpDmtp.StartAsync();//启动
var firstStart = true;
if (Options.Redundancy.IsPrimary)
{
//初始化时,主站直接启动
IsStart = true;
StartLock.Release();
}
while (!stoppingToken.IsCancellationRequested)
{
try
{
GatewayState? gatewayState = null;
try
await StartLock.WaitAsync();
var online = await udpDmtp.PingAsync(3000);
if (online)
{
await workerLock.WaitAsync();
var readErrorCount = 0;
while (readErrorCount < Options.MaxErrorCount)
{
var readErrorCount = 0;
while (readErrorCount < options.MaxErrorCount)
try
{
try
{
gatewayState = await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("GetGatewayStateAsync", InvokeOption.WaitInvoke, isStart);
break;
}
catch
{
readErrorCount++;
}
gatewayState = await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("GetGatewayStateAsync", InvokeOption.WaitInvoke, IsStart);
break;
}
catch
{
readErrorCount++;
}
}
}
finally
{
workerLock.Release();
}
if (gatewayState != null)
{
if (gatewayState.IsPrimary == options.Redundancy.IsPrimary)
if (gatewayState.IsPrimary == Options.Redundancy.IsPrimary)
{
_logger.LogInformation("主备站设置重复,退出冗余服务!");
await StartAsync();
break;
if (!IsStart)
{
_logger.LogInformation("主备站设置重复!");
IsStart = true;
}
await Task.Delay(1000);
continue;
}
}
if (gatewayState == null)
{
//无法获取状态,启动本机
if (!isStart)
if (!IsStart)
{
_logger.LogInformation("无法连接冗余站点,本机将切换到正常状态");
await StartAsync();
IsStart = true;
}
}
else if (gatewayState.IsPrimary)
@@ -184,10 +164,10 @@ public class ManagementWoker : BackgroundService
//主站已经启动
if (gatewayState.IsStart)
{
if (isStart || firstStart)
if (IsStart)
{
_logger.LogInformation("主站已恢复,本机将切换到备用状态");
await StopAsync();
_logger.LogInformation("主站已恢复,本机(从站)将切换到备用状态");
IsStart = false;
}
}
else
@@ -204,19 +184,27 @@ public class ManagementWoker : BackgroundService
}
else
{
if (!isStart)
if (!IsStart)
{
_logger.LogInformation("本机(主站)将切换到正常状态");
await StartAsync();
IsStart = true;
}
}
}
//TODO:发布到从站数据
//if (options.Redundancy.IsPrimary)
//{
// await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("UpdateGatewayDataAsync", InvokeOption.WaitInvoke, globalData.CollectDevices, globalData.BusinessDevices);
//}
if (Options.Redundancy.IsPrimary)
{
try
{
if (online)
await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("UpdateGatewayDataAsync", InvokeOption.WaitInvoke, GlobalData.CollectDevices);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "同步数据到从站时,发生错误");
}
}
await Task.Delay(1000, stoppingToken);
}
catch (TaskCanceledException)
@@ -231,16 +219,19 @@ public class ManagementWoker : BackgroundService
}
finally
{
firstStart = false;
StartLock.Release();
}
}
}
else
{
isStart = true;
//直接启动
IsStart = true;
//无冗余,直接启动采集服务
await collectDeviceWorker.RestartAsync();
_logger.LogInformation("不启用网关冗余站点");
StartLock.Release();
}
while (!stoppingToken.IsCancellationRequested)
{
try
@@ -260,53 +251,6 @@ public class ManagementWoker : BackgroundService
}
}
private async Task StartAsync()
{
try
{
await workerLock.WaitAsync();
isStart = true;
if (options.Redundancy.IsHot)
{
await CollectDeviceWorker_Starting();
await CollectDeviceWorker_Started();
}
else
{
var collectDeviceWorker = WorkerUtil.GetWoker<CollectDeviceWorker>();
await collectDeviceWorker.RestartAsync();
}
}
finally
{
workerLock.Release();
}
}
private async Task StopAsync()
{
try
{
await workerLock.WaitAsync();
isStart = false;
if (options.Redundancy.IsHot)
{
await CollectDeviceWorker_Stoping();
}
else
{
var collectDeviceWorker = WorkerUtil.GetWoker<CollectDeviceWorker>();
await collectDeviceWorker.StopAsync();
}
}
finally
{
workerLock.Release();
}
}
#endregion worker服务
#region
@@ -320,7 +264,7 @@ public class ManagementWoker : BackgroundService
{
var udpDmtp = new UdpDmtp();
var config = new TouchSocketConfig()
.SetRemoteIPHost(options.ServerUri)
.SetRemoteIPHost(options.RemoteUri)
.SetBindIPHost(options.Port)
.SetDmtpOption(
new DmtpOption() { VerifyToken = options.VerifyToken })
@@ -337,7 +281,7 @@ public class ManagementWoker : BackgroundService
.ConfigurePlugins(a =>
{
a.UseDmtpFileTransfer();//必须添加文件传输插件
//a.Add<FilePlugin>();
//a.Add<FilePlugin>();
a.UseDmtpHeartbeat()//使用Dmtp心跳
.SetTick(TimeSpan.FromMilliseconds(options.HeartbeatInterval))
.SetMaxFailCount(options.MaxErrorCount);

View File

@@ -37,20 +37,8 @@ internal class ReverseCallbackServer : RpcServer
//冗余双方站点可能存在同时执行冗余切换的情况
{
GatewayState result = new();
result.IsStart = managementWoker.isStart;
//避免出现偶发同时启动
if (isStart && result.IsStart)
{
//请求方停止
result.RequestStop = true;
}
if (!isStart && !result.IsStart)
{
//请求方启动
result.RequestStop = false;
}
result.IsStart = managementWoker.IsStart;
result.IsPrimary = managementWoker.Options.Redundancy.IsPrimary;
return result;
}
}

View File

@@ -19,19 +19,25 @@ namespace ThingsGateway.Foundation;
/// </summary>
public abstract class VariableObject
{
private IProtocol protocol;
private int maxPack;
private List<VariableSourceClass>? deviceVariableSourceReads;
private Dictionary<string, VariableRuntimeProperty>? dict;
/// <summary>
/// VariableObject
/// </summary>
public VariableObject(IProtocol protocol, int maxPack)
{
this.protocol = protocol;
this.Protocol = protocol;
this.maxPack = maxPack;
}
private List<VariableSourceClass>? deviceVariableSourceReads;
/// <summary>
/// 协议对象
/// </summary>
public IProtocol Protocol { get; set; }
/// <summary>
/// <see cref="VariableRuntimeAttribute"/>特性连读,反射赋值到继承类中的属性
@@ -44,10 +50,10 @@ public abstract class VariableObject
//连读
foreach (var item in deviceVariableSourceReads)
{
var result = await protocol.ReadAsync(item.RegisterAddress, item.Length);
var result = await Protocol.ReadAsync(item.RegisterAddress, item.Length);
if (result.IsSuccess)
{
item.VariableRunTimes.PraseStructContent(protocol, result.Content, item, exWhenAny: true);
item.VariableRunTimes.PraseStructContent(Protocol, result.Content, item, exWhenAny: true);
}
else
{
@@ -71,30 +77,6 @@ public abstract class VariableObject
}
}
private Dictionary<string, VariableRuntimeProperty>? dict;
private void GetVariableSources()
{
if (deviceVariableSourceReads == null)
{
dict = VariableObjectHelper.GetPairs(GetType());
List<VariableClass> variableClasss = new();
foreach (var pair in dict)
{
var dataType = pair.Value.Attribute.DataType == DataTypeEnum.Object ? Type.GetTypeCode(pair.Value.Property.PropertyType.IsArray ? pair.Value.Property.PropertyType.GetElementType() : pair.Value.Property.PropertyType).GetDataType() : pair.Value.Attribute.DataType;
VariableClass variableClass = new VariableClass()
{
DataType = dataType,
RegisterAddress = pair.Value.Attribute.RegisterAddress,
IntervalTime = 1000,
};
pair.Value.VariableClass = variableClass;
variableClasss.Add(variableClass);
}
deviceVariableSourceReads = protocol.LoadSourceRead<VariableSourceClass>(variableClasss, maxPack, 1000);
}
}
/// <summary>
/// <see cref="VariableRuntimeAttribute"/>特性连读,反射赋值到继承类中的属性
/// </summary>
@@ -106,10 +88,10 @@ public abstract class VariableObject
//连读
foreach (var item in deviceVariableSourceReads)
{
var result = protocol.Read(item.RegisterAddress, item.Length);
var result = Protocol.Read(item.RegisterAddress, item.Length);
if (result.IsSuccess)
{
item.VariableRunTimes.PraseStructContent(protocol, result.Content, item, exWhenAny: true);
item.VariableRunTimes.PraseStructContent(Protocol, result.Content, item, exWhenAny: true);
}
else
{
@@ -153,7 +135,7 @@ public abstract class VariableObject
{
return new($"该属性未被识别,可能没有使用{typeof(VariableRuntimeAttribute)}特性标识");
}
var result = protocol.Write(variableRuntimeProperty.VariableClass.RegisterAddress, JToken.FromObject(value), variableRuntimeProperty.VariableClass.DataType, cancellationToken);
var result = Protocol.Write(variableRuntimeProperty.VariableClass.RegisterAddress, JToken.FromObject(value), variableRuntimeProperty.VariableClass.DataType, cancellationToken);
return result;
}
catch (Exception ex)
@@ -182,7 +164,7 @@ public abstract class VariableObject
{
return new($"该属性未被识别,可能没有使用{typeof(VariableRuntimeAttribute)}特性标识");
}
var result = await protocol.WriteAsync(variableRuntimeProperty.VariableClass.RegisterAddress, JToken.FromObject(value), variableRuntimeProperty.VariableClass.DataType, cancellationToken);
var result = await Protocol.WriteAsync(variableRuntimeProperty.VariableClass.RegisterAddress, JToken.FromObject(value), variableRuntimeProperty.VariableClass.DataType, cancellationToken);
return result;
}
catch (Exception ex)
@@ -190,4 +172,26 @@ public abstract class VariableObject
return new(ex);
}
}
private void GetVariableSources()
{
if (deviceVariableSourceReads == null)
{
dict = VariableObjectHelper.GetPairs(GetType());
List<VariableClass> variableClasss = new();
foreach (var pair in dict)
{
var dataType = pair.Value.Attribute.DataType == DataTypeEnum.Object ? Type.GetTypeCode(pair.Value.Property.PropertyType.IsArray ? pair.Value.Property.PropertyType.GetElementType() : pair.Value.Property.PropertyType).GetDataType() : pair.Value.Attribute.DataType;
VariableClass variableClass = new VariableClass()
{
DataType = dataType,
RegisterAddress = pair.Value.Attribute.RegisterAddress,
IntervalTime = 1000,
};
pair.Value.VariableClass = variableClass;
variableClasss.Add(variableClass);
}
deviceVariableSourceReads = Protocol.LoadSourceRead<VariableSourceClass>(variableClasss, maxPack, 1000);
}
}
}