feat: 优化opc单设备大批变量检索速度

fix: 定时任务删除网关日志无效
This commit is contained in:
Diego
2025-02-19 20:39:42 +08:00
parent 68af9a0695
commit 95b551d4ba
41 changed files with 312 additions and 243 deletions

View File

@@ -28,6 +28,7 @@ public class LogJob : IJob
private static async Task DeleteSysOperateLog(int daysAgo, CancellationToken stoppingToken)
{
using var db = DbContext.Db.GetConnectionScopeWithAttr<SysOperateLog>().CopyNew();
await db.DeleteableWithAttr<SysOperateLog>().Where(u => u.OpTime < DateTime.Now.AddDays(-daysAgo)).ExecuteCommandAsync(stoppingToken).ConfigureAwait(false); // 删除操作日志
var time = DateTime.Now.AddDays(-daysAgo);
await db.DeleteableWithAttr<SysOperateLog>().Where(u => u.OpTime < time).ExecuteCommandAsync(stoppingToken).ConfigureAwait(false); // 删除操作日志
}
}

View File

@@ -11,9 +11,7 @@
using Microsoft.Extensions.DependencyInjection;
using System.Net.NetworkInformation;
using System.Reflection;
using System.Security.Cryptography;
using ThingsGateway;
using ThingsGateway.Utilities;
@@ -127,7 +125,7 @@ public static class Native
/// <returns></returns>
public static int GetIdlePort()
{
return NetworkUtility.FindAvailableTcpPort(); ;
return NetworkUtility.FindAvailableTcpPort(); ;
}
}

View File

@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<PluginVersion>10.0.1.9</PluginVersion>
<ProPluginVersion>10.0.1.9</ProPluginVersion>
<PluginVersion>10.0.2.1</PluginVersion>
<ProPluginVersion>10.0.2.1</ProPluginVersion>
</PropertyGroup>
<PropertyGroup>

View File

@@ -67,6 +67,7 @@ public class DoTask
private async Task Do()
{
await Task.Yield();
while (!_cancelTokenSource.IsCancellationRequested)
{
try

View File

@@ -66,7 +66,7 @@ public abstract class BusinessBase : DriverBase
base.ProtectedInitDevice(device); // 调用基类的初始化方法
}
public override void AfterVariablesChanged()
public override Task AfterVariablesChangedAsync()
{
LogMessage?.LogInformation("Refresh variable");
// 获取与当前设备相关的变量,CurrentDevice.VariableRuntimes并不适用于业务插件
@@ -100,6 +100,7 @@ public abstract class BusinessBase : DriverBase
// 获取当前设备需要采集的设备
CollectDevices = GlobalData.GetEnableDevices().Where(a => VariableRuntimes.Select(b => b.Value.DeviceId).ToHashSet().Contains(a.Key)).ToDictionary(a => a.Key, a => a.Value);
return Task.CompletedTask;
}
@@ -165,7 +166,7 @@ public abstract class BusinessBase : DriverBase
}
// 执行任务操作
if(TimeTick.IsTickHappen())
if (TimeTick.IsTickHappen())
await ProtectedExecuteAsync(cancellationToken).ConfigureAwait(false);
// 再次检查取消操作是否被请求
@@ -196,7 +197,7 @@ public abstract class BusinessBase : DriverBase
internal override ValueTask StartAsync(CancellationToken cancellationToken)
{
TimeTick= new TimeTick(CurrentDevice.IntervalTime);
TimeTick = new TimeTick(CurrentDevice.IntervalTime);
return base.StartAsync(cancellationToken);
}

View File

@@ -34,7 +34,7 @@ public abstract class BusinessBaseWithCacheIntervalAlarmModel<VarModel, DevModel
/// </summary>
protected abstract BusinessPropertyWithCacheInterval _businessPropertyWithCacheInterval { get; }
protected internal override void InitChannel(IChannel? channel = null)
protected internal override async Task InitChannelAsync(IChannel? channel = null)
{
// 初始化
_exTTimerTick = new(_businessPropertyWithCacheInterval.BusinessInterval);
@@ -55,9 +55,9 @@ public abstract class BusinessBaseWithCacheIntervalAlarmModel<VarModel, DevModel
}
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
public override void AfterVariablesChanged()
public override async Task AfterVariablesChangedAsync()
{
// 如果业务属性指定了全部变量,则设置当前设备的变量运行时列表和采集设备列表
if (_businessPropertyWithCacheInterval.IsAllVariable)
@@ -68,7 +68,7 @@ public abstract class BusinessBaseWithCacheIntervalAlarmModel<VarModel, DevModel
}
else
{
base.AfterVariablesChanged();
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
}

View File

@@ -41,7 +41,7 @@ public abstract class BusinessBaseWithCacheIntervalDeviceModel<VarModel, DevMode
protected internal override void InitChannel(IChannel? channel = null)
protected internal override async Task InitChannelAsync(IChannel? channel = null)
{
// 初始化设备和变量上传的定时器
_exTTimerTick = new(_businessPropertyWithCacheInterval.BusinessInterval);
@@ -61,9 +61,9 @@ public abstract class BusinessBaseWithCacheIntervalDeviceModel<VarModel, DevMode
}
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
public override void AfterVariablesChanged()
public override async Task AfterVariablesChangedAsync()
{
// 如果业务属性指定了全部变量,则设置当前设备的变量运行时列表和采集设备列表
if (_businessPropertyWithCacheInterval.IsAllVariable)
@@ -73,7 +73,7 @@ public abstract class BusinessBaseWithCacheIntervalDeviceModel<VarModel, DevMode
}
else
{
base.AfterVariablesChanged();
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
}

View File

@@ -37,7 +37,7 @@ public abstract class BusinessBaseWithCacheIntervalVariableModel<T> : BusinessBa
/// </summary>
protected abstract BusinessPropertyWithCacheInterval _businessPropertyWithCacheInterval { get; }
protected internal override void InitChannel(IChannel? channel = null)
protected internal override async Task InitChannelAsync(IChannel? channel = null)
{
// 初始化定时器
_exTTimerTick = new TimeTick(_businessPropertyWithCacheInterval.BusinessInterval);
@@ -49,9 +49,9 @@ public abstract class BusinessBaseWithCacheIntervalVariableModel<T> : BusinessBa
GlobalData.VariableValueChangeEvent += VariableValueChange;
}
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
public override void AfterVariablesChanged()
public override async Task AfterVariablesChangedAsync()
{
// 如果业务属性指定了全部变量,则设置当前设备的变量运行时列表和采集设备列表
if (_businessPropertyWithCacheInterval.IsAllVariable)
@@ -61,7 +61,7 @@ public abstract class BusinessBaseWithCacheIntervalVariableModel<T> : BusinessBa
}
else
{
base.AfterVariablesChanged();
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
}
// 触发一次变量值变化事件

View File

@@ -49,7 +49,7 @@ public abstract class CollectBase : DriverBase
/// <summary>
/// 获取设备变量打包列表/特殊方法列表
/// </summary>
public override void AfterVariablesChanged()
public override async Task AfterVariablesChangedAsync()
{
LogMessage?.LogInformation("Refresh variable");
var currentDevice = CurrentDevice;
@@ -98,7 +98,7 @@ public abstract class CollectBase : DriverBase
}).ToList();
// 将打包后的结果存储在当前设备的 VariableSourceReads 属性中
currentDevice.VariableSourceReads = ProtectedLoadSourceRead(tags.Where(source).ToList());
currentDevice.VariableSourceReads = await ProtectedLoadSourceReadAsync(tags.Where(source).ToList()).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -486,7 +486,7 @@ public abstract class CollectBase : DriverBase
/// </summary>
/// <param name="deviceVariables">设备下的全部通讯点位</param>
/// <returns></returns>
protected abstract List<VariableSourceRead> ProtectedLoadSourceRead(List<VariableRuntime> deviceVariables);
protected abstract Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables);
protected AsyncReadWriteLock ReadWriteLock = new();

View File

@@ -281,10 +281,11 @@ public abstract class DriverBase : DisposableObject, IDriver
var timeout = 60; // 设置超时时间为 60 秒
var task = ProtectedStartAsync(cancellationToken);
try
{
// 异步执行初始化操作,并设置超时时间
await ProtectedStartAsync(cancellationToken).WaitAsync(TimeSpan.FromSeconds(timeout), cancellationToken).ConfigureAwait(false);
await task.WaitAsync(TimeSpan.FromSeconds(timeout), cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -292,7 +293,7 @@ public abstract class DriverBase : DisposableObject, IDriver
catch (TimeoutException)
{
// 如果初始化操作超时,则记录警告信息
LogMessage?.LogWarning(Localizer["DeviceTaskStartTimeout", DeviceName, timeout]);
LogMessage?.LogInformation(Localizer["DeviceTaskStartTimeout", DeviceName, timeout]);
}
// 设置设备状态为当前时间
@@ -459,17 +460,17 @@ public abstract class DriverBase : DisposableObject, IDriver
/// 初始化,在开始前执行,异常时会标识重启
/// </summary>
/// <param name="channel">通道,当通道类型为<see cref="ChannelTypeEnum.Other"/>时传入null</param>
internal protected virtual void InitChannel(IChannel? channel = null)
internal protected virtual async Task InitChannelAsync(IChannel? channel = null)
{
if (channel != null)
channel.SetupAsync(channel.Config.Clone());
AfterVariablesChanged();
await channel.SetupAsync(channel.Config.Clone()).ConfigureAwait(false);
await AfterVariablesChangedAsync().ConfigureAwait(false);
}
/// <summary>
/// 变量更改后, 重新初始化变量列表,获取设备变量打包列表/特殊方法列表等
/// </summary>
public abstract void AfterVariablesChanged();
public abstract Task AfterVariablesChangedAsync();
/// <inheritdoc/>
protected override void Dispose(bool disposing)

View File

@@ -42,6 +42,6 @@ namespace ThingsGateway.Gateway.Application
bool IsConnected();
void PauseThread(bool pause);
Task SetLogAsync(bool enable, LogLevel? logLevel = null, bool upDataBase = true);
void AfterVariablesChanged();
Task AfterVariablesChangedAsync();
}
}

View File

@@ -35,13 +35,15 @@ public class LogJob : IJob
private static async Task DeleteRpcLog(int daysAgo, CancellationToken stoppingToken)
{
using var db = DbContext.Db.GetConnectionScopeWithAttr<RpcLog>().CopyNew();
await db.DeleteableWithAttr<RpcLog>().Where(u => u.LogTime < DateTime.Now.AddDays(-daysAgo)).ExecuteCommandAsync(stoppingToken).ConfigureAwait(false); // 删除操作日志
var time = DateTime.Now.AddDays(-daysAgo);
await db.DeleteableWithAttr<RpcLog>().Where(u => u.LogTime < time).ExecuteCommandAsync(stoppingToken).ConfigureAwait(false); // 删除操作日志
}
private static async Task DeleteBackendLog(int daysAgo, CancellationToken stoppingToken)
{
using var db = DbContext.Db.GetConnectionScopeWithAttr<BackendLog>().CopyNew();
await db.DeleteableWithAttr<BackendLog>().Where(u => u.LogTime < DateTime.Now.AddDays(-daysAgo)).ExecuteCommandAsync(stoppingToken).ConfigureAwait(false); // 删除操作日志
var time = DateTime.Now.AddDays(-daysAgo);
await db.DeleteableWithAttr<BackendLog>().Where(u => u.LogTime < time).ExecuteCommandAsync(stoppingToken).ConfigureAwait(false); // 删除操作日志
}

View File

@@ -391,7 +391,7 @@ internal sealed class DeviceThreadManage : IAsyncDisposable, IDeviceThreadManage
// 初始化驱动程序对象,并加载源读取
driver.InitChannel(Channel);
await driver.InitChannelAsync(Channel).ConfigureAwait(false);
if (Channel != null && Drivers.Count <= 1)
{
@@ -772,7 +772,7 @@ internal sealed class DeviceThreadManage : IAsyncDisposable, IDeviceThreadManage
{
if (businessDeviceRuntime.Driver != null)
{
businessDeviceRuntime.Driver.AfterVariablesChanged();
await businessDeviceRuntime.Driver.AfterVariablesChangedAsync().ConfigureAwait(false);
}
}
}

View File

@@ -83,7 +83,7 @@ public class VariableRuntimeService : IVariableRuntimeService
//根据条件重启通道线程
foreach (var driver in changedDriver)
{
driver.AfterVariablesChanged();
await driver.AfterVariablesChangedAsync().ConfigureAwait(false);
}
}
@@ -156,7 +156,7 @@ public class VariableRuntimeService : IVariableRuntimeService
//根据条件重启通道线程
foreach (var driver in changedDriver)
{
driver.AfterVariablesChanged();
await driver.AfterVariablesChangedAsync().ConfigureAwait(false);
}
return true;
@@ -217,7 +217,7 @@ public class VariableRuntimeService : IVariableRuntimeService
foreach (var driver in changedDriver)
{
driver.AfterVariablesChanged();
await driver.AfterVariablesChangedAsync().ConfigureAwait(false);
}
@@ -295,7 +295,7 @@ public class VariableRuntimeService : IVariableRuntimeService
foreach (var driver in changedDriver)
{
driver.AfterVariablesChanged();
await driver.AfterVariablesChangedAsync().ConfigureAwait(false);
}
@@ -467,7 +467,7 @@ public class VariableRuntimeService : IVariableRuntimeService
//根据条件重启通道线程
foreach (var driver in changedDriver)
{
driver.AfterVariablesChanged();
await driver.AfterVariablesChangedAsync().ConfigureAwait(false);
}

View File

@@ -21,7 +21,7 @@ public partial class ModbusMaster : DeviceBase, IDtu
RegisterByteLength = 2;
channel.MaxSign = ushort.MaxValue;
}
public override IThingsGatewayBitConverter ThingsGatewayBitConverter { get; protected set; } = new ThingsGatewayBitConverter(EndianType.Big) { };
public override IThingsGatewayBitConverter ThingsGatewayBitConverter { get; protected set; } = new ThingsGatewayBitConverter(EndianType.Big) { };
/// <summary>
/// 客户端连接滑动过期时间(TCP服务通道时)

View File

@@ -10,6 +10,8 @@
//修改自https://github.com/dathlin/OpcUaHelper 与OPC基金会net库
using System.Collections.Concurrent;
namespace ThingsGateway.Foundation.OpcUa;
/// <summary>
@@ -53,14 +55,14 @@ public class OpcUaMaster : IDisposable
/// <summary>
/// 当前的变量名称/OPC变量节点
/// </summary>
private readonly Dictionary<string, VariableNode> _variableDicts = new();
private readonly ConcurrentDictionary<string, VariableNode> _variableDicts = new();
private readonly object checkLock = new();
/// <summary>
/// 当前的订阅组,组名称/组
/// </summary>
private readonly Dictionary<string, Subscription> dic_subscriptions = new();
private readonly ConcurrentDictionary<string, Subscription> dic_subscriptions = new();
private readonly ApplicationInstance m_application = new();
@@ -280,13 +282,13 @@ public class OpcUaMaster : IDisposable
m_session.AddSubscription(m_subscription);
m_subscription.Create();
await m_subscription.CreateAsync().ConfigureAwait(false);
foreach (var item in m_subscription.MonitoredItems.Where(a => a.Status.Error != null && StatusCode.IsBad(a.Status.Error.StatusCode)))
{
item.Filter = OpcUaProperty.DeadBand == 0 ? null : new DataChangeFilter() { DeadbandValue = OpcUaProperty.DeadBand, DeadbandType = (int)DeadbandType.None, Trigger = DataChangeTrigger.StatusValue };
}
m_subscription.ApplyChanges();
await m_subscription.ApplyChangesAsync().ConfigureAwait(false);
var isError = m_subscription.MonitoredItems.Any(a => a.Status.Error != null && StatusCode.IsBad(a.Status.Error.StatusCode));
if (isError)
@@ -296,20 +298,17 @@ public class OpcUaMaster : IDisposable
.Select(a => $"{a.StartNodeId}{a.Status.Error}").ToJsonString()}");
}
lock (dic_subscriptions)
if (dic_subscriptions.TryAdd(subscriptionName, m_subscription))
{
if (dic_subscriptions.TryGetValue(subscriptionName, out var existingSubscription))
{
// remove
existingSubscription.Delete(true);
m_session.RemoveSubscription(existingSubscription);
try { existingSubscription.Dispose(); } catch { }
dic_subscriptions[subscriptionName] = m_subscription;
}
else
{
dic_subscriptions.TryAdd(subscriptionName, m_subscription);
}
}
else if (dic_subscriptions.TryGetValue(subscriptionName, out var existingSubscription))
{
// remove
existingSubscription.Delete(true);
await m_session.RemoveSubscriptionAsync(existingSubscription).ConfigureAwait(false);
try { existingSubscription.Dispose(); } catch { }
dic_subscriptions[subscriptionName] = m_subscription;
}
}
@@ -721,19 +720,18 @@ public class OpcUaMaster : IDisposable
/// 移除订阅消息
/// </summary>
/// <param name="subscriptionName">组名称</param>
public void RemoveSubscription(string subscriptionName)
public async Task RemoveSubscriptionAsync(string subscriptionName)
{
lock (dic_subscriptions)
if (dic_subscriptions.TryGetValue(subscriptionName, out var subscription))
{
if (dic_subscriptions.TryGetValue(subscriptionName, out var subscription))
{
// remove
subscription.Delete(true);
m_session.RemoveSubscription(subscription);
try { subscription.Dispose(); } catch { }
dic_subscriptions.Remove(subscriptionName);
}
// remove
subscription.Delete(true);
await m_session.RemoveSubscriptionAsync(subscription).ConfigureAwait(false);
try { subscription.Dispose(); } catch { }
dic_subscriptions.TryRemove(subscriptionName, out _);
}
}
/// <summary>
@@ -878,7 +876,7 @@ public class OpcUaMaster : IDisposable
false,
OpcUaProperty.CheckDomain,
(string.IsNullOrEmpty(OPCUAName)) ? m_configuration.ApplicationName : OPCUAName,
60000,
600000,
userIdentity,
null, cancellationToken
).ConfigureAwait(false);
@@ -895,10 +893,12 @@ public class OpcUaMaster : IDisposable
//如果是订阅模式,连接时添加订阅组
if (OpcUaProperty.ActiveSubscribe)
{
List<Task> tasks = new(Variables.Count);
foreach (var item in Variables)
{
await AddSubscriptionAsync(Guid.NewGuid().ToString(), item.ToArray(), OpcUaProperty.LoadType).ConfigureAwait(false);
tasks.Add(AddSubscriptionAsync(Guid.NewGuid().ToString(), item.ToArray(), OpcUaProperty.LoadType));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
return m_session;
}
@@ -984,7 +984,7 @@ public class OpcUaMaster : IDisposable
}
NodeId nodeToRead = new(nodeIdStr);
var node = (VariableNode)m_session.ReadNode(nodeToRead, NodeClass.Variable, false);
_variableDicts.AddOrUpdate(nodeIdStr, node);
_variableDicts.AddOrUpdate(nodeIdStr, a => node, (a, b) => node);
return node;
}
@@ -1004,7 +1004,7 @@ public class OpcUaMaster : IDisposable
var node = (VariableNode)await m_session.ReadNodeAsync(nodeToRead, NodeClass.Variable, false, cancellationToken).ConfigureAwait(false);
if (OpcUaProperty.LoadType)
await typeSystem.LoadType(node.DataType, ct: cancellationToken).ConfigureAwait(false);
_variableDicts.AddOrUpdate(nodeIdStr, node);
_variableDicts.AddOrUpdate(nodeIdStr, a => node, (a, b) => node);
return node;
}
@@ -1030,7 +1030,7 @@ public class OpcUaMaster : IDisposable
{
var node = ((VariableNode)nodes.Item1[i]);
await typeSystem.LoadType(node.DataType, ct: cancellationToken).ConfigureAwait(false);
_variableDicts.AddOrUpdate(nodeIdStrs[i], node);
_variableDicts.AddOrUpdate(nodeIdStrs[i], a => node, (a, b) => node);
}
else
{

View File

@@ -10,6 +10,8 @@
//修改自https://github.com/dathlin/OpcUaHelper 与OPC基金会net库
using System.Collections.Concurrent;
namespace ThingsGateway.Foundation.OpcUa105;
/// <summary>
@@ -53,14 +55,14 @@ public class OpcUaMaster : IDisposable
/// <summary>
/// 当前的变量名称/OPC变量节点
/// </summary>
private readonly Dictionary<string, VariableNode> _variableDicts = new();
private readonly ConcurrentDictionary<string, VariableNode> _variableDicts = new();
private readonly object checkLock = new();
/// <summary>
/// 当前的订阅组,组名称/组
/// </summary>
private readonly Dictionary<string, Subscription> dic_subscriptions = new();
private readonly ConcurrentDictionary<string, Subscription> dic_subscriptions = new();
private readonly ApplicationInstance m_application = new();
@@ -280,13 +282,13 @@ public class OpcUaMaster : IDisposable
m_session.AddSubscription(m_subscription);
m_subscription.Create();
await m_subscription.CreateAsync().ConfigureAwait(false);
foreach (var item in m_subscription.MonitoredItems.Where(a => a.Status.Error != null && StatusCode.IsBad(a.Status.Error.StatusCode)))
{
item.Filter = OpcUaProperty.DeadBand == 0 ? null : new DataChangeFilter() { DeadbandValue = OpcUaProperty.DeadBand, DeadbandType = (int)DeadbandType.None, Trigger = DataChangeTrigger.StatusValue };
}
m_subscription.ApplyChanges();
await m_subscription.ApplyChangesAsync().ConfigureAwait(false);
var isError = m_subscription.MonitoredItems.Any(a => a.Status.Error != null && StatusCode.IsBad(a.Status.Error.StatusCode));
if (isError)
@@ -296,20 +298,17 @@ public class OpcUaMaster : IDisposable
.Select(a => $"{a.StartNodeId}{a.Status.Error}").ToJsonString()}");
}
lock (dic_subscriptions)
if (dic_subscriptions.TryAdd(subscriptionName, m_subscription))
{
if (dic_subscriptions.TryGetValue(subscriptionName, out var existingSubscription))
{
// remove
existingSubscription.Delete(true);
m_session.RemoveSubscription(existingSubscription);
try { existingSubscription.Dispose(); } catch { }
dic_subscriptions[subscriptionName] = m_subscription;
}
else
{
dic_subscriptions.TryAdd(subscriptionName, m_subscription);
}
}
else if (dic_subscriptions.TryGetValue(subscriptionName, out var existingSubscription))
{
// remove
existingSubscription.Delete(true);
await m_session.RemoveSubscriptionAsync(existingSubscription).ConfigureAwait(false);
try { existingSubscription.Dispose(); } catch { }
dic_subscriptions[subscriptionName] = m_subscription;
}
}
@@ -703,37 +702,35 @@ public class OpcUaMaster : IDisposable
/// <summary>
/// 移除所有的订阅消息
/// </summary>
public void RemoveAllSubscription()
public async Task RemoveAllSubscription()
{
lock (dic_subscriptions)
foreach (var item in dic_subscriptions)
{
foreach (var item in dic_subscriptions)
{
item.Value.Delete(true);
m_session.RemoveSubscription(item.Value);
try { item.Value.Dispose(); } catch { }
}
dic_subscriptions.Clear();
item.Value.Delete(true);
await m_session.RemoveSubscriptionAsync(item.Value).ConfigureAwait(false);
try { item.Value.Dispose(); } catch { }
}
dic_subscriptions.Clear();
}
/// <summary>
/// 移除订阅消息
/// </summary>
/// <param name="subscriptionName">组名称</param>
public void RemoveSubscription(string subscriptionName)
public async Task RemoveSubscriptionAsync(string subscriptionName)
{
lock (dic_subscriptions)
if (dic_subscriptions.TryGetValue(subscriptionName, out var subscription))
{
if (dic_subscriptions.TryGetValue(subscriptionName, out var subscription))
{
// remove
subscription.Delete(true);
m_session.RemoveSubscription(subscription);
try { subscription.Dispose(); } catch { }
dic_subscriptions.Remove(subscriptionName);
}
// remove
subscription.Delete(true);
await m_session.RemoveSubscriptionAsync(subscription).ConfigureAwait(false);
try { subscription.Dispose(); } catch { }
dic_subscriptions.TryRemove(subscriptionName, out _);
}
}
/// <summary>
@@ -878,7 +875,7 @@ public class OpcUaMaster : IDisposable
false,
OpcUaProperty.CheckDomain,
(string.IsNullOrEmpty(OPCUAName)) ? m_configuration.ApplicationName : OPCUAName,
60000,
600000,
userIdentity,
null, cancellationToken
).ConfigureAwait(false);
@@ -895,10 +892,12 @@ public class OpcUaMaster : IDisposable
//如果是订阅模式,连接时添加订阅组
if (OpcUaProperty.ActiveSubscribe)
{
List<Task> tasks = new(Variables.Count);
foreach (var item in Variables)
{
await AddSubscriptionAsync(Guid.NewGuid().ToString(), item.ToArray(), OpcUaProperty.LoadType).ConfigureAwait(false);
tasks.Add(AddSubscriptionAsync(Guid.NewGuid().ToString(), item.ToArray(), OpcUaProperty.LoadType));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
return m_session;
}
@@ -984,7 +983,7 @@ public class OpcUaMaster : IDisposable
}
NodeId nodeToRead = new(nodeIdStr);
var node = (VariableNode)m_session.ReadNode(nodeToRead, NodeClass.Variable, false);
_variableDicts.AddOrUpdate(nodeIdStr, node);
_variableDicts.AddOrUpdate(nodeIdStr, a => node, (a, b) => node);
return node;
}
@@ -1004,7 +1003,7 @@ public class OpcUaMaster : IDisposable
var node = (VariableNode)await m_session.ReadNodeAsync(nodeToRead, NodeClass.Variable, false, cancellationToken).ConfigureAwait(false);
if (OpcUaProperty.LoadType)
await typeSystem.LoadType(node.DataType, ct: cancellationToken).ConfigureAwait(false);
_variableDicts.AddOrUpdate(nodeIdStr, node);
_variableDicts.AddOrUpdate(nodeIdStr, a => node, (a, b) => node);
return node;
}
@@ -1030,7 +1029,7 @@ public class OpcUaMaster : IDisposable
{
var node = ((VariableNode)nodes.Item1[i]);
await typeSystem.LoadType(node.DataType, ct: cancellationToken).ConfigureAwait(false);
_variableDicts.AddOrUpdate(nodeIdStrs[i], node);
_variableDicts.AddOrUpdate(nodeIdStrs[i], a => node, (a, b) => node);
}
else
{

View File

@@ -58,7 +58,7 @@ public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableMode
var data = await Query(input).ToListAsync().ConfigureAwait(false);
return data.Cast<IDBHistoryValue>().ToList(); ;
}
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
_config = new TypeAdapterConfig();
@@ -71,7 +71,7 @@ public partial class QuestDBProducer : BusinessBaseWithCacheIntervalVariableMode
.Map(dest => dest.CreateTime, (src) => DateTime.UtcNow)
;//注意sqlsugar插入时无时区直接utc时间
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>

View File

@@ -150,7 +150,7 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
return ret;
}
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
_config = new TypeAdapterConfig();
_config.ForType<VariableRuntime, SQLHistoryValue>()
@@ -161,7 +161,7 @@ public partial class SqlDBProducer : BusinessBaseWithCacheIntervalVariableModel<
_exRealTimerTick = new(_driverPropertys.RealTableBusinessInterval);
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}

View File

@@ -34,17 +34,17 @@ public partial class SqlHistoryAlarm : BusinessBaseWithCacheVariableModel<Histor
protected override BusinessPropertyWithCache _businessPropertyWithCache => _driverPropertys;
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
_config.ForType<AlarmVariable, HistoryAlarm>().Map(dest => dest.Id, (src) => CommonUtils.GetSingleId());
GlobalData.AlarmChangedEvent += AlarmWorker_OnAlarmChanged;
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
public override void AfterVariablesChanged()
public override async Task AfterVariablesChangedAsync()
{
base.AfterVariablesChanged();
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
VariableRuntimes = GlobalData.ReadOnlyVariables.Where(a => a.Value.AlarmEnable).ToDictionary(a => a.Key, a => a.Value);
CollectDevices = GlobalData.ReadOnlyIdDevices

View File

@@ -57,7 +57,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
var data = await Query(input).ToListAsync().ConfigureAwait(false);
return data.Cast<IDBHistoryValue>().ToList(); ;
}
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
_config = new TypeAdapterConfig();
_config.ForType<VariableRuntime, TDengineDBHistoryValue>()
@@ -66,7 +66,7 @@ public partial class TDengineDBProducer : BusinessBaseWithCacheIntervalVariableM
.Map(dest => dest.Id, src => src.Id)//Id更改为变量Id
;//注意sqlsugar插入时无时区直接utc时间
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>

View File

@@ -45,7 +45,7 @@ public class Dlt645_2007Master : CollectBase
public override IDevice? FoundationDevice => _plc;
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
ArgumentNullException.ThrowIfNull(channel);
@@ -62,12 +62,13 @@ public class Dlt645_2007Master : CollectBase
_plc.Heartbeat = _driverPropertys.Heartbeat;
_plc.InitChannel(channel, LogMessage);
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>
protected override List<VariableSourceRead> ProtectedLoadSourceRead(List<VariableRuntime> deviceVariables)
protected override async Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables)
{
await Task.CompletedTask.ConfigureAwait(false);
return _plc.LoadSourceRead<VariableSourceRead>(deviceVariables, 0, CurrentDevice.IntervalTime);
}
}

View File

@@ -27,7 +27,7 @@ public partial class KafkaProducer : BusinessBaseWithCacheIntervalScript<Variabl
protected override BusinessPropertyWithCacheIntervalScript _businessPropertyWithCacheIntervalScript => _driverPropertys;
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
#region
@@ -60,7 +60,7 @@ public partial class KafkaProducer : BusinessBaseWithCacheIntervalScript<Variabl
#endregion Kafka
#endregion
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>

View File

@@ -46,7 +46,7 @@ public class ModbusMaster : CollectBase
public override Type DriverVariableAddressUIType => typeof(ModbusAddressComponent);
/// <inheritdoc/>
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
ArgumentNullException.ThrowIfNull(channel);
@@ -61,12 +61,13 @@ public class ModbusMaster : CollectBase
_plc.ModbusType = _driverPropertys.ModbusType;
_plc.Heartbeat = _driverPropertys.Heartbeat;
_plc.InitChannel(channel, LogMessage);
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>
protected override List<VariableSourceRead> ProtectedLoadSourceRead(List<VariableRuntime> deviceVariables)
protected override async Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables)
{
await Task.CompletedTask.ConfigureAwait(false);
return _plc.LoadSourceRead<VariableSourceRead>(deviceVariables, _driverPropertys.MaxPack, CurrentDevice.IntervalTime);
}
}

View File

@@ -70,7 +70,7 @@ public class ModbusSlave : BusinessBase
protected IStringLocalizer Localizer { get; private set; }
/// <inheritdoc/>
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
ArgumentNullException.ThrowIfNull(channel);
//载入配置
@@ -86,7 +86,7 @@ public class ModbusSlave : BusinessBase
_plc.HeartbeatTime = _driverPropertys.HeartbeatTime;
_plc.Heartbeat = _driverPropertys.Heartbeat;
_plc.InitChannel(channel, LogMessage);
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
_plc.WriteData -= OnWriteData;
_plc.WriteData += OnWriteData;
@@ -105,9 +105,9 @@ public class ModbusSlave : BusinessBase
GlobalData.VariableValueChangeEvent += VariableValueChange;
}
public override void AfterVariablesChanged()
public override async Task AfterVariablesChangedAsync()
{
base.AfterVariablesChanged();
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
_modbusVariableQueue.Clear();
VariableRuntimes.ForEach(a =>
{

View File

@@ -27,7 +27,7 @@ public partial class MqttClient : BusinessBaseWithCacheIntervalScript<VariableDa
public override VariablePropertyBase VariablePropertys => _variablePropertys;
protected override BusinessPropertyWithCacheIntervalScript _businessPropertyWithCacheIntervalScript => _driverPropertys;
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
@@ -96,7 +96,7 @@ public partial class MqttClient : BusinessBaseWithCacheIntervalScript<VariableDa
_mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
#endregion
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>

View File

@@ -96,9 +96,9 @@ public partial class MqttCollect : CollectBase
}
}
protected override List<VariableSourceRead> ProtectedLoadSourceRead(List<VariableRuntime> deviceVariables)
protected override async Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables)
{
await Task.CompletedTask.ConfigureAwait(false);
if (deviceVariables.Count > 0)
{
var dataResult = new List<VariableSourceRead>();
@@ -176,7 +176,7 @@ public partial class MqttCollect : CollectBase
}
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
#region
@@ -212,7 +212,7 @@ public partial class MqttCollect : CollectBase
_mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
#endregion
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
protected override async Task ProtectedStartAsync(CancellationToken cancellationToken)
{

View File

@@ -31,7 +31,7 @@ public partial class MqttServer : BusinessBaseWithCacheIntervalScript<VariableDa
protected override BusinessPropertyWithCacheIntervalScript _businessPropertyWithCacheIntervalScript => _driverPropertys;
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
#region
@@ -53,7 +53,7 @@ public partial class MqttServer : BusinessBaseWithCacheIntervalScript<VariableDa
_webHost.Start();
#endregion
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>

View File

@@ -273,7 +273,7 @@ public partial class MqttServer : BusinessBaseWithCacheIntervalScript<VariableDa
private async Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
if (!arg.ClientId.StartsWith(_driverPropertys.StartWithId))
if (string.IsNullOrEmpty(_driverPropertys.StartWithId) || !arg.ClientId.StartsWith(_driverPropertys.StartWithId))
{
arg.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
return;

View File

@@ -43,7 +43,7 @@ public class MqttServerProperty : BusinessPropertyWithCacheIntervalScript
/// 允许连接的ID(前缀)
/// </summary>
[DynamicProperty]
public string StartWithId { get; set; } = "ThingsGatewayId";
public string StartWithId { get; set; }
/// <summary>
/// 允许Rpc写入

View File

@@ -43,7 +43,7 @@ public class OpcDaMaster : CollectBase
/// <inheritdoc/>
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
//载入配置
OpcDaProperty opcNode = new()
@@ -63,7 +63,7 @@ public class OpcDaMaster : CollectBase
_plc.LogEvent = (a, b, c, d) => LogMessage.Log((LogLevel)a, b, c, d);
}
_plc.Init(opcNode);
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>
@@ -122,32 +122,42 @@ public class OpcDaMaster : CollectBase
}
/// <inheritdoc/>
protected override List<VariableSourceRead> ProtectedLoadSourceRead(List<VariableRuntime> deviceVariables)
protected override async Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables)
{
await Task.CompletedTask.ConfigureAwait(false);
_plc?.Disconnect();
if (deviceVariables.Count > 0)
try
{
var result = _plc.AddItemsWithSave(deviceVariables.Where(a => !string.IsNullOrEmpty(a.RegisterAddress)).Select(a => a.RegisterAddress!).ToList());
var sourVars = result?.Select(
it =>
{
var read = new VariableSourceRead()
if (deviceVariables.Count > 0)
{
var result = _plc.AddItemsWithSave(deviceVariables.Where(a => !string.IsNullOrEmpty(a.RegisterAddress)).Select(a => a.RegisterAddress!).ToList());
var sourVars = result?.Select(
it =>
{
TimeTick = new(_driverProperties.UpdateRate.ToString()),
RegisterAddress = it.Key,
};
var variables = deviceVariables.Where(a => it.Value.Select(b => b.ItemID).Contains(a.RegisterAddress));
foreach (var v in variables)
{
read.AddVariable(v);
}
return read;
}).ToList();
return sourVars;
var read = new VariableSourceRead()
{
TimeTick = new(_driverProperties.UpdateRate.ToString()),
RegisterAddress = it.Key,
};
var variables = deviceVariables.Where(a => it.Value.Select(b => b.ItemID).Contains(a.RegisterAddress));
foreach (var v in variables)
{
read.AddVariable(v);
}
return read;
}).ToList();
return sourVars;
}
else
{
return new();
}
}
else
finally
{
return new();
_plc?.Connect();
}
}
@@ -192,6 +202,14 @@ public class OpcDaMaster : CollectBase
{
}
}
public override async Task AfterVariablesChangedAsync()
{
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
VariableAddresDicts = VariableRuntimes.Select(a => a.Value).Where(it => !it.RegisterAddress.IsNullOrEmpty()).GroupBy(a => a.RegisterAddress).ToDictionary(a => a.Key!, b => b.ToList());
}
private Dictionary<string, List<VariableRuntime>> VariableAddresDicts { get; set; } = new();
private void DataChangedHandler(string name, int serverGroupHandle, List<ItemReadResult> values)
{
@@ -215,7 +233,8 @@ public class OpcDaMaster : CollectBase
{
type = type.GetElementType();
}
var itemReads = VariableRuntimes.Select(a => a.Value).Where(it => it.RegisterAddress == data.Name);
if (!VariableAddresDicts.TryGetValue(data.Name, out var itemReads)) return;
foreach (var item in itemReads)
{
if (CurrentDevice.Pause)

View File

@@ -47,7 +47,7 @@ public class OpcUaMaster : CollectBase
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
@@ -74,7 +74,7 @@ public class OpcUaMaster : CollectBase
_plc.DataChangedHandler += DataChangedHandler;
}
_plc.OpcUaProperty = config;
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>
@@ -165,34 +165,42 @@ public class OpcUaMaster : CollectBase
}
/// <inheritdoc/>
protected override List<VariableSourceRead> ProtectedLoadSourceRead(List<VariableRuntime> deviceVariables)
protected override async Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables)
{
_plc?.Disconnect();
if (deviceVariables.Count > 0)
try
{
var dataLists = deviceVariables.ChunkBetter(_driverProperties.GroupSize);
_plc.Variables = new();
_plc.Variables.AddRange(dataLists.Select(a => a.Where(a => !a.RegisterAddress.IsNullOrEmpty()).Select(a => a.RegisterAddress!).ToList()).ToList());
var dataResult = new List<VariableSourceRead>();
foreach (var variable in dataLists)
_plc?.Disconnect();
if (deviceVariables.Count > 0)
{
var sourVars = new VariableSourceRead()
var dataLists = deviceVariables.ChunkBetter(_driverProperties.GroupSize);
_plc.Variables = new();
_plc.Variables.AddRange(dataLists.Select(a => a.Where(a => !a.RegisterAddress.IsNullOrEmpty()).Select(a => a.RegisterAddress!).ToList()).ToList());
var dataResult = new List<VariableSourceRead>();
foreach (var variable in dataLists)
{
TimeTick = new(_driverProperties.UpdateRate.ToString()),
RegisterAddress = "",
};
foreach (var item in variable)
{
sourVars.AddVariable(item);
var sourVars = new VariableSourceRead()
{
TimeTick = new(_driverProperties.UpdateRate.ToString()),
RegisterAddress = "",
};
foreach (var item in variable)
{
sourVars.AddVariable(item);
}
dataResult.Add(sourVars);
}
dataResult.Add(sourVars);
}
return dataResult;
return dataResult;
}
else
{
return new();
}
}
else
finally
{
return new();
await _plc.ConnectAsync(default).ConfigureAwait(false);
}
}
@@ -289,6 +297,15 @@ public class OpcUaMaster : CollectBase
LogMessage?.Log((LogLevel)level, sender, message, ex);
}
public override async Task AfterVariablesChangedAsync()
{
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
VariableAddresDicts = VariableRuntimes.Select(a => a.Value).Where(it => !it.RegisterAddress.IsNullOrEmpty()).GroupBy(a => a.RegisterAddress).ToDictionary(a => a.Key!, b => b.ToList());
}
private Dictionary<string, List<VariableRuntime>> VariableAddresDicts { get; set; } = new();
private void DataChangedHandler((VariableNode variableNode, DataValue dataValue, JToken jToken) data)
{
DateTime time = DateTime.Now;
@@ -299,16 +316,18 @@ public class OpcUaMaster : CollectBase
if (_token.IsCancellationRequested)
return;
LogMessage.Trace($"{ToString()} Change: {Environment.NewLine} {data.variableNode.NodeId} : {data.jToken?.ToString()}");
if (CurrentDevice.Pause)
{
return;
}
LogMessage.Trace($"{ToString()} Change: {Environment.NewLine} {data.variableNode.NodeId} : {data.jToken?.ToString()}");
//尝试固定点位的数据类型
var type = TypeInfo.GetSystemType(TypeInfo.GetBuiltInType(data.variableNode.DataType, _plc.Session.SystemContext.TypeTable), data.variableNode.ValueRank);
var itemReads = VariableRuntimes.Select(a => a.Value).Where(it => it.RegisterAddress == data.variableNode.NodeId);
if (!VariableAddresDicts.TryGetValue(data.variableNode.NodeId.ToString(), out var itemReads)) return;
object value;
if (data.jToken is JValue jValue)

View File

@@ -50,7 +50,7 @@ public partial class OpcUaServer : BusinessBase
private static readonly string[] separator = new string[] { ";" };
public override void AfterVariablesChanged()
public override async Task AfterVariablesChangedAsync()
{
// 如果业务属性指定了全部变量,则设置当前设备的变量运行时列表和采集设备列表
if (_driverPropertys.IsAllVariable)
@@ -61,7 +61,7 @@ public partial class OpcUaServer : BusinessBase
}
else
{
base.AfterVariablesChanged();
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
}
VariableRuntimes.ForEach(a =>
{
@@ -70,14 +70,14 @@ public partial class OpcUaServer : BusinessBase
}
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
ApplicationInstance.MessageDlg = new ApplicationMessageDlg(LogMessage);//默认返回true
//Utils.SetLogger(new OpcUaLogger(LogMessage)); //调试用途
m_application = new ApplicationInstance();
m_configuration = GetDefaultConfiguration();
m_configuration.Validate(ApplicationType.Server).GetAwaiter().GetResult();
await m_configuration.Validate(ApplicationType.Server).ConfigureAwait(false);
m_application.ApplicationConfiguration = m_configuration;
if (m_configuration.SecurityConfiguration.AutoAcceptUntrustedCertificates)
{
@@ -94,7 +94,7 @@ public partial class OpcUaServer : BusinessBase
GlobalData.VariableValueChangeEvent += VariableValueChange;
Localizer = App.CreateLocalizerByType(typeof(OpcUaServer))!;
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>
@@ -374,7 +374,7 @@ public partial class OpcUaServer : BusinessBase
config.TraceConfiguration = new TraceConfiguration();
config.CertificateValidator = new CertificateValidator();
config.CertificateValidator.Update(config).GetAwaiter().GetResult();
config.CertificateValidator.Update(config).ConfigureAwait(false);
config.Extensions = new XmlElementCollection();
return config;

View File

@@ -167,10 +167,10 @@ public partial class OpcUaMaster : IDisposable
}
}
private void Remove()
private async Task Remove()
{
if (_plc.Connected)
_plc.RemoveSubscription("");
if (_plc?.Connected == true)
await _plc.RemoveSubscriptionAsync("");
}
private async Task ShowImport()

View File

@@ -47,7 +47,7 @@ public class OpcUa105Master : CollectBase
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
@@ -74,7 +74,7 @@ public class OpcUa105Master : CollectBase
_plc.DataChangedHandler += DataChangedHandler;
}
_plc.OpcUaProperty = config;
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>
@@ -165,34 +165,41 @@ public class OpcUa105Master : CollectBase
}
/// <inheritdoc/>
protected override List<VariableSourceRead> ProtectedLoadSourceRead(List<VariableRuntime> deviceVariables)
protected override async Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables)
{
_plc?.Disconnect();
if (deviceVariables.Count > 0)
try
{
var dataLists = deviceVariables.ChunkBetter(_driverProperties.GroupSize);
_plc.Variables = new();
_plc.Variables.AddRange(dataLists.Select(a => a.Where(a => !a.RegisterAddress.IsNullOrEmpty()).Select(a => a.RegisterAddress!).ToList()).ToList());
var dataResult = new List<VariableSourceRead>();
foreach (var variable in dataLists)
_plc?.Disconnect();
if (deviceVariables.Count > 0)
{
var sourVars = new VariableSourceRead()
var dataLists = deviceVariables.ChunkBetter(_driverProperties.GroupSize);
_plc.Variables = new();
_plc.Variables.AddRange(dataLists.Select(a => a.Where(a => !a.RegisterAddress.IsNullOrEmpty()).Select(a => a.RegisterAddress!).ToList()).ToList());
var dataResult = new List<VariableSourceRead>();
foreach (var variable in dataLists)
{
TimeTick = new(_driverProperties.UpdateRate.ToString()),
RegisterAddress = "",
};
foreach (var item in variable)
{
sourVars.AddVariable(item);
var sourVars = new VariableSourceRead()
{
TimeTick = new(_driverProperties.UpdateRate.ToString()),
RegisterAddress = "",
};
foreach (var item in variable)
{
sourVars.AddVariable(item);
}
dataResult.Add(sourVars);
}
dataResult.Add(sourVars);
}
return dataResult;
return dataResult;
}
else
{
return new();
}
}
else
finally
{
return new();
await _plc.ConnectAsync(default).ConfigureAwait(false);
}
}
@@ -288,6 +295,14 @@ public class OpcUa105Master : CollectBase
{
LogMessage?.Log((LogLevel)level, sender, message, ex);
}
public override async Task AfterVariablesChangedAsync()
{
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
VariableAddresDicts = VariableRuntimes.Select(a => a.Value).Where(it => !it.RegisterAddress.IsNullOrEmpty()).GroupBy(a => a.RegisterAddress).ToDictionary(a => a.Key!, b => b.ToList());
}
private Dictionary<string, List<VariableRuntime>> VariableAddresDicts { get; set; } = new();
private void DataChangedHandler((VariableNode variableNode, DataValue dataValue, JToken jToken) data)
{
@@ -299,16 +314,18 @@ public class OpcUa105Master : CollectBase
if (_token.IsCancellationRequested)
return;
LogMessage.Trace($"{ToString()} Change: {Environment.NewLine} {data.variableNode.NodeId} : {data.jToken?.ToString()}");
if (CurrentDevice.Pause)
{
return;
}
LogMessage.Trace($"{ToString()} Change: {Environment.NewLine} {data.variableNode.NodeId} : {data.jToken?.ToString()}");
//尝试固定点位的数据类型
var type = TypeInfo.GetSystemType(TypeInfo.GetBuiltInType(data.variableNode.DataType, _plc.Session.SystemContext.TypeTable), data.variableNode.ValueRank);
var itemReads = VariableRuntimes.Select(a => a.Value).Where(it => it.RegisterAddress == data.variableNode.NodeId);
if (!VariableAddresDicts.TryGetValue(data.variableNode.NodeId.ToString(), out var itemReads)) return;
object value;
if (data.jToken is JValue jValue)

View File

@@ -50,7 +50,7 @@ public partial class OpcUa105Server : BusinessBase
private static readonly string[] separator = new string[] { ";" };
public override void AfterVariablesChanged()
public override async Task AfterVariablesChangedAsync()
{
// 如果业务属性指定了全部变量,则设置当前设备的变量运行时列表和采集设备列表
if (_driverPropertys.IsAllVariable)
@@ -61,7 +61,7 @@ public partial class OpcUa105Server : BusinessBase
}
else
{
base.AfterVariablesChanged();
await base.AfterVariablesChangedAsync().ConfigureAwait(false);
}
VariableRuntimes.ForEach(a =>
{
@@ -70,14 +70,14 @@ public partial class OpcUa105Server : BusinessBase
}
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
ApplicationInstance.MessageDlg = new ApplicationMessageDlg(LogMessage);//默认返回true
//Utils.SetLogger(new OpcUaLogger(LogMessage)); //调试用途
m_application = new ApplicationInstance();
m_configuration = GetDefaultConfiguration();
m_configuration.Validate(ApplicationType.Server).GetAwaiter().GetResult();
await m_configuration.Validate(ApplicationType.Server).ConfigureAwait(false);
m_application.ApplicationConfiguration = m_configuration;
if (m_configuration.SecurityConfiguration.AutoAcceptUntrustedCertificates)
{
@@ -94,7 +94,7 @@ public partial class OpcUa105Server : BusinessBase
GlobalData.VariableValueChangeEvent += VariableValueChange;
Localizer = App.CreateLocalizerByType(typeof(OpcUa105Server))!;
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>
@@ -374,7 +374,7 @@ public partial class OpcUa105Server : BusinessBase
config.TraceConfiguration = new TraceConfiguration();
config.CertificateValidator = new CertificateValidator();
config.CertificateValidator.Update(config).GetAwaiter().GetResult();
config.CertificateValidator.Update(config).ConfigureAwait(false);
config.Extensions = new XmlElementCollection();
return config;

View File

@@ -166,10 +166,10 @@ public partial class OpcUaMaster : IDisposable
}
}
private void Remove()
private async Task Remove()
{
if (_plc.Connected)
_plc.RemoveSubscription("");
if (_plc?.Connected == true)
await _plc.RemoveSubscriptionAsync("");
}
private async Task ShowImport()

View File

@@ -26,7 +26,7 @@ public partial class RabbitMQProducer : BusinessBaseWithCacheIntervalScript<Vari
public override VariablePropertyBase VariablePropertys => _variablePropertys;
protected override BusinessPropertyWithCacheIntervalScript _businessPropertyWithCacheIntervalScript => _driverPropertys;
protected override void InitChannel(Foundation.IChannel? channel = null)
protected override async Task InitChannelAsync(Foundation.IChannel? channel = null)
{
@@ -42,7 +42,7 @@ public partial class RabbitMQProducer : BusinessBaseWithCacheIntervalScript<Vari
};
#endregion
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <inheritdoc/>

View File

@@ -17,6 +17,7 @@ using ThingsGateway.Foundation.SiemensS7;
using ThingsGateway.Gateway.Application;
using TouchSocket.Core;
using TouchSocket.Sockets;
namespace ThingsGateway.Plugin.SiemensS7;
@@ -53,7 +54,7 @@ public class SiemensS7Master : CollectBase
public override Type DriverVariableAddressUIType => typeof(SiemensS7AddressComponent);
protected override void InitChannel(IChannel? channel = null)
protected override async Task InitChannelAsync(IChannel? channel = null)
{
ArgumentNullException.ThrowIfNull(channel);
//载入配置
@@ -65,7 +66,7 @@ public class SiemensS7Master : CollectBase
_plc.Rack = _driverPropertys.Rack;
_plc.Slot = _driverPropertys.Slot;
_plc.InitChannel(channel, LogMessage);
base.InitChannel(channel);
await base.InitChannelAsync(channel).ConfigureAwait(false);
}
/// <summary>
@@ -208,10 +209,18 @@ public class SiemensS7Master : CollectBase
}
/// <inheritdoc/>
protected override List<VariableSourceRead> ProtectedLoadSourceRead(List<VariableRuntime> deviceVariables)
protected override async Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables)
{
try
{
try
{
await _plc.Channel.ConnectAsync().ConfigureAwait(false);
}
catch
{
}
return _plc.LoadSourceRead<VariableSourceRead>(deviceVariables, _plc.OnLine ? _plc.PduLength : _driverPropertys.MaxPack, CurrentDevice.IntervalTime);
}
finally { }

View File

@@ -34,7 +34,7 @@
// /// <summary>
// /// 在插件初始化时调用只会执行一次参数为插件默认的链路通道类如未实现可忽略l
// /// </summary>
// protected override void InitChannel(IChannel? channel = null)
// protected override Task InitChannelAsync(IChannel? channel = null)
// {
// //做一些初始化操作
// }
@@ -42,7 +42,7 @@
// /// <summary>
// /// 变量打包操作会在Init方法后执行参数为设备变量列表返回源读取变量列表
// /// </summary>
// protected override List<VariableSourceRead> ProtectedLoadSourceRead(List<VariableRuntime> deviceVariables)
// protected override Task<List<VariableSourceRead>> ProtectedLoadSourceReadAsync(List<VariableRuntime> deviceVariables)
// {
// //实现将设备变量打包成源读取变量
// //比如如果需要实现MC中的字多读功能需将多个变量地址打包成一个源读取地址和读取长度根据一系列规则添加解析标识然后在返回的整个字节数组中解析出原来的变量地址代表的数据字节

View File

@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>10.0.1.9</Version>
<Version>10.0.2.1</Version>
</PropertyGroup>
<ItemGroup>