From c407e0e83e1c1b38282e4f72a45cc3a7968b344a Mon Sep 17 00:00:00 2001 From: Diego <2248356998@qq.com> Date: Wed, 26 Feb 2025 21:59:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20taos=E6=97=B6=E5=BA=8F=E5=BA=93?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=AF=8F=E4=B8=AA=E6=95=B0=E6=8D=AE=E7=82=B9?= =?UTF-8?q?=E7=8B=AC=E7=AB=8B=E5=AD=90=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit feat: 重启运行时方法改为重载数据库获取通道设备变量 --- .../ThingsGateway.Admin.Application.csproj | 2 +- .../ThingsGateway.Furion.csproj | 2 +- src/Directory.Build.props | 4 +- .../Controller/ControlController.cs | 15 +++- .../Services/Channel/ChannelRuntimeService.cs | 77 ++++++++++++++++++- .../Channel/IChannelRuntimeService.cs | 1 + .../ChannelManage/ChannelThreadManage.cs | 1 + .../Services/Variable/IVariableService.cs | 3 + .../Services/Variable/VariableService.cs | 7 ++ .../ThingsGateway.Gateway.Application.csproj | 2 +- .../Components/QuickActions.razor.cs | 2 +- .../Management/RedundancyHostedService.cs | 2 +- .../TDengineDB/TDengineDBHistoryValue.cs | 2 +- .../TDengineDB/TDengineDBProducer.cs | 20 +++-- .../TDengineDB/TDengineDBProducer.other.cs | 7 +- .../TDengineDB/TDengineDBUtil.cs | 28 +++++++ src/Version.props | 2 +- 17 files changed, 155 insertions(+), 22 deletions(-) create mode 100644 src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBUtil.cs diff --git a/src/Admin/ThingsGateway.Admin.Application/ThingsGateway.Admin.Application.csproj b/src/Admin/ThingsGateway.Admin.Application/ThingsGateway.Admin.Application.csproj index cec3e4c4f..8f0ce874b 100644 --- a/src/Admin/ThingsGateway.Admin.Application/ThingsGateway.Admin.Application.csproj +++ b/src/Admin/ThingsGateway.Admin.Application/ThingsGateway.Admin.Application.csproj @@ -22,7 +22,7 @@ - + diff --git a/src/Admin/ThingsGateway.Furion/ThingsGateway.Furion.csproj b/src/Admin/ThingsGateway.Furion/ThingsGateway.Furion.csproj index 9301ba1b9..074ad943a 100644 --- a/src/Admin/ThingsGateway.Furion/ThingsGateway.Furion.csproj +++ b/src/Admin/ThingsGateway.Furion/ThingsGateway.Furion.csproj @@ -39,7 +39,7 @@ - + diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 20db0e94e..d2a992a1d 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -1,8 +1,8 @@ - 10.0.2.9 - 10.0.2.9 + 10.0.2.10 + 10.0.2.10 diff --git a/src/Gateway/ThingsGateway.Gateway.Application/Controller/ControlController.cs b/src/Gateway/ThingsGateway.Gateway.Application/Controller/ControlController.cs index 1c5280bef..6a1430e0a 100644 --- a/src/Gateway/ThingsGateway.Gateway.Application/Controller/ControlController.cs +++ b/src/Gateway/ThingsGateway.Gateway.Application/Controller/ControlController.cs @@ -70,6 +70,18 @@ public class ControlController : ControllerBase } throw Oops.Bah("device not found"); } + /// + /// 重启当前机构线程 + /// + /// + [HttpPost("restartScopeThread")] + [DisplayName("重启当前机构线程")] + public async Task RestartScopeThread() + { + var data = await GlobalData.GetCurrentUserChannels().ConfigureAwait(false); + await GlobalData.ChannelRuntimeService.RestartChannelAsync(data.Select(a => a.Value)).ConfigureAwait(false); + } + /// /// 重启全部线程 /// @@ -78,8 +90,7 @@ public class ControlController : ControllerBase [DisplayName("重启全部线程")] public async Task RestartAllThread() { - var data = await GlobalData.GetCurrentUserChannels().ConfigureAwait(false); - await GlobalData.ChannelThreadManage.RestartChannelAsync(data.Select(a => a.Value)).ConfigureAwait(false); + await GlobalData.ChannelRuntimeService.RestartChannelAsync(GlobalData.Channels.Values).ConfigureAwait(false); } /// diff --git a/src/Gateway/ThingsGateway.Gateway.Application/Services/Channel/ChannelRuntimeService.cs b/src/Gateway/ThingsGateway.Gateway.Application/Services/Channel/ChannelRuntimeService.cs index df82ee0f8..9919d959f 100644 --- a/src/Gateway/ThingsGateway.Gateway.Application/Services/Channel/ChannelRuntimeService.cs +++ b/src/Gateway/ThingsGateway.Gateway.Application/Services/Channel/ChannelRuntimeService.cs @@ -10,19 +10,25 @@ using BootstrapBlazor.Components; -using CSScripting; - using Mapster; using Microsoft.AspNetCore.Components.Forms; +using Microsoft.Extensions.Logging; using ThingsGateway.Extension.Generic; using ThingsGateway.NewLife; +using TouchSocket.Core; + namespace ThingsGateway.Gateway.Application; public class ChannelRuntimeService : IChannelRuntimeService { + private ILogger _logger; + public ChannelRuntimeService(ILogger logger) + { + _logger = logger; + } private WaitLock WaitLock { get; set; } = new WaitLock(); public async Task BatchEditAsync(IEnumerable models, Channel oldModel, Channel model) { @@ -187,4 +193,71 @@ public class ChannelRuntimeService : IChannelRuntimeService WaitLock.Release(); } } + + + public async Task RestartChannelAsync(IEnumerable oldChannelRuntimes) + { + oldChannelRuntimes.SelectMany(a => a.DeviceRuntimes.SelectMany(a => a.Value.VariableRuntimes)).ParallelForEach(a => a.Value.SafeDispose()); + oldChannelRuntimes.SelectMany(a => a.DeviceRuntimes).ParallelForEach(a => a.Value.SafeDispose()); + oldChannelRuntimes.ParallelForEach(a => a.SafeDispose()); + var ids = oldChannelRuntimes.Select(a => a.Id).ToHashSet(); + try + { + await WaitLock.WaitAsync().ConfigureAwait(false); + + //网关启动时,获取所有通道 + var channelRuntimes = (await GlobalData.ChannelService.GetAllAsync().ConfigureAwait(false)).Where(a => ids.Contains(a.Id) || !GlobalData.Channels.ContainsKey(a.Id)).Adapt>(); + + var chanelIds = channelRuntimes.Select(a => a.Id).ToHashSet(); + var deviceRuntimes = (await GlobalData.DeviceService.GetAllAsync().ConfigureAwait(false)).Where(a => chanelIds.Contains(a.ChannelId)).Adapt>(); + + var variableRuntimes = (await GlobalData.VariableService.GetByDeviceIdAsync(deviceRuntimes.Select(a => a.Id).ToList()).ConfigureAwait(false)).Adapt>(); + foreach (var channelRuntime in channelRuntimes) + { + try + { + channelRuntime.Init(); + var devRuntimes = deviceRuntimes.Where(x => x.ChannelId == channelRuntime.Id); + foreach (var item in devRuntimes) + { + item.Init(channelRuntime); + + var varRuntimes = variableRuntimes.Where(x => x.DeviceId == item.Id); + + varRuntimes.ParallelForEach(varItem => + { + varItem.Init(item); + }); + + } + + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Init Channel"); + } + } + + var startCollectChannelEnable = GlobalData.StartCollectChannelEnable; + var startBusinessChannelEnable = GlobalData.StartBusinessChannelEnable; + + var collectChannelRuntimes = channelRuntimes.Where(x => (x.Enable && x.IsCollect == true && startCollectChannelEnable)); + + var businessChannelRuntimes = channelRuntimes.Where(x => (x.Enable && x.IsCollect == false && startBusinessChannelEnable)); + + //根据初始冗余属性,筛选启动 + await GlobalData.ChannelThreadManage.RestartChannelAsync(businessChannelRuntimes).ConfigureAwait(false); + await GlobalData.ChannelThreadManage.RestartChannelAsync(collectChannelRuntimes).ConfigureAwait(false); + + + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Start error"); + } + finally + { + WaitLock.Release(); + } + } } \ No newline at end of file diff --git a/src/Gateway/ThingsGateway.Gateway.Application/Services/Channel/IChannelRuntimeService.cs b/src/Gateway/ThingsGateway.Gateway.Application/Services/Channel/IChannelRuntimeService.cs index 515f677c5..756b2d44d 100644 --- a/src/Gateway/ThingsGateway.Gateway.Application/Services/Channel/IChannelRuntimeService.cs +++ b/src/Gateway/ThingsGateway.Gateway.Application/Services/Channel/IChannelRuntimeService.cs @@ -44,4 +44,5 @@ public interface IChannelRuntimeService Task> ExportChannelAsync(ExportFilter exportFilter); Task> PreviewAsync(IBrowserFile browserFile); Task ExportMemoryStream(List data); + Task RestartChannelAsync(IEnumerable oldChannelRuntimes); } \ No newline at end of file diff --git a/src/Gateway/ThingsGateway.Gateway.Application/Services/GatewayMonitor/ChannelManage/ChannelThreadManage.cs b/src/Gateway/ThingsGateway.Gateway.Application/Services/GatewayMonitor/ChannelManage/ChannelThreadManage.cs index 7604983a3..78c70bacd 100644 --- a/src/Gateway/ThingsGateway.Gateway.Application/Services/GatewayMonitor/ChannelManage/ChannelThreadManage.cs +++ b/src/Gateway/ThingsGateway.Gateway.Application/Services/GatewayMonitor/ChannelManage/ChannelThreadManage.cs @@ -191,6 +191,7 @@ internal sealed class ChannelThreadManage : IChannelThreadManage NewChannelLock.Release(); } } + #endregion diff --git a/src/Gateway/ThingsGateway.Gateway.Application/Services/Variable/IVariableService.cs b/src/Gateway/ThingsGateway.Gateway.Application/Services/Variable/IVariableService.cs index c1d32a7b4..e1e2cce94 100644 --- a/src/Gateway/ThingsGateway.Gateway.Application/Services/Variable/IVariableService.cs +++ b/src/Gateway/ThingsGateway.Gateway.Application/Services/Variable/IVariableService.cs @@ -105,4 +105,7 @@ internal interface IVariableService /// 保存初始值 /// Task UpdateInitValueAsync(List variables); + + + Task> GetByDeviceIdAsync(List deviceIds); } diff --git a/src/Gateway/ThingsGateway.Gateway.Application/Services/Variable/VariableService.cs b/src/Gateway/ThingsGateway.Gateway.Application/Services/Variable/VariableService.cs index 0e0c25ce4..8d0693537 100644 --- a/src/Gateway/ThingsGateway.Gateway.Application/Services/Variable/VariableService.cs +++ b/src/Gateway/ThingsGateway.Gateway.Application/Services/Variable/VariableService.cs @@ -284,6 +284,12 @@ internal sealed class VariableService : BaseService, IVariableService return result; } + public async Task> GetByDeviceIdAsync(List deviceIds) + { + using var db = GetDB(); + var deviceVariables = await db.Queryable().Where(a => deviceIds.Contains(a.DeviceId)).ToListAsync().ConfigureAwait(false); + return deviceVariables; + } public async Task> GetAllAsync(long? devId = null) { using var db = GetDB(); @@ -949,5 +955,6 @@ internal sealed class VariableService : BaseService, IVariableService } } + #endregion 导入 } diff --git a/src/Gateway/ThingsGateway.Gateway.Application/ThingsGateway.Gateway.Application.csproj b/src/Gateway/ThingsGateway.Gateway.Application/ThingsGateway.Gateway.Application.csproj index 920b30114..afa9a6535 100644 --- a/src/Gateway/ThingsGateway.Gateway.Application/ThingsGateway.Gateway.Application.csproj +++ b/src/Gateway/ThingsGateway.Gateway.Application/ThingsGateway.Gateway.Application.csproj @@ -6,7 +6,7 @@ net8.0; - + diff --git a/src/Gateway/ThingsGateway.Gateway.Razor/Components/QuickActions.razor.cs b/src/Gateway/ThingsGateway.Gateway.Razor/Components/QuickActions.razor.cs index 0bc49c146..12c52cc9b 100644 --- a/src/Gateway/ThingsGateway.Gateway.Razor/Components/QuickActions.razor.cs +++ b/src/Gateway/ThingsGateway.Gateway.Razor/Components/QuickActions.razor.cs @@ -81,7 +81,7 @@ public partial class QuickActions await Task.Run(async () => { var data = await GlobalData.GetCurrentUserChannels().ConfigureAwait(false); - await GlobalData.ChannelThreadManage.RestartChannelAsync(data.Select(a => a.Value)); + await GlobalData.ChannelRuntimeService.RestartChannelAsync(data.Select(a => a.Value)); }); } diff --git a/src/Gateway/ThingsGateway.Management/Management/RedundancyHostedService.cs b/src/Gateway/ThingsGateway.Management/Management/RedundancyHostedService.cs index 4c8105200..b4cd4755b 100644 --- a/src/Gateway/ThingsGateway.Management/Management/RedundancyHostedService.cs +++ b/src/Gateway/ThingsGateway.Management/Management/RedundancyHostedService.cs @@ -125,7 +125,7 @@ internal sealed class RedundancyHostedService : BackgroundService, IRedundancyHo private static Task RestartAsync() { - return GlobalData.ChannelThreadManage.RestartChannelAsync(GlobalData.ReadOnlyChannels.Values); + return GlobalData.ChannelRuntimeService.RestartChannelAsync(GlobalData.ReadOnlyChannels.Values); } /// diff --git a/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBHistoryValue.cs b/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBHistoryValue.cs index 34469688a..f01a31d96 100644 --- a/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBHistoryValue.cs +++ b/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBHistoryValue.cs @@ -16,6 +16,7 @@ using SqlSugar.TDengine; namespace ThingsGateway.Plugin.TDengineDB; [SugarTable("historyValue")] +[STableAttribute(STableName = "historyValue", Tag1 = nameof(Name))] public class TDengineDBHistoryValue : STable, IPrimaryIdEntity, IDBHistoryValue { public long Id { get; set; } @@ -36,7 +37,6 @@ public class TDengineDBHistoryValue : STable, IPrimaryIdEntity, IDBHistoryValue [AutoGenerateColumn(Order = 1, Visible = true, Sortable = true, Filterable = false)] public string Name { get; set; } - [AutoGenerateColumn(Order = 1, Visible = true, Sortable = true, Filterable = false)] public bool IsOnline { get; set; } diff --git a/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBProducer.cs b/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBProducer.cs index 6094f8631..41ee35fd8 100644 --- a/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBProducer.cs +++ b/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBProducer.cs @@ -26,7 +26,11 @@ namespace ThingsGateway.Plugin.TDengineDB; /// public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableModel, IDBHistoryValueService { - internal readonly RealDBProducerProperty _driverPropertys = new() { DbType = DbType.TDengine }; + internal readonly RealDBProducerProperty _driverPropertys = new() + { + DbType = DbType.TDengine, + BigTextConnectStr = "Host=localhost;Port=6030;Username=root;Password=taosdata;Database=power" + }; private readonly TDengineDBProducerVariableProperty _variablePropertys = new(); /// public override Type DriverPropertyUIType => typeof(RealDBProducerPropertyRazor); @@ -80,8 +84,8 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM internal ISugarQueryable Query(DBHistoryValuePageInput input) { - var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr); - var query = db.Queryable().AS(_driverPropertys.TableName) + var db = TDengineDBUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr, _driverPropertys.TableName); + var query = db.Queryable() .WhereIF(input.StartTime != null, a => a.CreateTime >= input.StartTime) .WhereIF(input.EndTime != null, a => a.CreateTime <= input.EndTime) .WhereIF(!string.IsNullOrEmpty(input.VariableName), it => it.Name.Contains(input.VariableName)) @@ -99,7 +103,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM internal async Task> QueryData(QueryPageOptions option) { - using var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr); + var db = TDengineDBUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr, _driverPropertys.TableName); var ret = new QueryData() { IsSorted = option.SortOrder != SortOrder.Unset, @@ -107,7 +111,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM IsAdvanceSearch = option.AdvanceSearches.Count > 0 || option.CustomerSearches.Count > 0, IsSearch = option.Searches.Count > 0 }; - var query = db.Queryable().AS(_driverPropertys.TableName); + var query = db.Queryable().AsTDengineSTable(); query = db.GetQuery(option, query); @@ -140,8 +144,10 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM protected override async Task ProtectedStartAsync(CancellationToken cancellationToken) { - var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr); + var db = TDengineDBUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr, _driverPropertys.TableName); db.DbMaintenance.CreateDatabase(); + + //必须为间隔上传 if (!_driverPropertys.BigTextScriptHistoryTable.IsNullOrEmpty()) { @@ -152,7 +158,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM } else { - db.CodeFirst.As(_driverPropertys.TableName).InitTables(typeof(TDengineDBHistoryValue)); + db.CodeFirst.InitTables(typeof(TDengineDBHistoryValue)); } await base.ProtectedStartAsync(cancellationToken).ConfigureAwait(false); } diff --git a/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBProducer.other.cs b/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBProducer.other.cs index 4046a178f..fc0f291ae 100644 --- a/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBProducer.other.cs +++ b/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBProducer.other.cs @@ -10,6 +10,9 @@ using Mapster; +using SqlSugar; +using SqlSugar.TDengine; + using ThingsGateway.Foundation; using ThingsGateway.Plugin.DB; @@ -60,7 +63,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM { try { - var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr); + var db = TDengineDBUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr, _driverPropertys.TableName); db.Ado.CancellationToken = cancellationToken; if (!_driverPropertys.BigTextScriptHistoryTable.IsNullOrEmpty()) @@ -75,7 +78,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM } else { - var result = await db.Insertable(dbInserts).AS(_driverPropertys.TableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);//不要加分表 + var result = await db.Insertable(dbInserts).SetTDengineChildTableName((stableName, it) => $"{stableName}_{it.Name}").ExecuteCommandAsync().ConfigureAwait(false);//不要加分表 //var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false); if (result > 0) diff --git a/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBUtil.cs b/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBUtil.cs new file mode 100644 index 000000000..bf27359de --- /dev/null +++ b/src/Plugin/ThingsGateway.Plugin.DB/TDengineDB/TDengineDBUtil.cs @@ -0,0 +1,28 @@ +//------------------------------------------------------------------------------ +// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充 +// 此代码版权(除特别声明外的代码)归作者本人Diego所有 +// 源代码使用协议遵循本仓库的开源协议及附加协议 +// Gitee源代码仓库:https://gitee.com/diego2098/ThingsGateway +// Github源代码仓库:https://github.com/kimdiego2098/ThingsGateway +// 使用文档:https://thingsgateway.cn/ +// QQ群:605534569 +//------------------------------------------------------------------------------ + +using SqlSugar; + +namespace ThingsGateway.Plugin.TDengineDB; + +public static class TDengineDBUtil +{ + /// + /// 获取数据库链接 + /// + /// + public static SqlSugarClient GetDb(DbType dbType, string connectionString, string stableName) + { + var db = BusinessDatabaseUtil.GetDb(dbType, connectionString); ; + db.MappingSTableName(stableName); + return db; + } + +} diff --git a/src/Version.props b/src/Version.props index 2d687958a..6090a2df0 100644 --- a/src/Version.props +++ b/src/Version.props @@ -1,6 +1,6 @@ - 10.0.2.9 + 10.0.2.10