//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权(除特别声明外的代码)归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库:https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库:https://github.com/kimdiego2098/ThingsGateway
// 使用文档:https://thingsgateway.cn/
// QQ群:605534569
//------------------------------------------------------------------------------
//修改自https://github.com/dathlin/OpcUaHelper 与OPC基金会net库
using System.Collections.Concurrent;
#if NET8_0_OR_GREATER
using System.Collections.Frozen;
#endif
namespace ThingsGateway.Foundation.OpcUa;
///
/// 订阅委托
///
///
public delegate void DataChangedEventHandler((VariableNode variableNode, MonitoredItem monitoredItem, DataValue dataValue, JToken jToken) value);
///
/// 日志输出
///
public delegate void LogEventHandler(byte level, object sender, string message, Exception ex);
///
/// OpcUaMaster
///
public class OpcUaMaster : IAsyncDisposable
{
#region 属性,变量等
///
/// Raised after the client status change
///
public LogEventHandler LogEvent;
///
/// 当前配置
///
public OpcUaProperty OpcUaProperty;
///
/// ProductUri
///
public string ProductUri = "https://thingsgateway.cn/";
///
/// 当前的变量名称/OPC变量节点
///
private readonly ConcurrentDictionary _variableDicts = new();
private readonly object checkLock = new();
///
/// 当前的订阅组,组名称/组
///
private readonly ConcurrentDictionary _subscriptionDicts = new();
private readonly ApplicationInstance m_application = new();
private readonly ApplicationConfiguration m_configuration;
private EventHandler m_ConnectComplete;
private EventHandler m_KeepAliveComplete;
private EventHandler m_ReconnectComplete;
private SessionReconnectHandler m_reConnectHandler;
private EventHandler m_ReconnectStarting;
private ISession m_session;
private ComplexTypeSystem typeSystem;
///
/// 默认的构造函数,实例化一个新的OPC UA类
///
public OpcUaMaster()
{
var certificateValidator = new CertificateValidator();
certificateValidator.CertificateValidation += CertificateValidation;
// 构建应用程序配置
m_configuration = new ApplicationConfiguration
{
ApplicationName = OPCUAName,
ApplicationType = ApplicationType.Client,
CertificateValidator = certificateValidator,
ApplicationUri = Utils.Format(@"urn:{0}:{1}", System.Net.Dns.GetHostName(), OPCUAName),
ProductUri = ProductUri,
ServerConfiguration = new ServerConfiguration
{
MaxSubscriptionCount = 100000,
MaxMessageQueueSize = 1000000,
MaxNotificationQueueSize = 1000000,
MaxPublishRequestCount = 10000000,
},
SecurityConfiguration = new SecurityConfiguration
{
UseValidatedCertificates = true,
AutoAcceptUntrustedCertificates = true,//自动接受证书
RejectSHA1SignedCertificates = true,
AddAppCertToTrustedStore = true,
SendCertificateChain = true,
MinimumCertificateKeySize = 1024,
SuppressNonceValidationErrors = true,
ApplicationCertificate = new CertificateIdentifier
{
StoreType = CertificateStoreType.X509Store,
StorePath = "CurrentUser\\" + OPCUAName,
SubjectName = $"CN={OPCUAName}, C=CN, S=GUANGZHOU, O=ThingsGateway, DC=" + System.Net.Dns.GetHostName(),
},
TrustedIssuerCertificates = new CertificateTrustList
{
StoreType = CertificateStoreType.Directory,
StorePath = AppContext.BaseDirectory + @"OPCUAClientCertificate\pki\trustedIssuer",
},
TrustedPeerCertificates = new CertificateTrustList
{
StoreType = CertificateStoreType.Directory,
StorePath = AppContext.BaseDirectory + @"OPCUAClientCertificate\pki\trustedPeer",
},
RejectedCertificateStore = new CertificateStoreIdentifier
{
StoreType = CertificateStoreType.Directory,
StorePath = AppContext.BaseDirectory + @"OPCUAClientCertificate\pki\rejected",
},
UserIssuerCertificates = new CertificateTrustList
{
StoreType = CertificateStoreType.Directory,
StorePath = AppContext.BaseDirectory + @"OPCUAClientCertificate\pki\issuerUser",
},
TrustedUserCertificates = new CertificateTrustList
{
StoreType = CertificateStoreType.Directory,
StorePath = AppContext.BaseDirectory + @"OPCUAClientCertificate\pki\trustedUser",
}
},
TransportQuotas = new TransportQuotas
{
OperationTimeout = 6000000,
MaxStringLength = int.MaxValue,
MaxByteStringLength = int.MaxValue,
MaxArrayLength = 65535,
MaxMessageSize = 419430400,
MaxBufferSize = 65535,
ChannelLifetime = 300000,
SecurityTokenLifetime = 3600000
},
ClientConfiguration = new ClientConfiguration
{
DefaultSessionTimeout = 60000,
MinSubscriptionLifetime = 10000,
},
};
Task.Run(() => certificateValidator.UpdateAsync(m_configuration)).GetAwaiter().GetResult();
Task.Run(() => m_configuration.ValidateAsync(ApplicationType.Client)).GetAwaiter().GetResult();
m_application.ApplicationConfiguration = m_configuration;
}
///
/// Raised after successfully connecting to or disconnecing from a server.
///
public event EventHandler ConnectComplete
{
add { m_ConnectComplete += value; }
remove { m_ConnectComplete -= value; }
}
///
/// 订阅
///
public event DataChangedEventHandler DataChangedHandler;
///
/// Raised when a good keep alive from the server arrives.
///
public event EventHandler KeepAliveComplete
{
add { m_KeepAliveComplete += value; }
remove { m_KeepAliveComplete -= value; }
}
///
/// Raised when a reconnect operation completes.
///
public event EventHandler ReconnectComplete
{
add { m_ReconnectComplete += value; }
remove { m_ReconnectComplete -= value; }
}
///
/// Raised when a reconnect operation starts.
///
public event EventHandler ReconnectStarting
{
add { m_ReconnectStarting += value; }
remove { m_ReconnectStarting -= value; }
}
///
/// 配置信息
///
public ApplicationConfiguration AppConfig => m_configuration;
///
/// 连接状态
///
public bool Connected => m_session?.Connected == true && connected;
private bool connected = false;
///
/// OpcUaMaster
///
public string OPCUAName { get; set; } = "ThingsGateway";
///
/// SessionReconnectHandler
///
public SessionReconnectHandler ReConnectHandler => m_reConnectHandler;
///
/// 当前活动会话。
///
public ISession Session => m_session;
#endregion 属性,变量等
#region 订阅
///
/// 新增订阅,需要指定订阅组名称,订阅的tag名数组
///
public async Task AddSubscriptionAsync(string subscriptionName, string[] items, bool loadType = true, CancellationToken cancellationToken = default)
{
Subscription m_subscription = new(m_session.DefaultSubscription)
{
PublishingEnabled = true,
PublishingInterval = 0,
KeepAliveCount = uint.MaxValue,
LifetimeCount = uint.MaxValue,
MaxNotificationsPerPublish = uint.MaxValue,
Priority = 100,
DisplayName = subscriptionName
};
List monitoredItems = new();
var variableNodes = loadType ? await ReadNodesAsync(items, false, cancellationToken).ConfigureAwait(false) : null;
for (int i = 0; i < items.Length; i++)
{
try
{
var item = new MonitoredItem
{
StartNodeId = items[i],
AttributeId = Attributes.Value,
DisplayName = items[i],
Filter = OpcUaProperty.DeadBand == 0 ? null : new DataChangeFilter() { DeadbandValue = OpcUaProperty.DeadBand, DeadbandType = (int)DeadbandType.Absolute, Trigger = DataChangeTrigger.StatusValue },
SamplingInterval = OpcUaProperty?.UpdateRate ?? 1000,
};
item.Notification += Callback;
monitoredItems.Add(item);
}
catch (Exception ex)
{
Log(3, ex, $"Failed to initialize {items[i]} variable subscription");
}
}
m_session.AddSubscription(m_subscription);
try
{
await m_subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
await Task.Delay(100, cancellationToken).ConfigureAwait(false); // allow for subscription to be finished on server?
}
catch (Exception)
{
try
{
await m_subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
await Task.Delay(100, cancellationToken).ConfigureAwait(false); // allow for subscription to be finished on server?
}
catch (Exception)
{
await m_session.RemoveSubscriptionAsync(m_subscription, cancellationToken).ConfigureAwait(false);
throw;
}
}
m_subscription.AddItems(monitoredItems);
foreach (var item in m_subscription.MonitoredItems.Where(a => a.Status.Error != null && StatusCode.IsBad(a.Status.Error.StatusCode)))
{
item.Filter = OpcUaProperty.DeadBand == 0 ? null : new DataChangeFilter() { DeadbandValue = OpcUaProperty.DeadBand, DeadbandType = (int)DeadbandType.None, Trigger = DataChangeTrigger.StatusValue };
}
await m_subscription.ApplyChangesAsync(cancellationToken).ConfigureAwait(false);
var isError = m_subscription.MonitoredItems.Any(a => a.Status.Error != null && StatusCode.IsBad(a.Status.Error.StatusCode));
if (isError)
{
Log(3, null, $"Failed to create subscription for the following variables:{Environment.NewLine}{m_subscription.MonitoredItems.Where(
a => a.Status.Error != null && StatusCode.IsBad(a.Status.Error.StatusCode))
.Select(a => $"{a.StartNodeId}:{a.Status.Error}").ToJsonString()}");
}
if (_subscriptionDicts.TryAdd(subscriptionName, m_subscription))
{
}
else if (_subscriptionDicts.TryGetValue(subscriptionName, out var existingSubscription))
{
// remove
await existingSubscription.DeleteAsync(true, cancellationToken).ConfigureAwait(false);
await m_session.RemoveSubscriptionAsync(existingSubscription, cancellationToken).ConfigureAwait(false);
try { existingSubscription.Dispose(); } catch { }
_subscriptionDicts[subscriptionName] = m_subscription;
}
}
///
/// 浏览一个节点的引用
///
/// 节点值
/// 引用节点描述
public async Task BrowseNodeReferenceAsync(string tag)
{
NodeId sourceId = new(tag);
// 该节点可以读取到方法
BrowseDescription nodeToBrowse1 = new()
{
NodeId = sourceId,
BrowseDirection = BrowseDirection.Forward,
ReferenceTypeId = ReferenceTypeIds.Aggregates,
IncludeSubtypes = true,
NodeClassMask = (uint)(NodeClass.Object | NodeClass.Variable | NodeClass.Method),
ResultMask = (uint)BrowseResultMask.All
};
// find all nodes organized by the node.
BrowseDescription nodeToBrowse2 = new()
{
NodeId = sourceId,
BrowseDirection = BrowseDirection.Forward,
ReferenceTypeId = ReferenceTypeIds.Organizes,
IncludeSubtypes = true,
NodeClassMask = (uint)(NodeClass.Object | NodeClass.Variable),
ResultMask = (uint)BrowseResultMask.All
};
BrowseDescriptionCollection nodesToBrowse = new()
{
nodeToBrowse1,
nodeToBrowse2
};
// fetch references from the server.
ReferenceDescriptionCollection references = await OpcUaUtils.BrowseAsync(m_session, nodesToBrowse, false).ConfigureAwait(false);
return references.ToArray();
}
///
/// 调用服务器的方法
///
/// 方法的父节点tag
/// 方法的节点tag
/// cancellationToken
/// 传递的参数
/// 输出的结果值
public async Task> CallMethodByNodeIdAsync(string tagParent, string tag, CancellationToken cancellationToken, params object[] args)
{
if (m_session == null)
{
return null;
}
IList