双网关冗余(未完成)

This commit is contained in:
Kimdiego2098
2024-02-27 17:50:23 +08:00
parent 9043fa7f56
commit 5ddaa6b872
6 changed files with 192 additions and 290 deletions

View File

@@ -249,6 +249,7 @@ public class ChannelThread
{
try
{
//TODO:根据管理服务中的isStart判定是否启动线程
await EasyLock.WaitAsync();
if (DriverTask != null)
{

View File

@@ -8,8 +8,6 @@
// QQ群605534569
//------------------------------------------------------------------------------
using Furion.Logging.Extensions;
using Mapster;
using Microsoft.Extensions.Hosting;
@@ -27,115 +25,6 @@ public class BusinessDeviceWorker : DeviceWorker
_logger = _serviceScope.ServiceProvider.GetService<ILoggerFactory>().CreateLogger("业务设备服务");
}
#region public
private EasyLock publicRestartLock = new();
public async Task RestartAsync()
{
try
{
await publicRestartLock.WaitAsync();
await StopAsync();
await StartAsync();
}
finally
{
publicRestartLock.Release();
}
}
/// <summary>
/// 启动全部设备,如果没有找到设备会创建
/// </summary>
public async Task StartAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
await StartAllChannelThreadsAsync();
await ProtectedStarted();
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
/// <summary>
/// 初始化,如果没有找到设备会创建
/// </summary>
public async Task CreatThreadsAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
/// <summary>
/// 停止
/// </summary>
public async Task StopAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
await StopThreadAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "停止错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
private async Task StopThreadAsync()
{
//取消全部采集线程
await BeforeRemoveAllChannelThreadAsync();
//取消其他后台服务
await ProtectedStoping();
//停止全部采集线程
await RemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoped();
//清空内存列表
GlobalData.CollectDevices.Clear();
}
private async Task CollectDeviceWorker_Starting()
{
await CreatThreadsAsync();
@@ -152,39 +41,6 @@ public class BusinessDeviceWorker : DeviceWorker
await StopAsync();
}
/// <summary>
/// 读取数据库,创建全部设备
/// </summary>
/// <returns></returns>
protected async Task CreatAllChannelThreadsAsync()
{
if (!_stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("正在获取业务设备组态信息");
var deviceRunTimes = await _serviceScope.ServiceProvider.GetService<IDeviceService>().GetBusinessDeviceRuntimeAsync();
_logger.LogInformation("获取业务设备组态信息完成");
var idSet = deviceRunTimes.ToDictionary(a => a.Id);
var result = deviceRunTimes.Where(a => !idSet.ContainsKey(a.RedundantDeviceId) && !a.IsRedundant).ToList();
result.ForEach(collectDeviceRunTime =>
{
if (!_stoppingToken.IsCancellationRequested)
{
try
{
DriverBase driverBase = collectDeviceRunTime.CreatDriver(PluginService);
GetChannelThread(driverBase);
}
catch (Exception ex)
{
_logger.LogError(ex, $"{collectDeviceRunTime.Name}初始化错误!");
}
}
});
}
}
#endregion public
#region
/// <summary>
@@ -238,10 +94,45 @@ public class BusinessDeviceWorker : DeviceWorker
await WhileExecuteAsync(stoppingToken);
}
#endregion worker服务
#region
protected override async Task<IEnumerable<DeviceRunTime>> GetDeviceRunTimeAsync(long deviceId)
{
return await _serviceScope.ServiceProvider.GetService<IDeviceService>().GetBusinessDeviceRuntimeAsync(deviceId);
}
#endregion worker服务
/// <summary>
/// 读取数据库,创建全部设备
/// </summary>
/// <returns></returns>
protected override async Task CreatAllChannelThreadsAsync()
{
if (!_stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("正在获取业务设备组态信息");
var deviceRunTimes = await _serviceScope.ServiceProvider.GetService<IDeviceService>().GetBusinessDeviceRuntimeAsync();
_logger.LogInformation("获取业务设备组态信息完成");
var idSet = deviceRunTimes.ToDictionary(a => a.Id);
var result = deviceRunTimes.Where(a => !idSet.ContainsKey(a.RedundantDeviceId) && !a.IsRedundant).ToList();
result.ForEach(collectDeviceRunTime =>
{
if (!_stoppingToken.IsCancellationRequested)
{
try
{
DriverBase driverBase = collectDeviceRunTime.CreatDriver(PluginService);
GetChannelThread(driverBase);
}
catch (Exception ex)
{
_logger.LogError(ex, $"{collectDeviceRunTime.Name}初始化错误!");
}
}
});
}
}
#endregion
}

View File

@@ -25,120 +25,43 @@ public class CollectDeviceWorker : DeviceWorker
_logger = _serviceScope.ServiceProvider.GetService<ILoggerFactory>().CreateLogger("采集设备服务");
}
#region public
#region worker服务
private EasyLock publicRestartLock = new();
public async Task RestartAsync()
/// <inheritdoc/>
public override async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await publicRestartLock.WaitAsync();
await StopAsync();
await StartAsync();
}
finally
{
publicRestartLock.Release();
}
using var stoppingToken = new CancellationTokenSource();
_stoppingToken = stoppingToken.Token;
stoppingToken.Cancel();
await StopThreadAsync();
await base.StopAsync(cancellationToken);
}
/// <summary>
/// 启动全部设备,如果没有找到设备会创建
/// </summary>
public async Task StartAsync()
#endregion worker服务
#region
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
await StartAllChannelThreadsAsync();
await ProtectedStarted();
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
await _easyLock?.WaitAsync();
PluginService = _serviceScope.ServiceProvider.GetService<IPluginService>();
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
//重启采集线程,会启动其他后台服务
await RestartAsync();
await WhileExecuteAsync(stoppingToken);
}
/// <summary>
/// 初始化,如果没有找到设备会创建
/// </summary>
public async Task CreatThreadsAsync()
protected override async Task<IEnumerable<DeviceRunTime>> GetDeviceRunTimeAsync(long deviceId)
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
/// <summary>
/// 停止
/// </summary>
public async Task StopAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
await StopThreadAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "停止错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
internal async Task StopThreadAsync()
{
//取消全部采集线程
await BeforeRemoveAllChannelThreadAsync();
//取消其他后台服务
await ProtectedStoping();
//停止全部采集线程
await RemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoped();
//清空内存列表
GlobalData.CollectDevices.Clear();
return await _serviceScope.ServiceProvider.GetService<DeviceService>().GetCollectDeviceRuntimeAsync(deviceId);
}
/// <summary>
/// 读取数据库,创建全部设备
/// </summary>
/// <returns></returns>
protected virtual async Task CreatAllChannelThreadsAsync()
protected override async Task CreatAllChannelThreadsAsync()
{
if (!_stoppingToken.IsCancellationRequested)
{
@@ -165,35 +88,5 @@ public class CollectDeviceWorker : DeviceWorker
}
}
#endregion public
#region worker服务
/// <inheritdoc/>
public override async Task StopAsync(CancellationToken cancellationToken)
{
using var stoppingToken = new CancellationTokenSource();
_stoppingToken = stoppingToken.Token;
stoppingToken.Cancel();
await StopThreadAsync();
await base.StopAsync(cancellationToken);
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
PluginService = _serviceScope.ServiceProvider.GetService<IPluginService>();
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
//重启采集线程,会启动其他后台服务
await RestartAsync();
await WhileExecuteAsync(stoppingToken);
}
protected override async Task<IEnumerable<DeviceRunTime>> GetDeviceRunTimeAsync(long deviceId)
{
return await _serviceScope.ServiceProvider.GetService<DeviceService>().GetCollectDeviceRuntimeAsync(deviceId);
}
#endregion worker服务
#endregion
}

View File

@@ -61,8 +61,6 @@ public abstract class DeviceWorker : BackgroundService
/// </summary>
protected ConcurrentList<ChannelThread> ChannelThreads { get; set; } = new();
#region public
/// <summary>
/// 控制设备线程启停
/// </summary>
@@ -77,8 +75,6 @@ public abstract class DeviceWorker : BackgroundService
DriverBases.FirstOrDefault(it => it.DeviceId == deviceId)?.PasueThread(isStart);
}
#endregion public
#region Private
/// <summary>
@@ -361,10 +357,6 @@ public abstract class DeviceWorker : BackgroundService
return driverPlugin?.DriverUIType;
}
#endregion
#region
/// <summary>
/// 获取设备方法
/// </summary>
@@ -544,6 +536,127 @@ public abstract class DeviceWorker : BackgroundService
}
#endregion worker服务
#region
private EasyLock publicRestartLock = new();
public async Task RestartAsync()
{
try
{
await publicRestartLock.WaitAsync();
await StopAsync();
await StartAsync();
}
finally
{
publicRestartLock.Release();
}
}
/// <summary>
/// 启动全部设备,如果没有找到设备会创建
/// </summary>
public async Task StartAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
await StartAllChannelThreadsAsync();
await ProtectedStarted();
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
/// <summary>
/// 初始化,如果没有找到设备会创建
/// </summary>
public async Task CreatThreadsAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
/// <summary>
/// 停止
/// </summary>
public async Task StopAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
await StopThreadAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "停止错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
protected async Task StopThreadAsync()
{
//取消全部采集线程
await BeforeRemoveAllChannelThreadAsync();
//取消其他后台服务
await ProtectedStoping();
//停止全部采集线程
await RemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoped();
//清空内存列表
GlobalData.CollectDevices.Clear();
}
#endregion
#region
/// <summary>
/// 读取数据库,创建全部设备
/// </summary>
/// <returns></returns>
protected abstract Task CreatAllChannelThreadsAsync();
#endregion
}
public delegate Task RestartEventHandler();

View File

@@ -46,9 +46,6 @@ public class Redundancy
public bool IsStartBusinessDevice { get; set; }
}
/// <summary>
/// TODO:网关管理服务
/// </summary>
public class ManagementWoker : BackgroundService
{
protected IServiceScope _serviceScope;
@@ -152,7 +149,7 @@ public class ManagementWoker : BackgroundService
{
try
{
gatewayState = await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("GetGatewayStateAsync", waitInvoke, IsStart);
gatewayState = await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>(nameof(ReverseCallbackServer.GetGatewayStateAsync), waitInvoke, IsStart);
break;
}
catch
@@ -221,13 +218,12 @@ public class ManagementWoker : BackgroundService
{
StartLock.Release();
}
//TODO:发布到从站数据
if (Options.Redundancy.IsPrimary)
{
try
{
if (online)
await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("UpdateGatewayDataAsync", waitInvoke, GlobalData.CollectDevices);
await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>(nameof(ReverseCallbackServer.UpdateGatewayDataAsync), waitInvoke, GlobalData.CollectDevices);
}
catch (Exception ex)
{

View File

@@ -47,4 +47,12 @@ internal class ReverseCallbackServer : RpcServer
easyLock.Release();
}
}
[DmtpRpc(true)]//使用方法名作为调用键
public Task<GatewayState> UpdateGatewayDataAsync(bool isStart)
{
//TODO:获取主站数据
throw new NotImplementedException();
}
}