This commit is contained in:
Kimdiego2098
2024-02-27 16:58:01 +08:00
parent 4c8e487dc9
commit 9043fa7f56
3 changed files with 224 additions and 134 deletions

View File

@@ -29,14 +29,25 @@ public class BusinessDeviceWorker : DeviceWorker
#region public
private EasyLock publicRestartLock = new();
public async Task RestartAsync()
{
await StopAsync();
await StartAsync();
try
{
await publicRestartLock.WaitAsync();
await StopAsync();
await StartAsync();
}
finally
{
publicRestartLock.Release();
}
}
/// <summary>
/// 开始
/// 启动全部设备,如果没有找到设备会创建
/// </summary>
public async Task StartAsync()
{
@@ -64,7 +75,7 @@ public class BusinessDeviceWorker : DeviceWorker
}
/// <summary>
/// 开始
/// 初始化,如果没有找到设备会创建
/// </summary>
public async Task CreatThreadsAsync()
{
@@ -98,14 +109,7 @@ public class BusinessDeviceWorker : DeviceWorker
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
await BeforeRemoveAllChannelThreadAsync();
await ProtectedStoping();
await RemoveAllChannelThreadAsync();
await ProtectedStoped();
//清空内存列表
GlobalData.BusinessDevices.Clear();
await StopThreadAsync();
}
catch (Exception ex)
{
@@ -118,12 +122,38 @@ public class BusinessDeviceWorker : DeviceWorker
}
}
#endregion public
private async Task StopThreadAsync()
{
//取消全部采集线程
await BeforeRemoveAllChannelThreadAsync();
//取消其他后台服务
await ProtectedStoping();
//停止全部采集线程
await RemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoped();
//清空内存列表
GlobalData.CollectDevices.Clear();
}
#region public
private async Task CollectDeviceWorker_Starting()
{
await CreatThreadsAsync();
}
private async Task CollectDeviceWorker_Started()
{
await Task.Delay(1000);
await StartAsync();
}
private async Task CollectDeviceWorker_Stoping()
{
await StopAsync();
}
/// <summary>
/// 创建业务设备线程
/// 读取数据库,创建全部设备
/// </summary>
/// <returns></returns>
protected async Task CreatAllChannelThreadsAsync()
@@ -182,22 +212,6 @@ public class BusinessDeviceWorker : DeviceWorker
#region worker服务
private async Task CollectDeviceWorker_Starting()
{
await CreatThreadsAsync();
}
private async Task CollectDeviceWorker_Started()
{
await Task.Delay(1000);
await StartAsync();
}
private async Task CollectDeviceWorker_Stoping()
{
await StopAsync();
}
public override async Task StartAsync(CancellationToken cancellationToken)
{
await base.StartAsync(cancellationToken);

View File

@@ -27,37 +27,43 @@ public class CollectDeviceWorker : DeviceWorker
#region public
/// <summary>
/// 重启采集服务
/// </summary>
private EasyLock publicRestartLock = new();
public async Task RestartAsync()
{
try
{
await publicRestartLock.WaitAsync();
await StopAsync();
await StartAsync();
}
finally
{
publicRestartLock.Release();
}
}
/// <summary>
/// 启动全部设备,如果没有找到设备会创建
/// </summary>
public async Task StartAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
//停止采集服务
await BeforeRemoveAllChannelThreadAsync();
await ProtectedStoping();
//完全停止全部采集线程
await RemoveAllChannelThreadAsync();
await ProtectedStoped();
//清空内存列表
GlobalData.CollectDevices.Clear();
//创建全部采集线程
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
//开始全部采集线程
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
await StartAllChannelThreadsAsync();
await ProtectedStarted();
}
catch (Exception ex)
{
_logger.LogError(ex, "启错误");
_logger.LogError(ex, "启动发生错误");
}
finally
{
@@ -66,12 +72,70 @@ public class CollectDeviceWorker : DeviceWorker
}
}
#endregion public
#region Private
/// <summary>
/// 初始化,如果没有找到设备会创建
/// </summary>
public async Task CreatThreadsAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
/// <summary>
/// 创建设备采集线程
/// 停止
/// </summary>
public async Task StopAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
await StopThreadAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "停止错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
internal async Task StopThreadAsync()
{
//取消全部采集线程
await BeforeRemoveAllChannelThreadAsync();
//取消其他后台服务
await ProtectedStoping();
//停止全部采集线程
await RemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoped();
//清空内存列表
GlobalData.CollectDevices.Clear();
}
/// <summary>
/// 读取数据库,创建全部设备
/// </summary>
/// <returns></returns>
protected virtual async Task CreatAllChannelThreadsAsync()
@@ -101,12 +165,7 @@ public class CollectDeviceWorker : DeviceWorker
}
}
protected override async Task<IEnumerable<DeviceRunTime>> GetDeviceRunTimeAsync(long deviceId)
{
return await _serviceScope.ServiceProvider.GetService<DeviceService>().GetCollectDeviceRuntimeAsync(deviceId);
}
#endregion Private
#endregion public
#region worker服务
@@ -116,23 +175,10 @@ public class CollectDeviceWorker : DeviceWorker
using var stoppingToken = new CancellationTokenSource();
_stoppingToken = stoppingToken.Token;
stoppingToken.Cancel();
await StopAsync();
await StopThreadAsync();
await base.StopAsync(cancellationToken);
}
internal async Task StopAsync()
{
await BeforeRemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoping();
//停止全部采集线程
await RemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoped();
//清空内存列表
GlobalData.CollectDevices.Clear();
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
@@ -144,5 +190,10 @@ public class CollectDeviceWorker : DeviceWorker
await WhileExecuteAsync(stoppingToken);
}
protected override async Task<IEnumerable<DeviceRunTime>> GetDeviceRunTimeAsync(long deviceId)
{
return await _serviceScope.ServiceProvider.GetService<DeviceService>().GetCollectDeviceRuntimeAsync(deviceId);
}
#endregion worker服务
}

View File

@@ -69,7 +69,23 @@ public class ManagementWoker : BackgroundService
private EasyLock _easyLock = new();
internal volatile bool IsStart = false;
internal bool IsStart
{
get
{
return isStart;
}
set
{
if (isStart != value)
{
isStart = value;
//触发启动事件
}
}
}
private volatile bool isStart = false;
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
@@ -117,88 +133,101 @@ public class ManagementWoker : BackgroundService
{
try
{
GatewayState? gatewayState = null;
await StartLock.WaitAsync();
var online = await udpDmtp.PingAsync(3000);
if (online)
bool online = false;
var waitInvoke = new InvokeOption(millisecondsTimeout: 5000)
{
var readErrorCount = 0;
while (readErrorCount < Options.MaxErrorCount)
FeedbackType = FeedbackType.WaitInvoke,
Token = stoppingToken,
Timeout = 3000
};
try
{
GatewayState? gatewayState = null;
await StartLock.WaitAsync(stoppingToken);
online = await udpDmtp.PingAsync(3000);
if (online)
{
try
var readErrorCount = 0;
while (readErrorCount < Options.MaxErrorCount)
{
gatewayState = await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("GetGatewayStateAsync", InvokeOption.WaitInvoke, IsStart);
break;
}
catch
{
readErrorCount++;
try
{
gatewayState = await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("GetGatewayStateAsync", waitInvoke, IsStart);
break;
}
catch
{
readErrorCount++;
}
}
}
}
if (gatewayState != null)
{
if (gatewayState.IsPrimary == Options.Redundancy.IsPrimary)
if (gatewayState != null)
{
if (gatewayState.IsPrimary == Options.Redundancy.IsPrimary)
{
if (!IsStart)
{
_logger.LogInformation("主备站设置重复!");
IsStart = true;
}
await Task.Delay(1000);
continue;
}
}
if (gatewayState == null)
{
//无法获取状态,启动本机
if (!IsStart)
{
_logger.LogInformation("主备站设置重复!");
_logger.LogInformation("无法连接冗余站点,本机将切换到正常状态");
IsStart = true;
}
await Task.Delay(1000);
continue;
}
}
if (gatewayState == null)
{
//无法获取状态,启动本机
if (!IsStart)
else if (gatewayState.IsPrimary)
{
_logger.LogInformation("无法连接冗余站点,本机将切换到正常状态");
IsStart = true;
}
}
else if (gatewayState.IsPrimary)
{
//主站已经启动
if (gatewayState.IsStart)
{
if (IsStart)
//主站已经启动
if (gatewayState.IsStart)
{
_logger.LogInformation("主站已恢复,本机(从站)将切换到备用状态");
IsStart = false;
if (IsStart)
{
_logger.LogInformation("主站已恢复,本机(从站)将切换到备用状态");
IsStart = false;
}
}
else
{
//等待主站切换到正常后,再停止从站
}
}
else
{
//等待主站切换到正常后,再停止从站
}
}
else
{
//从站已经启动
if (gatewayState.IsStart)
{
//等待从站切换到备用后,再启动主站
}
else
{
if (!IsStart)
//从站已经启动
if (gatewayState.IsStart)
{
_logger.LogInformation("本机(主站)将切换到正常状态");
IsStart = true;
//等待从站切换到备用后,再启动主站
}
else
{
if (!IsStart)
{
_logger.LogInformation("本机(主站)将切换到正常状态");
IsStart = true;
}
}
}
}
finally
{
StartLock.Release();
}
//TODO:发布到从站数据
if (Options.Redundancy.IsPrimary)
{
try
{
if (online)
await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("UpdateGatewayDataAsync", InvokeOption.WaitInvoke, GlobalData.CollectDevices);
await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>("UpdateGatewayDataAsync", waitInvoke, GlobalData.CollectDevices);
}
catch (Exception ex)
{
@@ -217,10 +246,6 @@ public class ManagementWoker : BackgroundService
{
_logger.LogWarning(ex, "循环线程出错");
}
finally
{
StartLock.Release();
}
}
}
else