diff --git a/src/ThingsGateway.Gateway.Application/Worker/Device/BusinessDeviceWorker.cs b/src/ThingsGateway.Gateway.Application/Worker/Device/BusinessDeviceWorker.cs index f6b44f378..1511b9921 100644 --- a/src/ThingsGateway.Gateway.Application/Worker/Device/BusinessDeviceWorker.cs +++ b/src/ThingsGateway.Gateway.Application/Worker/Device/BusinessDeviceWorker.cs @@ -29,14 +29,25 @@ public class BusinessDeviceWorker : DeviceWorker #region public 设备创建更新结束 + private EasyLock publicRestartLock = new(); + public async Task RestartAsync() { - await StopAsync(); - await StartAsync(); + try + { + await publicRestartLock.WaitAsync(); + + await StopAsync(); + await StartAsync(); + } + finally + { + publicRestartLock.Release(); + } } /// - /// 开始 + /// 启动全部设备,如果没有找到设备会创建 /// public async Task StartAsync() { @@ -64,7 +75,7 @@ public class BusinessDeviceWorker : DeviceWorker } /// - /// 开始 + /// 初始化,如果没有找到设备会创建 /// public async Task CreatThreadsAsync() { @@ -98,14 +109,7 @@ public class BusinessDeviceWorker : DeviceWorker { await restartLock.WaitAsync(); await singleRestartLock.WaitAsync(); - await BeforeRemoveAllChannelThreadAsync(); - await ProtectedStoping(); - - await RemoveAllChannelThreadAsync(); - await ProtectedStoped(); - - //清空内存列表 - GlobalData.BusinessDevices.Clear(); + await StopThreadAsync(); } catch (Exception ex) { @@ -118,12 +122,38 @@ public class BusinessDeviceWorker : DeviceWorker } } - #endregion public 设备创建更新结束 + private async Task StopThreadAsync() + { + //取消全部采集线程 + await BeforeRemoveAllChannelThreadAsync(); + //取消其他后台服务 + await ProtectedStoping(); + //停止全部采集线程 + await RemoveAllChannelThreadAsync(); + //停止其他后台服务 + await ProtectedStoped(); + //清空内存列表 + GlobalData.CollectDevices.Clear(); + } - #region public 设备创建更新结束 + private async Task CollectDeviceWorker_Starting() + { + await CreatThreadsAsync(); + } + + private async Task CollectDeviceWorker_Started() + { + await Task.Delay(1000); + await StartAsync(); + } + + private async Task CollectDeviceWorker_Stoping() + { + await StopAsync(); + } /// - /// 创建业务设备线程 + /// 读取数据库,创建全部设备 /// /// protected async Task CreatAllChannelThreadsAsync() @@ -182,22 +212,6 @@ public class BusinessDeviceWorker : DeviceWorker #region worker服务 - private async Task CollectDeviceWorker_Starting() - { - await CreatThreadsAsync(); - } - - private async Task CollectDeviceWorker_Started() - { - await Task.Delay(1000); - await StartAsync(); - } - - private async Task CollectDeviceWorker_Stoping() - { - await StopAsync(); - } - public override async Task StartAsync(CancellationToken cancellationToken) { await base.StartAsync(cancellationToken); diff --git a/src/ThingsGateway.Gateway.Application/Worker/Device/CollectDeviceWorker.cs b/src/ThingsGateway.Gateway.Application/Worker/Device/CollectDeviceWorker.cs index a2d761551..c689663c9 100644 --- a/src/ThingsGateway.Gateway.Application/Worker/Device/CollectDeviceWorker.cs +++ b/src/ThingsGateway.Gateway.Application/Worker/Device/CollectDeviceWorker.cs @@ -27,37 +27,43 @@ public class CollectDeviceWorker : DeviceWorker #region public 设备创建更新结束 - /// - /// 重启采集服务 - /// + private EasyLock publicRestartLock = new(); + public async Task RestartAsync() + { + try + { + await publicRestartLock.WaitAsync(); + + await StopAsync(); + await StartAsync(); + } + finally + { + publicRestartLock.Release(); + } + } + + /// + /// 启动全部设备,如果没有找到设备会创建 + /// + public async Task StartAsync() { try { await restartLock.WaitAsync(); await singleRestartLock.WaitAsync(); - - //停止采集服务 - await BeforeRemoveAllChannelThreadAsync(); - await ProtectedStoping(); - //完全停止全部采集线程 - await RemoveAllChannelThreadAsync(); - await ProtectedStoped(); - - //清空内存列表 - GlobalData.CollectDevices.Clear(); - - //创建全部采集线程 - await CreatAllChannelThreadsAsync(); - await ProtectedStarting(); - - //开始全部采集线程 + if (ChannelThreads.Count == 0) + { + await CreatAllChannelThreadsAsync(); + await ProtectedStarting(); + } await StartAllChannelThreadsAsync(); await ProtectedStarted(); } catch (Exception ex) { - _logger.LogError(ex, "重启错误"); + _logger.LogError(ex, "启动发生错误"); } finally { @@ -66,12 +72,70 @@ public class CollectDeviceWorker : DeviceWorker } } - #endregion public 设备创建更新结束 - - #region Private + /// + /// 初始化,如果没有找到设备会创建 + /// + public async Task CreatThreadsAsync() + { + try + { + await restartLock.WaitAsync(); + await singleRestartLock.WaitAsync(); + if (ChannelThreads.Count == 0) + { + await CreatAllChannelThreadsAsync(); + await ProtectedStarting(); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "启动发生错误"); + } + finally + { + singleRestartLock.Release(); + restartLock.Release(); + } + } /// - /// 创建设备采集线程 + /// 停止 + /// + public async Task StopAsync() + { + try + { + await restartLock.WaitAsync(); + await singleRestartLock.WaitAsync(); + await StopThreadAsync(); + } + catch (Exception ex) + { + _logger.LogError(ex, "停止错误"); + } + finally + { + singleRestartLock.Release(); + restartLock.Release(); + } + } + + internal async Task StopThreadAsync() + { + //取消全部采集线程 + await BeforeRemoveAllChannelThreadAsync(); + //取消其他后台服务 + await ProtectedStoping(); + //停止全部采集线程 + await RemoveAllChannelThreadAsync(); + //停止其他后台服务 + await ProtectedStoped(); + //清空内存列表 + GlobalData.CollectDevices.Clear(); + } + + /// + /// 读取数据库,创建全部设备 /// /// protected virtual async Task CreatAllChannelThreadsAsync() @@ -101,12 +165,7 @@ public class CollectDeviceWorker : DeviceWorker } } - protected override async Task> GetDeviceRunTimeAsync(long deviceId) - { - return await _serviceScope.ServiceProvider.GetService().GetCollectDeviceRuntimeAsync(deviceId); - } - - #endregion Private + #endregion public 设备创建更新结束 #region worker服务 @@ -116,23 +175,10 @@ public class CollectDeviceWorker : DeviceWorker using var stoppingToken = new CancellationTokenSource(); _stoppingToken = stoppingToken.Token; stoppingToken.Cancel(); - await StopAsync(); + await StopThreadAsync(); await base.StopAsync(cancellationToken); } - internal async Task StopAsync() - { - await BeforeRemoveAllChannelThreadAsync(); - //停止其他后台服务 - await ProtectedStoping(); - //停止全部采集线程 - await RemoveAllChannelThreadAsync(); - //停止其他后台服务 - await ProtectedStoped(); - //清空内存列表 - GlobalData.CollectDevices.Clear(); - } - /// protected override async Task ExecuteAsync(CancellationToken stoppingToken) { @@ -144,5 +190,10 @@ public class CollectDeviceWorker : DeviceWorker await WhileExecuteAsync(stoppingToken); } + protected override async Task> GetDeviceRunTimeAsync(long deviceId) + { + return await _serviceScope.ServiceProvider.GetService().GetCollectDeviceRuntimeAsync(deviceId); + } + #endregion worker服务 } \ No newline at end of file diff --git a/src/ThingsGateway.Gateway.Application/Worker/Management/ManagementWoker.cs b/src/ThingsGateway.Gateway.Application/Worker/Management/ManagementWoker.cs index 0a266bcaa..af86a52bf 100644 --- a/src/ThingsGateway.Gateway.Application/Worker/Management/ManagementWoker.cs +++ b/src/ThingsGateway.Gateway.Application/Worker/Management/ManagementWoker.cs @@ -69,7 +69,23 @@ public class ManagementWoker : BackgroundService private EasyLock _easyLock = new(); - internal volatile bool IsStart = false; + internal bool IsStart + { + get + { + return isStart; + } + set + { + if (isStart != value) + { + isStart = value; + //触发启动事件 + } + } + } + + private volatile bool isStart = false; /// public override async Task StartAsync(CancellationToken cancellationToken) @@ -117,88 +133,101 @@ public class ManagementWoker : BackgroundService { try { - GatewayState? gatewayState = null; - await StartLock.WaitAsync(); - var online = await udpDmtp.PingAsync(3000); - if (online) + bool online = false; + var waitInvoke = new InvokeOption(millisecondsTimeout: 5000) { - var readErrorCount = 0; - while (readErrorCount < Options.MaxErrorCount) + FeedbackType = FeedbackType.WaitInvoke, + Token = stoppingToken, + Timeout = 3000 + }; + try + { + GatewayState? gatewayState = null; + await StartLock.WaitAsync(stoppingToken); + online = await udpDmtp.PingAsync(3000); + if (online) { - try + var readErrorCount = 0; + while (readErrorCount < Options.MaxErrorCount) { - gatewayState = await udpDmtp.GetDmtpRpcActor().InvokeTAsync("GetGatewayStateAsync", InvokeOption.WaitInvoke, IsStart); - break; - } - catch - { - readErrorCount++; + try + { + gatewayState = await udpDmtp.GetDmtpRpcActor().InvokeTAsync("GetGatewayStateAsync", waitInvoke, IsStart); + break; + } + catch + { + readErrorCount++; + } } } - } - if (gatewayState != null) - { - if (gatewayState.IsPrimary == Options.Redundancy.IsPrimary) + if (gatewayState != null) { + if (gatewayState.IsPrimary == Options.Redundancy.IsPrimary) + { + if (!IsStart) + { + _logger.LogInformation("主备站设置重复!"); + IsStart = true; + } + await Task.Delay(1000); + continue; + } + } + if (gatewayState == null) + { + //无法获取状态,启动本机 if (!IsStart) { - _logger.LogInformation("主备站设置重复!"); + _logger.LogInformation("无法连接冗余站点,本机将切换到正常状态"); IsStart = true; } - await Task.Delay(1000); - continue; } - } - if (gatewayState == null) - { - //无法获取状态,启动本机 - if (!IsStart) + else if (gatewayState.IsPrimary) { - _logger.LogInformation("无法连接冗余站点,本机将切换到正常状态"); - IsStart = true; - } - } - else if (gatewayState.IsPrimary) - { - //主站已经启动 - if (gatewayState.IsStart) - { - if (IsStart) + //主站已经启动 + if (gatewayState.IsStart) { - _logger.LogInformation("主站已恢复,本机(从站)将切换到备用状态"); - IsStart = false; + if (IsStart) + { + _logger.LogInformation("主站已恢复,本机(从站)将切换到备用状态"); + IsStart = false; + } + } + else + { + //等待主站切换到正常后,再停止从站 } } else { - //等待主站切换到正常后,再停止从站 - } - } - else - { - //从站已经启动 - if (gatewayState.IsStart) - { - //等待从站切换到备用后,再启动主站 - } - else - { - if (!IsStart) + //从站已经启动 + if (gatewayState.IsStart) { - _logger.LogInformation("本机(主站)将切换到正常状态"); - IsStart = true; + //等待从站切换到备用后,再启动主站 + } + else + { + if (!IsStart) + { + _logger.LogInformation("本机(主站)将切换到正常状态"); + IsStart = true; + } } } } - + finally + { + StartLock.Release(); + } //TODO:发布到从站数据 if (Options.Redundancy.IsPrimary) { try { if (online) - await udpDmtp.GetDmtpRpcActor().InvokeTAsync("UpdateGatewayDataAsync", InvokeOption.WaitInvoke, GlobalData.CollectDevices); + await udpDmtp.GetDmtpRpcActor().InvokeTAsync("UpdateGatewayDataAsync", waitInvoke, GlobalData.CollectDevices); } catch (Exception ex) { @@ -217,10 +246,6 @@ public class ManagementWoker : BackgroundService { _logger.LogWarning(ex, "循环线程出错"); } - finally - { - StartLock.Release(); - } } } else