mirror of
https://gitee.com/ThingsGateway/ThingsGateway.git
synced 2025-10-22 19:43:07 +08:00
10.8.12
This commit is contained in:
@@ -1,8 +1,8 @@
|
|||||||
<Project>
|
<Project>
|
||||||
|
|
||||||
<PropertyGroup>
|
<PropertyGroup>
|
||||||
<PluginVersion>10.8.10</PluginVersion>
|
<PluginVersion>10.8.12</PluginVersion>
|
||||||
<ProPluginVersion>10.8.10</ProPluginVersion>
|
<ProPluginVersion>10.8.12</ProPluginVersion>
|
||||||
<AuthenticationVersion>2.8.0</AuthenticationVersion>
|
<AuthenticationVersion>2.8.0</AuthenticationVersion>
|
||||||
<SourceGeneratorVersion>10.8.2</SourceGeneratorVersion>
|
<SourceGeneratorVersion>10.8.2</SourceGeneratorVersion>
|
||||||
<NET8Version>8.0.17</NET8Version>
|
<NET8Version>8.0.17</NET8Version>
|
||||||
|
|||||||
@@ -996,4 +996,6 @@ public abstract class DeviceBase : DisposableObject, IDevice
|
|||||||
}
|
}
|
||||||
return a => { };
|
return a => { };
|
||||||
}
|
}
|
||||||
|
public abstract ValueTask<OperResult<byte[]>> ReadAsync(object state, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -466,4 +466,5 @@ public interface IDevice : IDisposable
|
|||||||
/// <param name="channel">通道</param>
|
/// <param name="channel">通道</param>
|
||||||
/// <param name="deviceLog">单独设备日志</param>
|
/// <param name="deviceLog">单独设备日志</param>
|
||||||
void InitChannel(IChannel channel, ILog? deviceLog = null);
|
void InitChannel(IChannel channel, ILog? deviceLog = null);
|
||||||
|
ValueTask<OperResult<byte[]>> ReadAsync(object state, CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,6 +30,11 @@ public interface IVariableSource
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
string RegisterAddress { get; set; }
|
string RegisterAddress { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 变量地址
|
||||||
|
/// </summary>
|
||||||
|
object AddressObject { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// IntervalTime
|
/// IntervalTime
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -36,6 +36,8 @@ public class VariableSourceClass : IVariableSource
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public IEnumerable<IVariable> VariableRuntimes => _variableRuntimes;
|
public IEnumerable<IVariable> VariableRuntimes => _variableRuntimes;
|
||||||
|
|
||||||
|
public object AddressObject { get; set; }
|
||||||
|
|
||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
public virtual void AddVariable(IVariable variable)
|
public virtual void AddVariable(IVariable variable)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -137,8 +137,9 @@ public abstract class CollectFoundationBase : CollectBase
|
|||||||
|
|
||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
return new(new OperationCanceledException());
|
return new(new OperationCanceledException());
|
||||||
|
|
||||||
// 从协议读取数据
|
// 从协议读取数据
|
||||||
var read = await FoundationDevice.ReadAsync(variableSourceRead.RegisterAddress, variableSourceRead.Length, cancellationToken).ConfigureAwait(false);
|
var read = await FoundationDevice.ReadAsync(variableSourceRead.AddressObject, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
// 如果读取成功且有有效内容,则解析结构化内容
|
// 如果读取成功且有有效内容,则解析结构化内容
|
||||||
if (read.IsSuccess)
|
if (read.IsSuccess)
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ public class VariableSourceRead : IVariableSource
|
|||||||
/// 读取地址
|
/// 读取地址
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string RegisterAddress { get; set; }
|
public string RegisterAddress { get; set; }
|
||||||
|
public object AddressObject { get; set; }
|
||||||
|
|
||||||
public string IntervalTime { get; set; }
|
public string IntervalTime { get; set; }
|
||||||
|
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ public class Dlt645_2007Master : DtuServiceDeviceBase
|
|||||||
/// <param name="dateTime">时间</param>
|
/// <param name="dateTime">时间</param>
|
||||||
/// <param name="cancellationToken">取消令箭</param>
|
/// <param name="cancellationToken">取消令箭</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async ValueTask<OperResult> BroadcastTimeAsync(DateTime dateTime, CancellationToken cancellationToken = default)
|
public ValueTask<OperResult> BroadcastTimeAsync(DateTime dateTime, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -53,40 +53,30 @@ public class Dlt645_2007Master : DtuServiceDeviceBase
|
|||||||
dAddress.Station = str.HexStringToBytes();
|
dAddress.Station = str.HexStringToBytes();
|
||||||
dAddress.DataId = "999999999999".HexStringToBytes();
|
dAddress.DataId = "999999999999".HexStringToBytes();
|
||||||
|
|
||||||
return await Dlt645SendAsync(dAddress, ControlCode.BroadcastTime, FEHead, cancellationToken: cancellationToken).ConfigureAwait(false);
|
return Dlt645SendAsync(dAddress, ControlCode.BroadcastTime, FEHead, cancellationToken: cancellationToken);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
return new OperResult(ex);
|
return EasyValueTask.FromResult(new OperResult(ex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
public async ValueTask<OperResult<byte[]>> Dlt645RequestAsync(Dlt645_2007Address dAddress, ControlCode controlCode, string feHead, byte[] codes = default, string[] datas = default, CancellationToken cancellationToken = default)
|
public ValueTask<OperResult<byte[]>> Dlt645RequestAsync(Dlt645_2007Address dAddress, ControlCode controlCode, string feHead, byte[] codes = default, string[] datas = default, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
try
|
|
||||||
{
|
return SendThenReturnAsync(GetSendMessage(dAddress, controlCode, feHead, codes, datas), cancellationToken);
|
||||||
return await SendThenReturnAsync(GetSendMessage(dAddress, controlCode, feHead, codes, datas), cancellationToken).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
return new OperResult<byte[]>(ex);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
public async ValueTask<OperResult> Dlt645SendAsync(Dlt645_2007Address dAddress, ControlCode controlCode, string feHead, byte[] codes = default, string[] datas = default, CancellationToken cancellationToken = default)
|
public ValueTask<OperResult> Dlt645SendAsync(Dlt645_2007Address dAddress, ControlCode controlCode, string feHead, byte[] codes = default, string[] datas = default, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
try
|
|
||||||
{
|
|
||||||
|
|
||||||
return await SendAsync(GetSendMessage(dAddress, controlCode, feHead, codes, datas), cancellationToken).ConfigureAwait(false);
|
|
||||||
|
|
||||||
}
|
return SendAsync(GetSendMessage(dAddress, controlCode, feHead, codes, datas), cancellationToken);
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
return new OperResult<byte[]>(ex);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -147,7 +137,7 @@ public class Dlt645_2007Master : DtuServiceDeviceBase
|
|||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
public override List<T> LoadSourceRead<T>(IEnumerable<IVariable> deviceVariables, int maxPack, string defaultIntervalTime)
|
public override List<T> LoadSourceRead<T>(IEnumerable<IVariable> deviceVariables, int maxPack, string defaultIntervalTime)
|
||||||
{
|
{
|
||||||
return PackHelper.LoadSourceRead<T>(this, deviceVariables, maxPack, defaultIntervalTime);
|
return PackHelper.LoadSourceRead<T>(this, deviceVariables, maxPack, Station, defaultIntervalTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
@@ -163,7 +153,19 @@ public class Dlt645_2007Master : DtuServiceDeviceBase
|
|||||||
return EasyValueTask.FromResult(new OperResult<byte[]>(ex));
|
return EasyValueTask.FromResult(new OperResult<byte[]>(ex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
public override ValueTask<OperResult<byte[]>> ReadAsync(object state, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (state is Dlt645_2007Address dlt645_2007Address)
|
||||||
|
{
|
||||||
|
return Dlt645RequestAsync(dlt645_2007Address, ControlCode.Read, FEHead, cancellationToken: cancellationToken);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw new ArgumentException("State must be of type Dlt645_2007Address", nameof(state));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 读取通信地址
|
/// 读取通信地址
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -231,16 +233,16 @@ public class Dlt645_2007Master : DtuServiceDeviceBase
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
public override async ValueTask<OperResult> WriteAsync(string address, string value, IThingsGatewayBitConverter bitConverter = null, CancellationToken cancellationToken = default)
|
public override ValueTask<OperResult> WriteAsync(string address, string value, IThingsGatewayBitConverter bitConverter = null, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
string[] strArray = value.SplitStringBySemicolon();
|
string[] strArray = value.SplitStringBySemicolon();
|
||||||
return await WriteAsync(address, value, bitConverter, cancellationToken).ConfigureAwait(false);
|
return WriteAsync(address, value, bitConverter, cancellationToken);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
return new OperResult<byte[]>(ex);
|
return EasyValueTask.FromResult(new OperResult(ex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -18,9 +18,10 @@ internal static class PackHelper
|
|||||||
/// <param name="device"></param>
|
/// <param name="device"></param>
|
||||||
/// <param name="deviceVariables"></param>
|
/// <param name="deviceVariables"></param>
|
||||||
/// <param name="maxPack">最大打包长度</param>
|
/// <param name="maxPack">最大打包长度</param>
|
||||||
|
/// <param name="station">station</param>
|
||||||
/// <param name="defaultIntervalTime">默认间隔时间</param>
|
/// <param name="defaultIntervalTime">默认间隔时间</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static List<T> LoadSourceRead<T>(IDevice device, IEnumerable<IVariable> deviceVariables, int maxPack, string defaultIntervalTime) where T : IVariableSource, new()
|
public static List<T> LoadSourceRead<T>(IDevice device, IEnumerable<IVariable> deviceVariables, int maxPack, string station, string defaultIntervalTime) where T : IVariableSource, new()
|
||||||
{
|
{
|
||||||
var byteConverter = device.ThingsGatewayBitConverter;
|
var byteConverter = device.ThingsGatewayBitConverter;
|
||||||
var result = new List<T>();
|
var result = new List<T>();
|
||||||
@@ -42,6 +43,7 @@ internal static class PackHelper
|
|||||||
var r = new T()
|
var r = new T()
|
||||||
{
|
{
|
||||||
RegisterAddress = item.Key!,
|
RegisterAddress = item.Key!,
|
||||||
|
AddressObject = Dlt645_2007Address.ParseFrom(item.Key, station),
|
||||||
Length = 1,
|
Length = 1,
|
||||||
IntervalTime = string.IsNullOrWhiteSpace(item.FirstOrDefault().IntervalTime) ? defaultIntervalTime : item.FirstOrDefault().IntervalTime,
|
IntervalTime = string.IsNullOrWhiteSpace(item.FirstOrDefault().IntervalTime) ? defaultIntervalTime : item.FirstOrDefault().IntervalTime,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -167,6 +167,7 @@ public static class PackHelper
|
|||||||
IntervalTime = intervalTime,
|
IntervalTime = intervalTime,
|
||||||
// 将当前组打包地址中的起始地址作为实际打包报文中的起始地址
|
// 将当前组打包地址中的起始地址作为实际打包报文中的起始地址
|
||||||
RegisterAddress = startAddress.ToString(),
|
RegisterAddress = startAddress.ToString(),
|
||||||
|
AddressObject = new ModbusAddress(startAddress) { Length = (ushort)sourceLen },
|
||||||
Length = sourceLen.ToInt()
|
Length = sourceLen.ToInt()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -98,31 +98,64 @@ public partial class ModbusMaster : DtuServiceDeviceBase, IModbusAddress
|
|||||||
return PackHelper.LoadSourceRead<T>(this, deviceVariables, maxPack, defaultIntervalTime, Station);
|
return PackHelper.LoadSourceRead<T>(this, deviceVariables, maxPack, defaultIntervalTime, Station);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async ValueTask<OperResult<byte[]>> ModbusRequestAsync(ModbusAddress mAddress, bool read, CancellationToken cancellationToken = default)
|
public override ValueTask<OperResult<byte[]>> ReadAsync(object state, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
return await SendThenReturnAsync(GetSendMessage(mAddress, read),
|
if (state is ModbusAddress mAddress)
|
||||||
cancellationToken).ConfigureAwait(false);
|
{
|
||||||
|
return ModbusReadAsync(mAddress, cancellationToken);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw new ArgumentException("State must be of type ModbusAddress", nameof(state));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
return new OperResult<byte[]>(ex);
|
return EasyValueTask.FromResult(new OperResult<byte[]>(ex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask<OperResult<byte[]>> ModbusReadAsync(ModbusAddress mAddress, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return SendThenReturnAsync(GetSendMessage(mAddress, true),
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return EasyValueTask.FromResult(new OperResult<byte[]>(ex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public ValueTask<OperResult<byte[]>> ModbusRequestAsync(ModbusAddress mAddress, bool read, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return SendThenReturnAsync(GetSendMessage(mAddress, read),
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return EasyValueTask.FromResult(new OperResult<byte[]>(ex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
public override async ValueTask<OperResult<byte[]>> ReadAsync(string address, int length, CancellationToken cancellationToken = default)
|
public override ValueTask<OperResult<byte[]>> ReadAsync(string address, int length, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var mAddress = GetModbusAddress(address, Station);
|
var mAddress = GetModbusAddress(address, Station);
|
||||||
mAddress.Length = (ushort)length;
|
mAddress.Length = (ushort)length;
|
||||||
return await ModbusRequestAsync(mAddress, true, cancellationToken).ConfigureAwait(false);
|
return ModbusRequestAsync(mAddress, true, cancellationToken);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
return new OperResult<byte[]>(ex);
|
return EasyValueTask.FromResult(new OperResult<byte[]>(ex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -310,6 +310,34 @@ public class ModbusSlave : DeviceBase, IModbusAddress
|
|||||||
return EasyValueTask.FromResult(new OperResult<byte[]>(result));
|
return EasyValueTask.FromResult(new OperResult<byte[]>(result));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public override ValueTask<OperResult<byte[]>> ReadAsync(object state, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (state is ModbusAddress mAddress)
|
||||||
|
{
|
||||||
|
var result = ModbusRequest(mAddress, true, cancellationToken);
|
||||||
|
if (result.IsSuccess)
|
||||||
|
{
|
||||||
|
return EasyValueTask.FromResult(new OperResult<byte[]>() { Content = result.Content.ToArray() });
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return EasyValueTask.FromResult(new OperResult<byte[]>(result));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw new ArgumentException("State must be of type ModbusAddress", nameof(state));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return EasyValueTask.FromResult(new OperResult<byte[]>(ex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public virtual ModbusAddress GetModbusAddress(string address, byte? station, bool isCache = true)
|
public virtual ModbusAddress GetModbusAddress(string address, byte? station, bool isCache = true)
|
||||||
{
|
{
|
||||||
var mAddress = ModbusAddress.ParseFrom(address, station, isCache);
|
var mAddress = ModbusAddress.ParseFrom(address, station, isCache);
|
||||||
|
|||||||
@@ -230,6 +230,7 @@ internal static class PackHelper
|
|||||||
{
|
{
|
||||||
IntervalTime = intervalTime, // 设置时间戳
|
IntervalTime = intervalTime, // 设置时间戳
|
||||||
RegisterAddress = tempAddresses.OrderBy(it => it.AddressStart).First().ToString(), // 获取地址并按地址排序
|
RegisterAddress = tempAddresses.OrderBy(it => it.AddressStart).First().ToString(), // 获取地址并按地址排序
|
||||||
|
AddressObject = new SiemensS7Address(tempAddresses.OrderBy(it => it.AddressStart).First()) { Length = sourceLen },
|
||||||
Length = sourceLen // 设置源长度
|
Length = sourceLen // 设置源长度
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -298,6 +298,26 @@ public partial class SiemensS7Master : DeviceBase
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public override ValueTask<OperResult<byte[]>> ReadAsync(object state, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (state is SiemensS7Address sAddress)
|
||||||
|
{
|
||||||
|
return S7ReadAsync([sAddress], cancellationToken);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw new ArgumentException("State must be of type SiemensS7Address", nameof(state));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return EasyValueTask.FromResult(new OperResult<byte[]>(ex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
public override async ValueTask<OperResult> WriteAsync(string address, byte[] value, DataTypeEnum dataType, CancellationToken cancellationToken = default)
|
public override async ValueTask<OperResult> WriteAsync(string address, byte[] value, DataTypeEnum dataType, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ namespace ThingsGateway.Plugin.QuestDB;
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// QuestDBProducer
|
/// QuestDBProducer
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableModel<QuestDBHistoryValue>, IDBHistoryValueService
|
public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableModel<VariableBasicData>, IDBHistoryValueService
|
||||||
{
|
{
|
||||||
internal readonly RealDBProducerProperty _driverPropertys = new();
|
internal readonly RealDBProducerProperty _driverPropertys = new();
|
||||||
private readonly QuestDBProducerVariableProperty _variablePropertys = new();
|
private readonly QuestDBProducerVariableProperty _variablePropertys = new();
|
||||||
|
|||||||
@@ -23,11 +23,11 @@ namespace ThingsGateway.Plugin.QuestDB;
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// RabbitMQProducer
|
/// RabbitMQProducer
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableModel<QuestDBHistoryValue>
|
public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableModel<VariableBasicData>
|
||||||
{
|
{
|
||||||
private TypeAdapterConfig _config;
|
private TypeAdapterConfig _config;
|
||||||
|
|
||||||
protected override ValueTask<OperResult> UpdateVarModel(IEnumerable<CacheDBItem<QuestDBHistoryValue>> item, CancellationToken cancellationToken)
|
protected override ValueTask<OperResult> UpdateVarModel(IEnumerable<CacheDBItem<VariableBasicData>> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
return UpdateVarModel(item.Select(a => a.Value).OrderBy(a => a.Id), cancellationToken);
|
return UpdateVarModel(item.Select(a => a.Value).OrderBy(a => a.Id), cancellationToken);
|
||||||
}
|
}
|
||||||
@@ -41,7 +41,7 @@ public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableMode
|
|||||||
UpdateVariable(variableRuntime, variable);
|
UpdateVariable(variableRuntime, variable);
|
||||||
base.VariableChange(variableRuntime, variable);
|
base.VariableChange(variableRuntime, variable);
|
||||||
}
|
}
|
||||||
protected override ValueTask<OperResult> UpdateVarModels(IEnumerable<QuestDBHistoryValue> item, CancellationToken cancellationToken)
|
protected override ValueTask<OperResult> UpdateVarModels(IEnumerable<VariableBasicData> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
return UpdateVarModel(item, cancellationToken);
|
return UpdateVarModel(item, cancellationToken);
|
||||||
}
|
}
|
||||||
@@ -56,18 +56,18 @@ public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableMode
|
|||||||
|
|
||||||
foreach (var group in varGroup)
|
foreach (var group in varGroup)
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<List<QuestDBHistoryValue>>(group.Adapt<List<QuestDBHistoryValue>>(_config)));
|
AddQueueVarModel(new CacheDBItem<List<VariableBasicData>>(group.ToList()));
|
||||||
}
|
}
|
||||||
foreach (var variable in varList)
|
foreach (var variable in varList)
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<QuestDBHistoryValue>(variable.Adapt<QuestDBHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<VariableBasicData>(variable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
foreach (var variable in variables)
|
foreach (var variable in variables)
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<QuestDBHistoryValue>(variable.Adapt<QuestDBHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<VariableBasicData>(variable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -78,15 +78,15 @@ public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableMode
|
|||||||
if (_driverPropertys.GroupUpdate && !variable.BusinessGroup.IsNullOrEmpty() && VariableRuntimeGroups.TryGetValue(variable.BusinessGroup, out var variableRuntimeGroup))
|
if (_driverPropertys.GroupUpdate && !variable.BusinessGroup.IsNullOrEmpty() && VariableRuntimeGroups.TryGetValue(variable.BusinessGroup, out var variableRuntimeGroup))
|
||||||
{
|
{
|
||||||
|
|
||||||
AddQueueVarModel(new CacheDBItem<List<QuestDBHistoryValue>>(variableRuntimeGroup.Adapt<List<QuestDBHistoryValue>>(_config)));
|
AddQueueVarModel(new CacheDBItem<List<VariableBasicData>>(variableRuntimeGroup.Adapt<List<VariableBasicData>>(_config)));
|
||||||
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<QuestDBHistoryValue>(variableRuntime.Adapt<QuestDBHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<VariableBasicData>(variable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private async ValueTask<OperResult> UpdateVarModel(IEnumerable<QuestDBHistoryValue> item, CancellationToken cancellationToken)
|
private async ValueTask<OperResult> UpdateVarModel(IEnumerable<VariableBasicData> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var result = await InserableAsync(item.WhereIf(_driverPropertys.OnlineFilter, a => a.IsOnline == true).ToList(), cancellationToken).ConfigureAwait(false);
|
var result = await InserableAsync(item.WhereIf(_driverPropertys.OnlineFilter, a => a.IsOnline == true).ToList(), cancellationToken).ConfigureAwait(false);
|
||||||
if (success != result.IsSuccess)
|
if (success != result.IsSuccess)
|
||||||
@@ -101,7 +101,7 @@ public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableMode
|
|||||||
|
|
||||||
#region 方法
|
#region 方法
|
||||||
|
|
||||||
private async ValueTask<OperResult> InserableAsync(List<QuestDBHistoryValue> dbInserts, CancellationToken cancellationToken)
|
private async ValueTask<OperResult> InserableAsync(List<VariableBasicData> dbInserts, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -118,8 +118,8 @@ public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableMode
|
|||||||
{
|
{
|
||||||
Stopwatch stopwatch = new();
|
Stopwatch stopwatch = new();
|
||||||
stopwatch.Start();
|
stopwatch.Start();
|
||||||
|
var data = dbInserts.Adapt<List<QuestDBHistoryValue>>();
|
||||||
var result = await _db.Insertable(dbInserts).AS(_driverPropertys.TableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);//不要加分表
|
var result = await _db.Insertable(data).AS(_driverPropertys.TableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);//不要加分表
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
|
|
||||||
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
|
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ namespace ThingsGateway.Plugin.SqlDB;
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// SqlDBProducer
|
/// SqlDBProducer
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<SQLHistoryValue>, IDBHistoryValueService
|
public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<VariableBasicData>, IDBHistoryValueService
|
||||||
{
|
{
|
||||||
internal readonly SqlDBProducerProperty _driverPropertys = new();
|
internal readonly SqlDBProducerProperty _driverPropertys = new();
|
||||||
private readonly SqlDBProducerVariableProperty _variablePropertys = new();
|
private readonly SqlDBProducerVariableProperty _variablePropertys = new();
|
||||||
@@ -236,7 +236,7 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
var varList = RealTimeVariables.ToListWithDequeue();
|
var varList = RealTimeVariables.ToListWithDequeue();
|
||||||
if (varList.Count > 0)
|
if (varList.Count > 0)
|
||||||
{
|
{
|
||||||
var result = await UpdateAsync(varList.Adapt<List<SQLRealValue>>(), cancellationToken).ConfigureAwait(false);
|
var result = await UpdateAsync(varList, cancellationToken).ConfigureAwait(false);
|
||||||
if (success != result.IsSuccess)
|
if (success != result.IsSuccess)
|
||||||
{
|
{
|
||||||
if (!result.IsSuccess)
|
if (!result.IsSuccess)
|
||||||
|
|||||||
@@ -24,13 +24,13 @@ namespace ThingsGateway.Plugin.SqlDB;
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// SqlDBProducer
|
/// SqlDBProducer
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<SQLHistoryValue>
|
public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<VariableBasicData>
|
||||||
{
|
{
|
||||||
private TypeAdapterConfig _config;
|
private TypeAdapterConfig _config;
|
||||||
private volatile bool _initRealData;
|
private volatile bool _initRealData;
|
||||||
private ConcurrentDictionary<long, VariableBasicData> RealTimeVariables { get; } = new ConcurrentDictionary<long, VariableBasicData>();
|
private ConcurrentDictionary<long, VariableBasicData> RealTimeVariables { get; } = new ConcurrentDictionary<long, VariableBasicData>();
|
||||||
|
|
||||||
protected override ValueTask<OperResult> UpdateVarModel(IEnumerable<CacheDBItem<SQLHistoryValue>> item, CancellationToken cancellationToken)
|
protected override ValueTask<OperResult> UpdateVarModel(IEnumerable<CacheDBItem<VariableBasicData>> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
return UpdateVarModel(item.Select(a => a.Value).OrderBy(a => a.Id), cancellationToken);
|
return UpdateVarModel(item.Select(a => a.Value).OrderBy(a => a.Id), cancellationToken);
|
||||||
}
|
}
|
||||||
@@ -48,7 +48,7 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
UpdateVariable(variableRuntime, variable);
|
UpdateVariable(variableRuntime, variable);
|
||||||
base.VariableChange(variableRuntime, variable);
|
base.VariableChange(variableRuntime, variable);
|
||||||
}
|
}
|
||||||
protected override ValueTask<OperResult> UpdateVarModels(IEnumerable<SQLHistoryValue> item, CancellationToken cancellationToken)
|
protected override ValueTask<OperResult> UpdateVarModels(IEnumerable<VariableBasicData> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
return UpdateVarModel(item, cancellationToken);
|
return UpdateVarModel(item, cancellationToken);
|
||||||
}
|
}
|
||||||
@@ -62,18 +62,18 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
|
|
||||||
foreach (var group in varGroup)
|
foreach (var group in varGroup)
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<List<SQLHistoryValue>>(group.Adapt<List<SQLHistoryValue>>(_config)));
|
AddQueueVarModel(new CacheDBItem<List<VariableBasicData>>(group.ToList()));
|
||||||
}
|
}
|
||||||
foreach (var variable in varList)
|
foreach (var variable in varList)
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<SQLHistoryValue>(variable.Adapt<SQLHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<VariableBasicData>(variable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
foreach (var variable in variables)
|
foreach (var variable in variables)
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<SQLHistoryValue>(variable.Adapt<SQLHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<VariableBasicData>(variable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -85,12 +85,12 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
if (_driverPropertys.GroupUpdate && !variable.BusinessGroup.IsNullOrEmpty() && VariableRuntimeGroups.TryGetValue(variable.BusinessGroup, out var variableRuntimeGroup))
|
if (_driverPropertys.GroupUpdate && !variable.BusinessGroup.IsNullOrEmpty() && VariableRuntimeGroups.TryGetValue(variable.BusinessGroup, out var variableRuntimeGroup))
|
||||||
{
|
{
|
||||||
|
|
||||||
AddQueueVarModel(new CacheDBItem<List<SQLHistoryValue>>(variableRuntimeGroup.Adapt<List<SQLHistoryValue>>(_config)));
|
AddQueueVarModel(new CacheDBItem<List<VariableBasicData>>(variableRuntimeGroup.Adapt<List<VariableBasicData>>(_config)));
|
||||||
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<SQLHistoryValue>(variableRuntime.Adapt<SQLHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<VariableBasicData>(variable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -101,7 +101,7 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private async ValueTask<OperResult> UpdateVarModel(IEnumerable<SQLHistoryValue> item, CancellationToken cancellationToken)
|
private async ValueTask<OperResult> UpdateVarModel(IEnumerable<VariableBasicData> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var result = await InserableAsync(item.WhereIf(_driverPropertys.OnlineFilter, a => a.IsOnline == true).ToList(), cancellationToken).ConfigureAwait(false);
|
var result = await InserableAsync(item.WhereIf(_driverPropertys.OnlineFilter, a => a.IsOnline == true).ToList(), cancellationToken).ConfigureAwait(false);
|
||||||
if (success != result.IsSuccess)
|
if (success != result.IsSuccess)
|
||||||
@@ -116,7 +116,7 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
|
|
||||||
#region 方法
|
#region 方法
|
||||||
|
|
||||||
private async ValueTask<OperResult> InserableAsync(List<SQLHistoryValue> dbInserts, CancellationToken cancellationToken)
|
private async ValueTask<OperResult> InserableAsync(List<VariableBasicData> dbInserts, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -134,8 +134,8 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
{
|
{
|
||||||
Stopwatch stopwatch = new();
|
Stopwatch stopwatch = new();
|
||||||
stopwatch.Start();
|
stopwatch.Start();
|
||||||
var result = await _db.Fastest<SQLHistoryValue>().PageSize(50000).SplitTable().BulkCopyAsync(dbInserts).ConfigureAwait(false);
|
var data = dbInserts.Adapt<List<SQLHistoryValue>>(_config);
|
||||||
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
|
var result = await _db.Fastest<SQLHistoryValue>().PageSize(50000).SplitTable().BulkCopyAsync(data).ConfigureAwait(false);
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
if (result > 0)
|
if (result > 0)
|
||||||
{
|
{
|
||||||
@@ -152,7 +152,7 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async ValueTask<OperResult> UpdateAsync(List<SQLRealValue> datas, CancellationToken cancellationToken)
|
private async ValueTask<OperResult> UpdateAsync(List<VariableBasicData> datas, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -190,7 +190,8 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
|
|||||||
Stopwatch stopwatch = new();
|
Stopwatch stopwatch = new();
|
||||||
stopwatch.Start();
|
stopwatch.Start();
|
||||||
|
|
||||||
var result = await _db.Fastest<SQLRealValue>().AS(_driverPropertys.ReadDBTableName).PageSize(100000).BulkUpdateAsync(datas).ConfigureAwait(false);
|
var data = datas.Adapt<List<SQLRealValue>>(_config);
|
||||||
|
var result = await _db.Fastest<SQLRealValue>().AS(_driverPropertys.ReadDBTableName).PageSize(100000).BulkUpdateAsync(data).ConfigureAwait(false);
|
||||||
|
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
if (result > 0)
|
if (result > 0)
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ public partial class SqlHistoryAlarm : BusinessBaseWithCacheVariableModel<Histor
|
|||||||
|
|
||||||
protected override async Task InitChannelAsync(IChannel? channel, CancellationToken cancellationToken)
|
protected override async Task InitChannelAsync(IChannel? channel, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
|
_db = BusinessDatabaseUtil.GetDb((DbType)_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
|
||||||
|
|
||||||
_config.ForType<AlarmVariable, HistoryAlarm>().Map(dest => dest.Id, (src) => CommonUtils.GetSingleId());
|
_config.ForType<AlarmVariable, HistoryAlarm>().Map(dest => dest.Id, (src) => CommonUtils.GetSingleId());
|
||||||
GlobalData.AlarmChangedEvent -= AlarmWorker_OnAlarmChanged;
|
GlobalData.AlarmChangedEvent -= AlarmWorker_OnAlarmChanged;
|
||||||
@@ -112,7 +112,7 @@ public partial class SqlHistoryAlarm : BusinessBaseWithCacheVariableModel<Histor
|
|||||||
|
|
||||||
internal ISugarQueryable<HistoryAlarm> Query(DBHistoryAlarmPageInput input)
|
internal ISugarQueryable<HistoryAlarm> Query(DBHistoryAlarmPageInput input)
|
||||||
{
|
{
|
||||||
using var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
|
using var db = BusinessDatabaseUtil.GetDb((DbType)_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
|
||||||
var query = db.Queryable<HistoryAlarm>().AS(_driverPropertys.TableName)
|
var query = db.Queryable<HistoryAlarm>().AS(_driverPropertys.TableName)
|
||||||
.WhereIF(input.StartTime != null, a => a.EventTime >= input.StartTime)
|
.WhereIF(input.StartTime != null, a => a.EventTime >= input.StartTime)
|
||||||
.WhereIF(input.EndTime != null, a => a.EventTime <= input.EndTime)
|
.WhereIF(input.EndTime != null, a => a.EventTime <= input.EndTime)
|
||||||
@@ -132,7 +132,7 @@ public partial class SqlHistoryAlarm : BusinessBaseWithCacheVariableModel<Histor
|
|||||||
|
|
||||||
internal async Task<QueryData<HistoryAlarm>> QueryData(QueryPageOptions option)
|
internal async Task<QueryData<HistoryAlarm>> QueryData(QueryPageOptions option)
|
||||||
{
|
{
|
||||||
using var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
|
using var db = BusinessDatabaseUtil.GetDb((DbType)_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
|
||||||
var ret = new QueryData<HistoryAlarm>()
|
var ret = new QueryData<HistoryAlarm>()
|
||||||
{
|
{
|
||||||
IsSorted = option.SortOrder != SortOrder.Unset,
|
IsSorted = option.SortOrder != SortOrder.Unset,
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ using Mapster;
|
|||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
|
|
||||||
using ThingsGateway.Foundation;
|
using ThingsGateway.Foundation;
|
||||||
using ThingsGateway.NewLife.Threading;
|
using ThingsGateway.Plugin.DB;
|
||||||
|
|
||||||
using TouchSocket.Core;
|
using TouchSocket.Core;
|
||||||
|
|
||||||
@@ -56,36 +56,6 @@ public partial class SqlHistoryAlarm : BusinessBaseWithCacheVariableModel<Histor
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async ValueTask<OperResult> InserableAsync(List<HistoryAlarm> dbInserts, CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
|
|
||||||
int result = 0;
|
|
||||||
//.SplitTable()
|
|
||||||
Stopwatch stopwatch = new();
|
|
||||||
stopwatch.Start();
|
|
||||||
|
|
||||||
if (_db.CurrentConnectionConfig.DbType == SqlSugar.DbType.QuestDB)
|
|
||||||
result = await _db.Insertable(dbInserts).AS(_driverPropertys.TableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);//不要加分表
|
|
||||||
else
|
|
||||||
result = await _db.Fastest<HistoryAlarm>().AS(_driverPropertys.TableName).PageSize(50000).BulkCopyAsync(dbInserts).ConfigureAwait(false);
|
|
||||||
|
|
||||||
|
|
||||||
stopwatch.Stop();
|
|
||||||
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
|
|
||||||
if (result > 0)
|
|
||||||
{
|
|
||||||
LogMessage?.Trace($"Count:{dbInserts.Count},watchTime: {stopwatch.ElapsedMilliseconds} ms");
|
|
||||||
}
|
|
||||||
return OperResult.Success;
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
return new OperResult(ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async ValueTask<OperResult> UpdateT(IEnumerable<HistoryAlarm> item, CancellationToken cancellationToken)
|
private async ValueTask<OperResult> UpdateT(IEnumerable<HistoryAlarm> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var result = await InserableAsync(item.ToList(), cancellationToken).ConfigureAwait(false);
|
var result = await InserableAsync(item.ToList(), cancellationToken).ConfigureAwait(false);
|
||||||
@@ -98,4 +68,51 @@ public partial class SqlHistoryAlarm : BusinessBaseWithCacheVariableModel<Histor
|
|||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async ValueTask<OperResult> InserableAsync(List<HistoryAlarm> dbInserts, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_db.Ado.CancellationToken = cancellationToken;
|
||||||
|
if (!_driverPropertys.BigTextScriptHistoryTable.IsNullOrEmpty())
|
||||||
|
{
|
||||||
|
var getDeviceModel = CSharpScriptEngineExtension.Do<DynamicSQLBase>(_driverPropertys.BigTextScriptHistoryTable);
|
||||||
|
|
||||||
|
getDeviceModel.Logger = LogMessage;
|
||||||
|
|
||||||
|
await getDeviceModel.DBInsertable(_db, dbInserts, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
int result = 0;
|
||||||
|
//.SplitTable()
|
||||||
|
Stopwatch stopwatch = new();
|
||||||
|
stopwatch.Start();
|
||||||
|
|
||||||
|
if (_db.CurrentConnectionConfig.DbType == SqlSugar.DbType.QuestDB)
|
||||||
|
result = await _db.Insertable(dbInserts).AS(_driverPropertys.TableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
else
|
||||||
|
result = await _db.Fastest<HistoryAlarm>().AS(_driverPropertys.TableName).PageSize(50000).BulkCopyAsync(dbInserts).ConfigureAwait(false);
|
||||||
|
|
||||||
|
stopwatch.Stop();
|
||||||
|
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
|
||||||
|
if (result > 0)
|
||||||
|
{
|
||||||
|
LogMessage?.Trace($"Count:{dbInserts.Count},watchTime: {stopwatch.ElapsedMilliseconds} ms");
|
||||||
|
}
|
||||||
|
return OperResult.Success;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
return OperResult.Success;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new OperResult(ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,7 +12,8 @@ using BootstrapBlazor.Components;
|
|||||||
|
|
||||||
using System.ComponentModel.DataAnnotations;
|
using System.ComponentModel.DataAnnotations;
|
||||||
|
|
||||||
using ThingsGateway.SqlSugar;
|
using ThingsGateway.Plugin.SqlDB;
|
||||||
|
|
||||||
|
|
||||||
namespace ThingsGateway.Plugin.SqlHistoryAlarm;
|
namespace ThingsGateway.Plugin.SqlHistoryAlarm;
|
||||||
|
|
||||||
@@ -23,15 +24,21 @@ public class SqlHistoryAlarmProperty : BusinessPropertyWithCache
|
|||||||
{
|
{
|
||||||
[DynamicProperty]
|
[DynamicProperty]
|
||||||
public DbType DbType { get; set; } = DbType.SqlServer;
|
public DbType DbType { get; set; } = DbType.SqlServer;
|
||||||
|
[DynamicProperty]
|
||||||
|
[Required]
|
||||||
|
public string TableName { get; set; } = "historyAlarm";
|
||||||
[DynamicProperty]
|
[DynamicProperty]
|
||||||
[Required]
|
[Required]
|
||||||
[AutoGenerateColumn(ComponentType = typeof(Textarea), Rows = 1)]
|
[AutoGenerateColumn(ComponentType = typeof(Textarea), Rows = 1)]
|
||||||
public string BigTextConnectStr { get; set; } = "server=.;uid=sa;pwd=111111;database=test;";
|
public string BigTextConnectStr { get; set; } = "server=.;uid=sa;pwd=111111;database=test;";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 历史表脚本
|
||||||
|
/// </summary>
|
||||||
[DynamicProperty]
|
[DynamicProperty]
|
||||||
[Required]
|
[AutoGenerateColumn(Visible = true, IsVisibleWhenEdit = false, IsVisibleWhenAdd = false)]
|
||||||
public string TableName { get; set; } = "historyAlarm";
|
public string? BigTextScriptHistoryTable { get; set; }
|
||||||
|
|
||||||
|
|
||||||
public override bool OnlineFilter { get; set; } = false;
|
public override bool OnlineFilter { get; set; } = false;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ namespace ThingsGateway.Plugin.TDengineDB;
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// TDengineDBProducer
|
/// TDengineDBProducer
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableModel<TDengineDBHistoryValue>, IDBHistoryValueService
|
public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableModel<VariableBasicData>, IDBHistoryValueService
|
||||||
{
|
{
|
||||||
internal readonly RealDBProducerProperty _driverPropertys = new()
|
internal readonly RealDBProducerProperty _driverPropertys = new()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -26,11 +26,11 @@ namespace ThingsGateway.Plugin.TDengineDB;
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// RabbitMQProducer
|
/// RabbitMQProducer
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableModel<TDengineDBHistoryValue>
|
public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableModel<VariableBasicData>
|
||||||
{
|
{
|
||||||
private TypeAdapterConfig _config;
|
private TypeAdapterConfig _config;
|
||||||
|
|
||||||
protected override ValueTask<OperResult> UpdateVarModel(IEnumerable<CacheDBItem<TDengineDBHistoryValue>> item, CancellationToken cancellationToken)
|
protected override ValueTask<OperResult> UpdateVarModel(IEnumerable<CacheDBItem<VariableBasicData>> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
return UpdateVarModel(item.Select(a => a.Value).OrderBy(a => a.Id), cancellationToken);
|
return UpdateVarModel(item.Select(a => a.Value).OrderBy(a => a.Id), cancellationToken);
|
||||||
}
|
}
|
||||||
@@ -46,7 +46,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
|
|||||||
UpdateVariable(variableRuntime, variable);
|
UpdateVariable(variableRuntime, variable);
|
||||||
base.VariableChange(variableRuntime, variable);
|
base.VariableChange(variableRuntime, variable);
|
||||||
}
|
}
|
||||||
protected override ValueTask<OperResult> UpdateVarModels(IEnumerable<TDengineDBHistoryValue> item, CancellationToken cancellationToken)
|
protected override ValueTask<OperResult> UpdateVarModels(IEnumerable<VariableBasicData> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
return UpdateVarModel(item, cancellationToken);
|
return UpdateVarModel(item, cancellationToken);
|
||||||
}
|
}
|
||||||
@@ -59,18 +59,18 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
|
|||||||
|
|
||||||
foreach (var group in varGroup)
|
foreach (var group in varGroup)
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<List<TDengineDBHistoryValue>>(group.Adapt<List<TDengineDBHistoryValue>>(_config)));
|
AddQueueVarModel(new CacheDBItem<List<VariableBasicData>>(group.ToList()));
|
||||||
}
|
}
|
||||||
foreach (var variable in varList)
|
foreach (var variable in varList)
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<TDengineDBHistoryValue>(variable.Adapt<TDengineDBHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<VariableBasicData>(variable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
foreach (var variable in variables)
|
foreach (var variable in variables)
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<TDengineDBHistoryValue>(variable.Adapt<TDengineDBHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<VariableBasicData>(variable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -80,15 +80,15 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
|
|||||||
if (_driverPropertys.GroupUpdate && !variable.BusinessGroup.IsNullOrEmpty() && VariableRuntimeGroups.TryGetValue(variable.BusinessGroup, out var variableRuntimeGroup))
|
if (_driverPropertys.GroupUpdate && !variable.BusinessGroup.IsNullOrEmpty() && VariableRuntimeGroups.TryGetValue(variable.BusinessGroup, out var variableRuntimeGroup))
|
||||||
{
|
{
|
||||||
|
|
||||||
AddQueueVarModel(new CacheDBItem<List<TDengineDBHistoryValue>>(variableRuntimeGroup.Adapt<List<TDengineDBHistoryValue>>(_config)));
|
AddQueueVarModel(new CacheDBItem<List<VariableBasicData>>(variableRuntimeGroup.Adapt<List<VariableBasicData>>(_config)));
|
||||||
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
AddQueueVarModel(new CacheDBItem<TDengineDBHistoryValue>(variableRuntime.Adapt<TDengineDBHistoryValue>(_config)));
|
AddQueueVarModel(new CacheDBItem<VariableBasicData>(variable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private async ValueTask<OperResult> UpdateVarModel(IEnumerable<TDengineDBHistoryValue> item, CancellationToken cancellationToken)
|
private async ValueTask<OperResult> UpdateVarModel(IEnumerable<VariableBasicData> item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var result = await InserableAsync(item.WhereIf(_driverPropertys.OnlineFilter, a => a.IsOnline == true).ToList(), cancellationToken).ConfigureAwait(false);
|
var result = await InserableAsync(item.WhereIf(_driverPropertys.OnlineFilter, a => a.IsOnline == true).ToList(), cancellationToken).ConfigureAwait(false);
|
||||||
if (success != result.IsSuccess)
|
if (success != result.IsSuccess)
|
||||||
@@ -103,7 +103,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
|
|||||||
|
|
||||||
#region 方法
|
#region 方法
|
||||||
|
|
||||||
private async ValueTask<OperResult> InserableAsync(List<TDengineDBHistoryValue> dbInserts, CancellationToken cancellationToken)
|
private async ValueTask<OperResult> InserableAsync(List<VariableBasicData> dbInserts, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -19,7 +19,6 @@ using ThingsGateway.Gateway.Application;
|
|||||||
using ThingsGateway.NewLife;
|
using ThingsGateway.NewLife;
|
||||||
using ThingsGateway.NewLife.Extension;
|
using ThingsGateway.NewLife.Extension;
|
||||||
using ThingsGateway.NewLife.Json.Extension;
|
using ThingsGateway.NewLife.Json.Extension;
|
||||||
using ThingsGateway.NewLife.Threading;
|
|
||||||
using ThingsGateway.SqlSugar;
|
using ThingsGateway.SqlSugar;
|
||||||
|
|
||||||
using TouchSocket.Core;
|
using TouchSocket.Core;
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ using MQTTnet.Client;
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
using ThingsGateway.Foundation;
|
using ThingsGateway.Foundation;
|
||||||
using ThingsGateway.NewLife.Threading;
|
|
||||||
|
|
||||||
using TouchSocket.Core;
|
using TouchSocket.Core;
|
||||||
|
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ using ThingsGateway.Foundation.OpcDa;
|
|||||||
using ThingsGateway.Foundation.OpcDa.Da;
|
using ThingsGateway.Foundation.OpcDa.Da;
|
||||||
using ThingsGateway.Gateway.Application;
|
using ThingsGateway.Gateway.Application;
|
||||||
using ThingsGateway.NewLife.Json.Extension;
|
using ThingsGateway.NewLife.Json.Extension;
|
||||||
using ThingsGateway.NewLife.Threading;
|
|
||||||
|
|
||||||
using TouchSocket.Core;
|
using TouchSocket.Core;
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ using ThingsGateway.Extension;
|
|||||||
using ThingsGateway.Extension.Generic;
|
using ThingsGateway.Extension.Generic;
|
||||||
using ThingsGateway.Gateway.Application;
|
using ThingsGateway.Gateway.Application;
|
||||||
using ThingsGateway.NewLife.Extension;
|
using ThingsGateway.NewLife.Extension;
|
||||||
using ThingsGateway.NewLife.Threading;
|
|
||||||
|
|
||||||
using TouchSocket.Core;
|
using TouchSocket.Core;
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
<Project>
|
<Project>
|
||||||
<PropertyGroup>
|
<PropertyGroup>
|
||||||
<Version>10.8.10</Version>
|
<Version>10.8.12</Version>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
Reference in New Issue
Block a user