build: 10.9.2

fix: taos connection dispose
refactor: opcua AddSubscriptionAsync 添加延时和重试
This commit is contained in:
Diego
2025-06-27 11:16:58 +08:00
parent 1d82cea40d
commit 4c95997d62
12 changed files with 119 additions and 121 deletions

View File

@@ -73,7 +73,7 @@ namespace TDengineAdo
protected override void Dispose(bool disposing)
{
connection.Dispose();
connection?.Dispose();
}

View File

@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<PluginVersion>10.9.1</PluginVersion>
<ProPluginVersion>10.9.1</ProPluginVersion>
<PluginVersion>10.9.2</PluginVersion>
<ProPluginVersion>10.9.2</ProPluginVersion>
<AuthenticationVersion>2.9.0</AuthenticationVersion>
<SourceGeneratorVersion>10.9.1</SourceGeneratorVersion>
<NET8Version>8.0.17</NET8Version>

View File

@@ -127,23 +127,7 @@ public abstract class CollectBase : DriverBase, IRpcDriver
LogMessage?.LogWarning(ex, string.Format(AppResource.GetMethodError, ex.Message));
}
if (VariableTasks.Count > 0)
{
foreach (var item in VariableTasks)
{
item.Stop();
TaskSchedulerLoop.Remove(item);
}
VariableTasks = AddVariableTask(cancellationToken);
foreach (var item in VariableTasks)
{
TaskSchedulerLoop.Add(item);
item.Start();
}
}
RefreshVariableTasks(cancellationToken);
// 根据标签获取方法信息的局部函数
List<VariableMethod> GetMethod(IEnumerable<VariableRuntime> tag)
@@ -169,6 +153,26 @@ public abstract class CollectBase : DriverBase, IRpcDriver
}
}
protected void RefreshVariableTasks(CancellationToken cancellationToken)
{
if (VariableTasks.Count > 0)
{
foreach (var item in VariableTasks)
{
item.Stop();
TaskSchedulerLoop.Remove(item);
}
VariableTasks = AddVariableTask(cancellationToken);
foreach (var item in VariableTasks)
{
TaskSchedulerLoop.Add(item);
item.Start();
}
}
}
internal override void ProtectedInitDevice(DeviceRuntime device)
{
// 调用基类的初始化方法

View File

@@ -274,7 +274,24 @@ public class OpcUaMaster : IDisposable
}
m_session.AddSubscription(m_subscription);
await m_subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
try
{
await m_subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
await Task.Delay(100, cancellationToken).ConfigureAwait(false); // allow for subscription to be finished on server?
}
catch (Exception)
{
try
{
await m_subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
await Task.Delay(100, cancellationToken).ConfigureAwait(false); // allow for subscription to be finished on server?
}
catch (Exception)
{
m_session.RemoveSubscription(m_subscription);
throw;
}
}
m_subscription.AddItems(monitoredItems);
foreach (var item in m_subscription.MonitoredItems.Where(a => a.Status.Error != null && StatusCode.IsBad(a.Status.Error.StatusCode)))

View File

@@ -92,14 +92,14 @@ public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableMode
_config.ForType<VariableRuntime, QuestDBNumberHistoryValue>()
//.Map(dest => dest.Id, src => CommonUtils.GetSingleId())
.Map(dest => dest.Id, src => src.Id)//Id更改为变量Id
.Map(dest => dest.Value, src => ConvertUtility.Convert.ToDecimal(src.Value, 0))
.Map(dest => dest.Value, src => src.Value.GetType() == typeof(bool) ? ConvertUtility.Convert.ToBoolean(src.Value, false) ? 1 : 0 : ConvertUtility.Convert.ToDecimal(src.Value, 0))
.Map(dest => dest.CollectTime, (src) => src.CollectTime < DateTime.MinValue ? utcTime : src.CollectTime)//注意sqlsugar插入时无时区直接utc时间
.Map(dest => dest.CreateTime, (src) => DateTime.UtcNow)
;//注意sqlsugar插入时无时区直接utc时间
_config.ForType<VariableBasicData, QuestDBNumberHistoryValue>()
//.Map(dest => dest.Id, src => CommonUtils.GetSingleId())
.Map(dest => dest.Id, src => src.Id)//Id更改为变量Id
.Map(dest => dest.Value, src => ConvertUtility.Convert.ToDecimal(src.Value, 0))
.Map(dest => dest.Value, src => src.Value.GetType() == typeof(bool) ? ConvertUtility.Convert.ToBoolean(src.Value, false) ? 1 : 0 : ConvertUtility.Convert.ToDecimal(src.Value, 0))
.Map(dest => dest.CollectTime, (src) => src.CollectTime < DateTime.MinValue ? utcTime : src.CollectTime)//注意sqlsugar插入时无时区直接utc时间
.Map(dest => dest.CreateTime, (src) => DateTime.UtcNow)
;//注意sqlsugar插入时无时区直接utc时间

View File

@@ -120,30 +120,36 @@ public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableMode
var stringData = dbInserts.Where(a => (!a.IsNumber && a.Value is not bool));
var numberData = dbInserts.Where(a => (a.IsNumber || a.Value is bool));
Stopwatch stopwatch = new();
stopwatch.Start();
var data = numberData.Adapt<List<QuestDBNumberHistoryValue>>(_config);
var result = await _db.Insertable(data).AS(_driverPropertys.NumberTableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);//不要加分表
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
if (result > 0)
if (numberData.Any())
{
LogMessage?.Trace($"TableName{_driverPropertys.NumberTableName}Count{result}watchTime: {stopwatch.ElapsedMilliseconds} ms");
Stopwatch stopwatch = new();
stopwatch.Start();
var data = numberData.Adapt<List<QuestDBNumberHistoryValue>>(_config);
var result = await _db.Insertable(data).AS(_driverPropertys.NumberTableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);//不要加分表
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
if (result > 0)
{
LogMessage?.Trace($"TableName{_driverPropertys.NumberTableName}Count{result}watchTime: {stopwatch.ElapsedMilliseconds} ms");
}
}
stopwatch.Restart();
var strdata = stringData.Adapt<List<QuestDBHistoryValue>>(_config);
result = await _db.Insertable(strdata).AS(_driverPropertys.StringTableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);//不要加分表
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
if (result > 0)
if (stringData.Any())
{
LogMessage?.Trace($"TableName{_driverPropertys.StringTableName}Count{result}watchTime: {stopwatch.ElapsedMilliseconds} ms");
Stopwatch stopwatch = new();
stopwatch.Start();
var data = stringData.Adapt<List<QuestDBHistoryValue>>(_config);
var result = await _db.Insertable(data).AS(_driverPropertys.StringTableName).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);//不要加分表
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
if (result > 0)
{
LogMessage?.Trace($"TableName{_driverPropertys.StringTableName}Count{result}watchTime: {stopwatch.ElapsedMilliseconds} ms");
}
}
}

View File

@@ -175,13 +175,13 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
_config.ForType<VariableRuntime, SQLNumberHistoryValue>()
//.Map(dest => dest.Id, (src) =>CommonUtils.GetSingleId())
.Map(dest => dest.Id, src => src.Id)//Id更改为变量Id
.Map(dest => dest.Value, src => ConvertUtility.Convert.ToDecimal(src.Value, 0))
.Map(dest => dest.Value, src => src.Value.GetType() == typeof(bool) ? ConvertUtility.Convert.ToBoolean(src.Value, false) ? 1 : 0 : ConvertUtility.Convert.ToDecimal(src.Value, 0))
.Map(dest => dest.CreateTime, (src) => DateTime.Now);
_config.ForType<VariableBasicData, SQLNumberHistoryValue>()
//.Map(dest => dest.Id, (src) =>CommonUtils.GetSingleId())
.Map(dest => dest.Id, src => src.Id)//Id更改为变量Id
.Map(dest => dest.Value, src => ConvertUtility.Convert.ToDecimal(src.Value, 0))
.Map(dest => dest.Value, src => src.Value.GetType() == typeof(bool) ? ConvertUtility.Convert.ToBoolean(src.Value, false) ? 1 : 0 : ConvertUtility.Convert.ToDecimal(src.Value, 0))
.Map(dest => dest.CreateTime, (src) => DateTime.Now);
if (_businessPropertyWithCacheInterval.BusinessUpdateEnum == BusinessUpdateEnum.Interval && _driverPropertys.IsReadDB)

View File

@@ -136,30 +136,35 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
var stringData = dbInserts.Where(a => (!a.IsNumber && a.Value is not bool));
var numberData = dbInserts.Where(a => (a.IsNumber || a.Value is bool));
Stopwatch stopwatch = new();
stopwatch.Start();
var data = numberData.Adapt<List<SQLNumberHistoryValue>>(_config);
var result = await _db.Fastest<SQLNumberHistoryValue>().PageSize(50000).SplitTable().BulkCopyAsync(data).ConfigureAwait(false);
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
if (result > 0)
if (numberData.Any())
{
LogMessage?.Trace($"TableName{_driverPropertys.NumberTableName}Count{result}watchTime: {stopwatch.ElapsedMilliseconds} ms");
Stopwatch stopwatch = new();
stopwatch.Start();
var data = numberData.Adapt<List<SQLNumberHistoryValue>>(_config);
var result = await _db.Fastest<SQLNumberHistoryValue>().PageSize(50000).SplitTable().BulkCopyAsync(data).ConfigureAwait(false);
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
if (result > 0)
{
LogMessage?.Trace($"TableName{_driverPropertys.NumberTableName}Count{result}watchTime: {stopwatch.ElapsedMilliseconds} ms");
}
}
stopwatch.Restart();
var strdata = stringData.Adapt<List<SQLHistoryValue>>(_config);
result = await _db.Fastest<SQLHistoryValue>().PageSize(50000).SplitTable().BulkCopyAsync(strdata).ConfigureAwait(false);
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
if (result > 0)
if (stringData.Any())
{
LogMessage?.Trace($"TableName{_driverPropertys.StringTableName}Count{result}watchTime: {stopwatch.ElapsedMilliseconds} ms");
Stopwatch stopwatch = new();
stopwatch.Start();
var data = stringData.Adapt<List<SQLHistoryValue>>(_config);
var result = await _db.Fastest<SQLHistoryValue>().PageSize(50000).SplitTable().BulkCopyAsync(data).ConfigureAwait(false);
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
if (result > 0)
{
LogMessage?.Trace($"TableName{_driverPropertys.StringTableName}Count{result}watchTime: {stopwatch.ElapsedMilliseconds} ms");
}
}

View File

@@ -123,34 +123,10 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
var stringData = dbInserts.Where(a => (!a.IsNumber && a.Value is not bool));
var numberData = dbInserts.Where(a => (a.IsNumber || a.Value is bool));
Stopwatch stopwatch = new();
stopwatch.Start();
//var result = await db.Insertable(dbInserts).SetTDengineChildTableName((stableName, it) => $"{stableName}_{it.DeviceName}_{it.Name}").ExecuteCommandAsync().ConfigureAwait(false);//不要加分表
await InserableAsync(numberData, _driverPropertys.NumberTableNameLow, cancellationToken).ConfigureAwait(false);
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
//if (result > 0)
{
LogMessage?.Trace($"TableName{_driverPropertys.NumberTableNameLow}Count{dbInserts.Count}watchTime: {stopwatch.ElapsedMilliseconds} ms");
}
stopwatch.Restart();
//var result = await db.Insertable(dbInserts).SetTDengineChildTableName((stableName, it) => $"{stableName}_{it.DeviceName}_{it.Name}").ExecuteCommandAsync().ConfigureAwait(false);//不要加分表
await InserableAsync(stringData, _driverPropertys.StringTableNameLow, cancellationToken).ConfigureAwait(false);
stopwatch.Stop();
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync().ConfigureAwait(false);
//if (result > 0)
{
LogMessage?.Trace($"TableName{_driverPropertys.StringTableNameLow}Count{dbInserts.Count}watchTime: {stopwatch.ElapsedMilliseconds} ms");
}
}
return OperResult.Success;
}
@@ -162,6 +138,13 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
private async Task InserableAsync(IEnumerable<VariableBasicData> dbInserts, string tableName, CancellationToken cancellationToken)
{
if (!dbInserts.Any())
{
return;
}
Stopwatch stopwatch = new();
stopwatch.Start();
StringBuilder stringBuilder = new();
stringBuilder.Append($"INSERT INTO");
//(`id`,`createtime`,`collecttime`,`isonline`,`value`)
@@ -188,7 +171,15 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
stringBuilder.Append(';');
stringBuilder.AppendLine();
await _db.Ado.ExecuteCommandAsync(stringBuilder.ToString(), default, cancellationToken: cancellationToken).ConfigureAwait(false);
var result = await _db.Ado.ExecuteCommandAsync(stringBuilder.ToString(), default, cancellationToken: cancellationToken).ConfigureAwait(false);
stopwatch.Stop();
//if (result > 0)
{
LogMessage?.Trace($"TableName{tableName}Count{result}watchTime: {stopwatch.ElapsedMilliseconds} ms");
}
}
private string GetValue(VariableBasicData src)
{

View File

@@ -206,22 +206,8 @@ public class OpcDaMaster : CollectBase
{
LogMessage?.LogWarning(ex);
}
if (VariableTasks.Count > 0)
{
foreach (var item in VariableTasks)
{
item.Stop();
TaskSchedulerLoop.Remove(item);
}
VariableTasks = AddVariableTask(cancellationToken);
foreach (var item in VariableTasks)
{
TaskSchedulerLoop.Add(item);
item.Start();
}
}
RefreshVariableTasks(cancellationToken);
}
private Dictionary<string, List<VariableRuntime>> VariableAddresDicts { get; set; } = new();

View File

@@ -177,6 +177,9 @@ public class OpcUaMaster : CollectBase
LogMessage?.LogInformation($"AddSubscription index {CurrentDevice.VariableSourceReads.IndexOf(variableSourceRead)} done");
}
await Task.Delay(100, cancellationToken).ConfigureAwait(false); // allow for subscription to be finished on server?
}
LogMessage?.LogInformation("AddSubscriptions done");
checkLog = true;
@@ -184,7 +187,7 @@ public class OpcUaMaster : CollectBase
catch (Exception ex)
{
if (!checkLog)
LogMessage?.LogWarning(ex, "AddSubscriptions");
LogMessage?.LogWarning(ex, "AddSubscriptions error");
checkLog = false;
}
finally
@@ -341,22 +344,8 @@ public class OpcUaMaster : CollectBase
}
}
if (VariableTasks.Count > 0)
{
foreach (var item in VariableTasks)
{
item.Stop();
TaskSchedulerLoop.Remove(item);
}
RefreshVariableTasks(cancellationToken);
VariableTasks = AddVariableTask(cancellationToken);
foreach (var item in VariableTasks)
{
TaskSchedulerLoop.Add(item);
item.Start();
}
}
}
private Dictionary<string, List<VariableRuntime>> VariableAddresDicts { get; set; } = new();

View File

@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>10.9.1</Version>
<Version>10.9.2</Version>
</PropertyGroup>
<ItemGroup>