feat: taos时序库支持每个数据点独立子表

feat: 重启运行时方法改为重载数据库获取通道设备变量
This commit is contained in:
Diego
2025-02-26 21:59:41 +08:00
parent b1346698d6
commit c407e0e83e
17 changed files with 155 additions and 22 deletions

View File

@@ -22,7 +22,7 @@
<PackageReference Include="UAParser" Version="3.1.47" />
<PackageReference Include="Rougamo.Fody" Version="5.0.0" />
<PackageReference Include="MailKit" Version="4.10.0" />
<PackageReference Include="SqlSugarCore" Version="5.1.4.178" />
<PackageReference Include="SqlSugarCore" Version="5.1.4.179" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net8.0' ">
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.*" />

View File

@@ -39,7 +39,7 @@
<PackageReference Include="System.Text.RegularExpressions" Version="4.3.1" />
<PackageReference Include="Mapster" Version="7.4.0" />
<PackageReference Include="MiniProfiler.AspNetCore.Mvc" Version="4.5.4" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.3.0" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net8.0' ">

View File

@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<PluginVersion>10.0.2.9</PluginVersion>
<ProPluginVersion>10.0.2.9</ProPluginVersion>
<PluginVersion>10.0.2.10</PluginVersion>
<ProPluginVersion>10.0.2.10</ProPluginVersion>
</PropertyGroup>
<PropertyGroup>

View File

@@ -70,6 +70,18 @@ public class ControlController : ControllerBase
}
throw Oops.Bah("device not found");
}
/// <summary>
/// 重启当前机构线程
/// </summary>
/// <returns></returns>
[HttpPost("restartScopeThread")]
[DisplayName("重启当前机构线程")]
public async Task RestartScopeThread()
{
var data = await GlobalData.GetCurrentUserChannels().ConfigureAwait(false);
await GlobalData.ChannelRuntimeService.RestartChannelAsync(data.Select(a => a.Value)).ConfigureAwait(false);
}
/// <summary>
/// 重启全部线程
/// </summary>
@@ -78,8 +90,7 @@ public class ControlController : ControllerBase
[DisplayName("重启全部线程")]
public async Task RestartAllThread()
{
var data = await GlobalData.GetCurrentUserChannels().ConfigureAwait(false);
await GlobalData.ChannelThreadManage.RestartChannelAsync(data.Select(a => a.Value)).ConfigureAwait(false);
await GlobalData.ChannelRuntimeService.RestartChannelAsync(GlobalData.Channels.Values).ConfigureAwait(false);
}
/// <summary>

View File

@@ -10,19 +10,25 @@
using BootstrapBlazor.Components;
using CSScripting;
using Mapster;
using Microsoft.AspNetCore.Components.Forms;
using Microsoft.Extensions.Logging;
using ThingsGateway.Extension.Generic;
using ThingsGateway.NewLife;
using TouchSocket.Core;
namespace ThingsGateway.Gateway.Application;
public class ChannelRuntimeService : IChannelRuntimeService
{
private ILogger _logger;
public ChannelRuntimeService(ILogger<ChannelRuntimeService> logger)
{
_logger = logger;
}
private WaitLock WaitLock { get; set; } = new WaitLock();
public async Task<bool> BatchEditAsync(IEnumerable<Channel> models, Channel oldModel, Channel model)
{
@@ -187,4 +193,71 @@ public class ChannelRuntimeService : IChannelRuntimeService
WaitLock.Release();
}
}
public async Task RestartChannelAsync(IEnumerable<ChannelRuntime> oldChannelRuntimes)
{
oldChannelRuntimes.SelectMany(a => a.DeviceRuntimes.SelectMany(a => a.Value.VariableRuntimes)).ParallelForEach(a => a.Value.SafeDispose());
oldChannelRuntimes.SelectMany(a => a.DeviceRuntimes).ParallelForEach(a => a.Value.SafeDispose());
oldChannelRuntimes.ParallelForEach(a => a.SafeDispose());
var ids = oldChannelRuntimes.Select(a => a.Id).ToHashSet();
try
{
await WaitLock.WaitAsync().ConfigureAwait(false);
//网关启动时,获取所有通道
var channelRuntimes = (await GlobalData.ChannelService.GetAllAsync().ConfigureAwait(false)).Where(a => ids.Contains(a.Id) || !GlobalData.Channels.ContainsKey(a.Id)).Adapt<List<ChannelRuntime>>();
var chanelIds = channelRuntimes.Select(a => a.Id).ToHashSet();
var deviceRuntimes = (await GlobalData.DeviceService.GetAllAsync().ConfigureAwait(false)).Where(a => chanelIds.Contains(a.ChannelId)).Adapt<List<DeviceRuntime>>();
var variableRuntimes = (await GlobalData.VariableService.GetByDeviceIdAsync(deviceRuntimes.Select(a => a.Id).ToList()).ConfigureAwait(false)).Adapt<List<VariableRuntime>>();
foreach (var channelRuntime in channelRuntimes)
{
try
{
channelRuntime.Init();
var devRuntimes = deviceRuntimes.Where(x => x.ChannelId == channelRuntime.Id);
foreach (var item in devRuntimes)
{
item.Init(channelRuntime);
var varRuntimes = variableRuntimes.Where(x => x.DeviceId == item.Id);
varRuntimes.ParallelForEach(varItem =>
{
varItem.Init(item);
});
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Init Channel");
}
}
var startCollectChannelEnable = GlobalData.StartCollectChannelEnable;
var startBusinessChannelEnable = GlobalData.StartBusinessChannelEnable;
var collectChannelRuntimes = channelRuntimes.Where(x => (x.Enable && x.IsCollect == true && startCollectChannelEnable));
var businessChannelRuntimes = channelRuntimes.Where(x => (x.Enable && x.IsCollect == false && startBusinessChannelEnable));
//根据初始冗余属性,筛选启动
await GlobalData.ChannelThreadManage.RestartChannelAsync(businessChannelRuntimes).ConfigureAwait(false);
await GlobalData.ChannelThreadManage.RestartChannelAsync(collectChannelRuntimes).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Start error");
}
finally
{
WaitLock.Release();
}
}
}

View File

@@ -44,4 +44,5 @@ public interface IChannelRuntimeService
Task<Dictionary<string, object>> ExportChannelAsync(ExportFilter exportFilter);
Task<Dictionary<string, ImportPreviewOutputBase>> PreviewAsync(IBrowserFile browserFile);
Task<MemoryStream> ExportMemoryStream(List<Channel> data);
Task RestartChannelAsync(IEnumerable<ChannelRuntime> oldChannelRuntimes);
}

View File

@@ -191,6 +191,7 @@ internal sealed class ChannelThreadManage : IChannelThreadManage
NewChannelLock.Release();
}
}
#endregion

View File

@@ -105,4 +105,7 @@ internal interface IVariableService
/// 保存初始值
/// </summary>
Task UpdateInitValueAsync(List<Variable> variables);
Task<List<Variable>> GetByDeviceIdAsync(List<long> deviceIds);
}

View File

@@ -284,6 +284,12 @@ internal sealed class VariableService : BaseService<Variable>, IVariableService
return result;
}
public async Task<List<Variable>> GetByDeviceIdAsync(List<long> deviceIds)
{
using var db = GetDB();
var deviceVariables = await db.Queryable<Variable>().Where(a => deviceIds.Contains(a.DeviceId)).ToListAsync().ConfigureAwait(false);
return deviceVariables;
}
public async Task<List<Variable>> GetAllAsync(long? devId = null)
{
using var db = GetDB();
@@ -949,5 +955,6 @@ internal sealed class VariableService : BaseService<Variable>, IVariableService
}
}
#endregion
}

View File

@@ -6,7 +6,7 @@
<TargetFrameworks>net8.0;</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SqlSugar.TDengineCore" Version="4.18.1" />
<PackageReference Include="SqlSugar.TDengineCore" Version="4.18.5" />
<PackageReference Include="Rougamo.Fody" Version="5.0.0" />
<PackageReference Include="TouchSocket.Dmtp" Version="3.0.14" />
<PackageReference Include="TouchSocket.WebApi.Swagger" Version="3.0.14" />

View File

@@ -81,7 +81,7 @@ public partial class QuickActions
await Task.Run(async () =>
{
var data = await GlobalData.GetCurrentUserChannels().ConfigureAwait(false);
await GlobalData.ChannelThreadManage.RestartChannelAsync(data.Select(a => a.Value));
await GlobalData.ChannelRuntimeService.RestartChannelAsync(data.Select(a => a.Value));
});
}

View File

@@ -125,7 +125,7 @@ internal sealed class RedundancyHostedService : BackgroundService, IRedundancyHo
private static Task RestartAsync()
{
return GlobalData.ChannelThreadManage.RestartChannelAsync(GlobalData.ReadOnlyChannels.Values);
return GlobalData.ChannelRuntimeService.RestartChannelAsync(GlobalData.ReadOnlyChannels.Values);
}
/// <summary>

View File

@@ -16,6 +16,7 @@ using SqlSugar.TDengine;
namespace ThingsGateway.Plugin.TDengineDB;
[SugarTable("historyValue")]
[STableAttribute(STableName = "historyValue", Tag1 = nameof(Name))]
public class TDengineDBHistoryValue : STable, IPrimaryIdEntity, IDBHistoryValue
{
public long Id { get; set; }
@@ -36,7 +37,6 @@ public class TDengineDBHistoryValue : STable, IPrimaryIdEntity, IDBHistoryValue
[AutoGenerateColumn(Order = 1, Visible = true, Sortable = true, Filterable = false)]
public string Name { get; set; }
[AutoGenerateColumn(Order = 1, Visible = true, Sortable = true, Filterable = false)]
public bool IsOnline { get; set; }

View File

@@ -26,7 +26,11 @@ namespace ThingsGateway.Plugin.TDengineDB;
/// </summary>
public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableModel<TDengineDBHistoryValue>, IDBHistoryValueService
{
internal readonly RealDBProducerProperty _driverPropertys = new() { DbType = DbType.TDengine };
internal readonly RealDBProducerProperty _driverPropertys = new()
{
DbType = DbType.TDengine,
BigTextConnectStr = "Host=localhost;Port=6030;Username=root;Password=taosdata;Database=power"
};
private readonly TDengineDBProducerVariableProperty _variablePropertys = new();
/// <inheritdoc/>
public override Type DriverPropertyUIType => typeof(RealDBProducerPropertyRazor);
@@ -80,8 +84,8 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
internal ISugarQueryable<TDengineDBHistoryValue> Query(DBHistoryValuePageInput input)
{
var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
var query = db.Queryable<TDengineDBHistoryValue>().AS(_driverPropertys.TableName)
var db = TDengineDBUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr, _driverPropertys.TableName);
var query = db.Queryable<TDengineDBHistoryValue>()
.WhereIF(input.StartTime != null, a => a.CreateTime >= input.StartTime)
.WhereIF(input.EndTime != null, a => a.CreateTime <= input.EndTime)
.WhereIF(!string.IsNullOrEmpty(input.VariableName), it => it.Name.Contains(input.VariableName))
@@ -99,7 +103,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
internal async Task<QueryData<TDengineDBHistoryValue>> QueryData(QueryPageOptions option)
{
using var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
var db = TDengineDBUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr, _driverPropertys.TableName);
var ret = new QueryData<TDengineDBHistoryValue>()
{
IsSorted = option.SortOrder != SortOrder.Unset,
@@ -107,7 +111,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
IsAdvanceSearch = option.AdvanceSearches.Count > 0 || option.CustomerSearches.Count > 0,
IsSearch = option.Searches.Count > 0
};
var query = db.Queryable<TDengineDBHistoryValue>().AS(_driverPropertys.TableName);
var query = db.Queryable<TDengineDBHistoryValue>().AsTDengineSTable();
query = db.GetQuery<TDengineDBHistoryValue>(option, query);
@@ -140,8 +144,10 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
protected override async Task ProtectedStartAsync(CancellationToken cancellationToken)
{
var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
var db = TDengineDBUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr, _driverPropertys.TableName);
db.DbMaintenance.CreateDatabase();
//必须为间隔上传
if (!_driverPropertys.BigTextScriptHistoryTable.IsNullOrEmpty())
{
@@ -152,7 +158,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
}
else
{
db.CodeFirst.As<TDengineDBHistoryValue>(_driverPropertys.TableName).InitTables(typeof(TDengineDBHistoryValue));
db.CodeFirst.InitTables(typeof(TDengineDBHistoryValue));
}
await base.ProtectedStartAsync(cancellationToken).ConfigureAwait(false);
}

View File

@@ -10,6 +10,9 @@
using Mapster;
using SqlSugar;
using SqlSugar.TDengine;
using ThingsGateway.Foundation;
using ThingsGateway.Plugin.DB;
@@ -60,7 +63,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
{
try
{
var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
var db = TDengineDBUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr, _driverPropertys.TableName);
db.Ado.CancellationToken = cancellationToken;
if (!_driverPropertys.BigTextScriptHistoryTable.IsNullOrEmpty())
@@ -75,7 +78,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
}
else
{
var result = await db.Insertable(dbInserts).AS(_driverPropertys.TableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);//不要加分表
var result = await db.Insertable(dbInserts).SetTDengineChildTableName((stableName, it) => $"{stableName}_{it.Name}").ExecuteCommandAsync().ConfigureAwait(false);//不要加分表
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
if (result > 0)

View File

@@ -0,0 +1,28 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://thingsgateway.cn/
// QQ群605534569
//------------------------------------------------------------------------------
using SqlSugar;
namespace ThingsGateway.Plugin.TDengineDB;
public static class TDengineDBUtil
{
/// <summary>
/// 获取数据库链接
/// </summary>
/// <returns></returns>
public static SqlSugarClient GetDb(DbType dbType, string connectionString, string stableName)
{
var db = BusinessDatabaseUtil.GetDb(dbType, connectionString); ;
db.MappingSTableName<TDengineDBHistoryValue>(stableName);
return db;
}
}

View File

@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>10.0.2.9</Version>
<Version>10.0.2.10</Version>
</PropertyGroup>
<ItemGroup>