mirror of
https://gitee.com/ThingsGateway/ThingsGateway.git
synced 2025-11-05 01:53:58 +08:00
优化sql实时表执行性能
This commit is contained in:
@@ -1,8 +1,8 @@
|
|||||||
<Project>
|
<Project>
|
||||||
|
|
||||||
<PropertyGroup>
|
<PropertyGroup>
|
||||||
<PluginVersion>10.7.47</PluginVersion>
|
<PluginVersion>10.7.48</PluginVersion>
|
||||||
<ProPluginVersion>10.7.47</ProPluginVersion>
|
<ProPluginVersion>10.7.48</ProPluginVersion>
|
||||||
<AuthenticationVersion>2.5.0</AuthenticationVersion>
|
<AuthenticationVersion>2.5.0</AuthenticationVersion>
|
||||||
<NET8Version>8.0.17</NET8Version>
|
<NET8Version>8.0.17</NET8Version>
|
||||||
<NET9Version>9.0.6</NET9Version>
|
<NET9Version>9.0.6</NET9Version>
|
||||||
|
|||||||
@@ -164,12 +164,15 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
.Map(dest => dest.Value, src => src.Value == null ? string.Empty : src.Value.ToString() ?? string.Empty)
|
.Map(dest => dest.Value, src => src.Value == null ? string.Empty : src.Value.ToString() ?? string.Empty)
|
||||||
.Map(dest => dest.CreateTime, (src) => DateTime.Now);
|
.Map(dest => dest.CreateTime, (src) => DateTime.Now);
|
||||||
|
|
||||||
_exRealTimerTick = new(_driverPropertys.RealTableBusinessInterval);
|
|
||||||
|
|
||||||
await base.InitChannelAsync(channel, cancellationToken).ConfigureAwait(false);
|
await base.InitChannelAsync(channel, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
public override Task AfterVariablesChangedAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
RealTimeVariables.Clear();
|
||||||
|
_initRealData = false;
|
||||||
|
return base.AfterVariablesChangedAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
protected override async Task ProtectedStartAsync(CancellationToken cancellationToken)
|
protected override async Task ProtectedStartAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
@@ -214,27 +217,11 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
protected override async ValueTask ProtectedExecuteAsync(CancellationToken cancellationToken)
|
protected override async ValueTask ProtectedExecuteAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
if (_driverPropertys.IsReadDB)
|
if (_driverPropertys.IsReadDB)
|
||||||
{
|
|
||||||
if (_exRealTimerTick.IsTickHappen())
|
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var varList = IdVariableRuntimes.Select(a => a.Value);
|
var varList = RealTimeVariables.ToListWithDequeue();
|
||||||
if (_driverPropertys.GroupUpdate)
|
if (varList.Count > 0)
|
||||||
{
|
|
||||||
var groups = varList.GroupBy(a => a.BusinessGroup);
|
|
||||||
foreach (var item in groups)
|
|
||||||
{
|
|
||||||
var result = await UpdateAsync(item.Adapt<List<SQLRealValue>>(), cancellationToken).ConfigureAwait(false);
|
|
||||||
if (success != result.IsSuccess)
|
|
||||||
{
|
|
||||||
if (!result.IsSuccess)
|
|
||||||
LogMessage?.LogWarning(result.ToString());
|
|
||||||
success = result.IsSuccess;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
var result = await UpdateAsync(varList.Adapt<List<SQLRealValue>>(), cancellationToken).ConfigureAwait(false);
|
var result = await UpdateAsync(varList.Adapt<List<SQLRealValue>>(), cancellationToken).ConfigureAwait(false);
|
||||||
if (success != result.IsSuccess)
|
if (success != result.IsSuccess)
|
||||||
@@ -252,7 +239,6 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
success = false;
|
success = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (_driverPropertys.IsHistoryDB)
|
if (_driverPropertys.IsHistoryDB)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -58,13 +58,6 @@ public class SqlDBProducerProperty : BusinessPropertyWithCacheInterval
|
|||||||
[AutoGenerateColumn(ComponentType = typeof(Textarea), Rows = 1)]
|
[AutoGenerateColumn(ComponentType = typeof(Textarea), Rows = 1)]
|
||||||
public string BigTextConnectStr { get; set; } = "server=.;uid=sa;pwd=111111;database=test;";
|
public string BigTextConnectStr { get; set; } = "server=.;uid=sa;pwd=111111;database=test;";
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 实时表间隔上传时间
|
|
||||||
/// </summary>
|
|
||||||
[DynamicProperty]
|
|
||||||
public virtual string RealTableBusinessInterval { get; set; } = "3000";
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 实时表脚本
|
/// 实时表脚本
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -10,11 +10,11 @@
|
|||||||
|
|
||||||
using Mapster;
|
using Mapster;
|
||||||
|
|
||||||
|
using System.Collections.Concurrent;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
|
|
||||||
using ThingsGateway.Extension.Generic;
|
using ThingsGateway.Extension.Generic;
|
||||||
using ThingsGateway.Foundation;
|
using ThingsGateway.Foundation;
|
||||||
using ThingsGateway.NewLife;
|
|
||||||
using ThingsGateway.Plugin.DB;
|
using ThingsGateway.Plugin.DB;
|
||||||
|
|
||||||
using TouchSocket.Core;
|
using TouchSocket.Core;
|
||||||
@@ -27,7 +27,6 @@ namespace ThingsGateway.Plugin.SqlDB;
|
|||||||
public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<SQLHistoryValue>
|
public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<SQLHistoryValue>
|
||||||
{
|
{
|
||||||
private TypeAdapterConfig _config;
|
private TypeAdapterConfig _config;
|
||||||
private TimeTick _exRealTimerTick;
|
|
||||||
private volatile bool _initRealData;
|
private volatile bool _initRealData;
|
||||||
|
|
||||||
protected override ValueTask<OperResult> UpdateVarModel(IEnumerable<CacheDBItem<SQLHistoryValue>> item, CancellationToken cancellationToken)
|
protected override ValueTask<OperResult> UpdateVarModel(IEnumerable<CacheDBItem<SQLHistoryValue>> item, CancellationToken cancellationToken)
|
||||||
@@ -78,6 +77,7 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ConcurrentDictionary<long,VariableBasicData> RealTimeVariables { get; } = new ConcurrentDictionary<long, VariableBasicData>();
|
||||||
private void UpdateVariable(VariableRuntime variableRuntime, VariableBasicData variable)
|
private void UpdateVariable(VariableRuntime variableRuntime, VariableBasicData variable)
|
||||||
{
|
{
|
||||||
if (_driverPropertys.IsHistoryDB)
|
if (_driverPropertys.IsHistoryDB)
|
||||||
@@ -93,6 +93,11 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
AddQueueVarModel(new CacheDBItem<SQLHistoryValue>(variableRuntime.Adapt<SQLHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<SQLHistoryValue>(variableRuntime.Adapt<SQLHistoryValue>(_config)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (_driverPropertys.IsReadDB)
|
||||||
|
{
|
||||||
|
RealTimeVariables.AddOrUpdate(variable.Id, variable, (key, oldValue) => oldValue);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -168,26 +173,19 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
{
|
{
|
||||||
|
|
||||||
if (!_initRealData)
|
if (!_initRealData)
|
||||||
{
|
|
||||||
if (datas?.Count > 0)
|
|
||||||
{
|
{
|
||||||
Stopwatch stopwatch = new();
|
Stopwatch stopwatch = new();
|
||||||
stopwatch.Start();
|
stopwatch.Start();
|
||||||
var ids = (await db.Queryable<SQLRealValue>().AS(_driverPropertys.ReadDBTableName).Select(a => a.Id).ToListAsync(cancellationToken).ConfigureAwait(false)).ToHashSet();
|
var ids = (await db.Queryable<SQLRealValue>().AS(_driverPropertys.ReadDBTableName).Select(a => a.Id).ToListAsync(cancellationToken).ConfigureAwait(false)).ToHashSet();
|
||||||
var InsertData = datas.Where(a => !ids.Contains(a.Id)).ToList();
|
var InsertData = IdVariableRuntimes.Where(a => !ids.Contains(a.Key)).Select(a=>a.Value).Adapt<List<SQLRealValue>>();
|
||||||
var result = await db.Fastest<SQLRealValue>().AS(_driverPropertys.ReadDBTableName).PageSize(100000).BulkCopyAsync(InsertData).ConfigureAwait(false);
|
var result = await db.Fastest<SQLRealValue>().AS(_driverPropertys.ReadDBTableName).PageSize(100000).BulkCopyAsync(InsertData).ConfigureAwait(false);
|
||||||
//var result = await db.Storageable(datas).As(_driverPropertys.ReadDBTableName).PageSize(5000).ExecuteSqlBulkCopyAsync(cancellationToken).ConfigureAwait(false);
|
|
||||||
_initRealData = true;
|
_initRealData = true;
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
if (result > 0)
|
if (result > 0)
|
||||||
{
|
{
|
||||||
LogMessage?.Trace($"RealTable Insert Data Count:{result},watchTime: {stopwatch.ElapsedMilliseconds} ms");
|
LogMessage?.Trace($"RealTable Insert Data Count:{result},watchTime: {stopwatch.ElapsedMilliseconds} ms");
|
||||||
}
|
}
|
||||||
return OperResult.Success;
|
|
||||||
}
|
}
|
||||||
return OperResult.Success;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
if (datas?.Count > 0)
|
if (datas?.Count > 0)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
<Project>
|
<Project>
|
||||||
<PropertyGroup>
|
<PropertyGroup>
|
||||||
<Version>10.7.47</Version>
|
<Version>10.7.48</Version>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
Reference in New Issue
Block a user