mirror of
https://gitee.com/ThingsGateway/ThingsGateway.git
synced 2025-11-03 09:03:58 +08:00
feat:ManageGatewayWorker part
This commit is contained in:
@@ -171,6 +171,8 @@ public class ManageGatewayWorker : BackgroundService
|
||||
|
||||
|
||||
#endregion
|
||||
|
||||
#region public
|
||||
/// <summary>
|
||||
/// 获取子网关的配置信息
|
||||
/// </summary>
|
||||
@@ -201,52 +203,6 @@ public class ManageGatewayWorker : BackgroundService
|
||||
await StartAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// RPC请求子网关并返回,需要传入子网关ID,作为Topic参数一部分
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task<byte[]> RpcDataExecuteAsync(string gatewayId, string topic, byte[] payload, int timeOut, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken token = default)
|
||||
{
|
||||
var responseTopic = GetRpcReturnTopic(gatewayId, topic);
|
||||
var requestTopic = GetRpcTopic(gatewayId, topic);
|
||||
|
||||
try
|
||||
{
|
||||
using WaitDataAsync<byte[]> waitDataAsync = new();
|
||||
if (!_waitingCalls.TryAdd(responseTopic, waitDataAsync))
|
||||
{
|
||||
throw new InvalidOperationException();
|
||||
}
|
||||
waitDataAsync.SetCancellationToken(token);
|
||||
|
||||
//请求子网关的数据
|
||||
var message = new MqttApplicationMessageBuilder().WithTopic(requestTopic).WithPayload(payload).Build();
|
||||
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message), token);
|
||||
|
||||
var result = await waitDataAsync.WaitAsync(timeOut);
|
||||
switch (result)
|
||||
{
|
||||
case WaitDataStatus.SetRunning:
|
||||
return waitDataAsync.WaitResult;
|
||||
case WaitDataStatus.Overtime:
|
||||
throw new TimeoutException();
|
||||
case WaitDataStatus.Canceled:
|
||||
{
|
||||
throw new Exception("等待已终止。可能是客户端已掉线,或者被注销。");
|
||||
}
|
||||
case WaitDataStatus.Default:
|
||||
case WaitDataStatus.Disposed:
|
||||
default:
|
||||
throw new Exception(ThingsGatewayStatus.UnknownError.GetDescription());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_waitingCalls.Remove(responseTopic);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 下载配置信息到子网关
|
||||
/// </summary>
|
||||
@@ -309,6 +265,109 @@ public class ManageGatewayWorker : BackgroundService
|
||||
}
|
||||
|
||||
|
||||
#endregion
|
||||
|
||||
|
||||
|
||||
|
||||
#region RPC实现
|
||||
|
||||
readonly ConcurrentDictionary<string, WaitDataAsync<byte[]>> _waitingCalls = new();
|
||||
readonly ConcurrentDictionary<string, WaitDataAsync<ManageMqttRpcResult>> _writerRpcResultWaitingCalls = new();
|
||||
private readonly EasyLock clientLock = new();
|
||||
|
||||
|
||||
private async Task<ManageMqttRpcResult> RpcWriteExecuteAsync(int timeOut, byte[] payload, string requestTopic, string key, CancellationToken token)
|
||||
{
|
||||
try
|
||||
{
|
||||
using WaitDataAsync<ManageMqttRpcResult> waitDataAsync = new();
|
||||
if (!_writerRpcResultWaitingCalls.TryAdd(key, waitDataAsync))
|
||||
{
|
||||
throw new InvalidOperationException();
|
||||
}
|
||||
waitDataAsync.SetCancellationToken(token);
|
||||
|
||||
//请求子网关的数据
|
||||
var message = new MqttApplicationMessageBuilder().WithTopic(requestTopic).WithPayload(payload).Build();
|
||||
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message), token);
|
||||
|
||||
var result = await waitDataAsync.WaitAsync(timeOut);
|
||||
switch (result)
|
||||
{
|
||||
case WaitDataStatus.SetRunning:
|
||||
return waitDataAsync.WaitResult;
|
||||
case WaitDataStatus.Overtime:
|
||||
throw new TimeoutException();
|
||||
case WaitDataStatus.Canceled:
|
||||
{
|
||||
throw new Exception("等待已终止。可能是客户端已掉线,或者被注销。");
|
||||
}
|
||||
case WaitDataStatus.Default:
|
||||
case WaitDataStatus.Disposed:
|
||||
default:
|
||||
throw new Exception(ThingsGatewayStatus.UnknownError.GetDescription());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writerRpcResultWaitingCalls.Remove(key);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// RPC请求子网关并返回,需要传入子网关ID,作为Topic参数一部分
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
private async Task<byte[]> RpcDataExecuteAsync(string gatewayId, string topic, byte[] payload, int timeOut, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken token = default)
|
||||
{
|
||||
var responseTopic = GetRpcReturnTopic(gatewayId, topic);
|
||||
var requestTopic = GetRpcTopic(gatewayId, topic);
|
||||
|
||||
try
|
||||
{
|
||||
using WaitDataAsync<byte[]> waitDataAsync = new();
|
||||
if (!_waitingCalls.TryAdd(responseTopic, waitDataAsync))
|
||||
{
|
||||
throw new InvalidOperationException();
|
||||
}
|
||||
waitDataAsync.SetCancellationToken(token);
|
||||
|
||||
//请求子网关的数据
|
||||
var message = new MqttApplicationMessageBuilder().WithTopic(requestTopic).WithPayload(payload).Build();
|
||||
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message), token);
|
||||
|
||||
var result = await waitDataAsync.WaitAsync(timeOut);
|
||||
switch (result)
|
||||
{
|
||||
case WaitDataStatus.SetRunning:
|
||||
return waitDataAsync.WaitResult;
|
||||
case WaitDataStatus.Overtime:
|
||||
throw new TimeoutException();
|
||||
case WaitDataStatus.Canceled:
|
||||
{
|
||||
throw new Exception("等待已终止。可能是客户端已掉线,或者被注销。");
|
||||
}
|
||||
case WaitDataStatus.Default:
|
||||
case WaitDataStatus.Disposed:
|
||||
default:
|
||||
throw new Exception(ThingsGatewayStatus.UnknownError.GetDescription());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_waitingCalls.Remove(responseTopic);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#endregion
|
||||
|
||||
#region 核心实现
|
||||
|
||||
internal async Task StartAsync()
|
||||
{
|
||||
try
|
||||
@@ -356,12 +415,98 @@ public class ManageGatewayWorker : BackgroundService
|
||||
restartLock.Release();
|
||||
}
|
||||
}
|
||||
/// <summary>
|
||||
/// 初始化
|
||||
/// </summary>
|
||||
private async Task InitAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
ManageGatewayConfig = App.GetConfig<ManageGatewayConfig>("ManageGatewayConfig");
|
||||
if (ManageGatewayConfig?.Enable != true)
|
||||
{
|
||||
ManageStatuString = new OperResult($"已退出:不启用管理功能");
|
||||
_manageLogger.LogWarning("已退出:不启用管理功能");
|
||||
}
|
||||
else
|
||||
{
|
||||
var mqttFactory = new MqttFactory(new MqttNetLogger(_manageLogger));
|
||||
var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder()
|
||||
.WithDefaultEndpointBoundIPAddress(string.IsNullOrEmpty(ManageGatewayConfig.MqttBrokerIP) ? null : IPAddress.Parse(ManageGatewayConfig.MqttBrokerIP))
|
||||
.WithDefaultEndpointPort(ManageGatewayConfig.MqttBrokerPort)
|
||||
.WithDefaultEndpoint()
|
||||
.Build();
|
||||
_mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
|
||||
if (_mqttServer != null)
|
||||
{
|
||||
_mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;//认证
|
||||
_mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;//消息
|
||||
|
||||
await _mqttServer.StartAsync();
|
||||
}
|
||||
ManageStatuString = OperResult.CreateSuccessResult();
|
||||
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_manageLogger.LogError(ex, "初始化失败");
|
||||
ManageStatuString = new($"初始化失败-{ex.Message}");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
ClientGatewayConfig = App.GetConfig<ClientGatewayConfig>("ClientGatewayConfig");
|
||||
if (ClientGatewayConfig?.Enable != true)
|
||||
{
|
||||
ClientStatuString = new OperResult($"已退出:不启用子网关功能");
|
||||
_clientLogger.LogWarning("已退出:不启用子网关功能");
|
||||
}
|
||||
else
|
||||
{
|
||||
var mqttFactory = new MqttFactory(new MqttNetLogger(_clientLogger));
|
||||
_mqttClientOptions = mqttFactory.CreateClientOptionsBuilder()
|
||||
.WithCredentials(ClientGatewayConfig.UserName, ClientGatewayConfig.Password)//账密
|
||||
.WithTcpServer(ClientGatewayConfig.MqttBrokerIP, ClientGatewayConfig.MqttBrokerPort)//服务器
|
||||
.WithClientId(ClientGatewayConfig.GatewayId)
|
||||
.WithCleanSession(true)
|
||||
.WithKeepAlivePeriod(TimeSpan.FromSeconds(120.0))
|
||||
.WithoutThrowOnNonSuccessfulConnectResponse()
|
||||
.Build();
|
||||
_mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
|
||||
.WithTopicFilter(
|
||||
f =>
|
||||
{
|
||||
f.WithTopic(ClientGatewayConfig.WriteRpcTopic);
|
||||
f.WithAtMostOnceQoS();
|
||||
})
|
||||
.WithTopicFilter(
|
||||
f =>
|
||||
{
|
||||
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBDownTopic));
|
||||
f.WithAtMostOnceQoS();
|
||||
})
|
||||
.WithTopicFilter(
|
||||
f =>
|
||||
{
|
||||
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBUploadTopic));
|
||||
f.WithAtMostOnceQoS();
|
||||
})
|
||||
.Build();
|
||||
_mqttClient = mqttFactory.CreateMqttClient();
|
||||
_mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
|
||||
_mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
|
||||
await TryMqttClientAsync(CancellationToken.None);
|
||||
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_clientLogger.LogError(ex, "初始化失败");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#region 核心实现
|
||||
readonly ConcurrentDictionary<string, WaitDataAsync<byte[]>> _waitingCalls = new();
|
||||
readonly ConcurrentDictionary<string, WaitDataAsync<ManageMqttRpcResult>> _writerRpcResultWaitingCalls = new();
|
||||
private readonly EasyLock clientLock = new();
|
||||
/// <summary>
|
||||
/// ClientGatewayConfig
|
||||
/// </summary>
|
||||
@@ -529,97 +674,7 @@ public class ManageGatewayWorker : BackgroundService
|
||||
return requestTopic;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 初始化
|
||||
/// </summary>
|
||||
private async Task InitAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
ManageGatewayConfig = App.GetConfig<ManageGatewayConfig>("ManageGatewayConfig");
|
||||
if (ManageGatewayConfig?.Enable != true)
|
||||
{
|
||||
ManageStatuString = new OperResult($"已退出:不启用管理功能");
|
||||
_manageLogger.LogWarning("已退出:不启用管理功能");
|
||||
}
|
||||
else
|
||||
{
|
||||
var mqttFactory = new MqttFactory(new MqttNetLogger(_manageLogger));
|
||||
var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder()
|
||||
.WithDefaultEndpointBoundIPAddress(string.IsNullOrEmpty(ManageGatewayConfig.MqttBrokerIP) ? null : IPAddress.Parse(ManageGatewayConfig.MqttBrokerIP))
|
||||
.WithDefaultEndpointPort(ManageGatewayConfig.MqttBrokerPort)
|
||||
.WithDefaultEndpoint()
|
||||
.Build();
|
||||
_mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
|
||||
if (_mqttServer != null)
|
||||
{
|
||||
_mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;//认证
|
||||
_mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;//消息
|
||||
|
||||
await _mqttServer.StartAsync();
|
||||
}
|
||||
ManageStatuString = OperResult.CreateSuccessResult();
|
||||
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_manageLogger.LogError(ex, "初始化失败");
|
||||
ManageStatuString = new($"初始化失败-{ex.Message}");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
ClientGatewayConfig = App.GetConfig<ClientGatewayConfig>("ClientGatewayConfig");
|
||||
if (ClientGatewayConfig?.Enable != true)
|
||||
{
|
||||
ClientStatuString = new OperResult($"已退出:不启用子网关功能");
|
||||
_clientLogger.LogWarning("已退出:不启用子网关功能");
|
||||
}
|
||||
else
|
||||
{
|
||||
var mqttFactory = new MqttFactory(new MqttNetLogger(_clientLogger));
|
||||
_mqttClientOptions = mqttFactory.CreateClientOptionsBuilder()
|
||||
.WithCredentials(ClientGatewayConfig.UserName, ClientGatewayConfig.Password)//账密
|
||||
.WithTcpServer(ClientGatewayConfig.MqttBrokerIP, ClientGatewayConfig.MqttBrokerPort)//服务器
|
||||
.WithClientId(ClientGatewayConfig.GatewayId)
|
||||
.WithCleanSession(true)
|
||||
.WithKeepAlivePeriod(TimeSpan.FromSeconds(120.0))
|
||||
.WithoutThrowOnNonSuccessfulConnectResponse()
|
||||
.Build();
|
||||
_mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
|
||||
.WithTopicFilter(
|
||||
f =>
|
||||
{
|
||||
f.WithTopic(ClientGatewayConfig.WriteRpcTopic);
|
||||
f.WithAtMostOnceQoS();
|
||||
})
|
||||
.WithTopicFilter(
|
||||
f =>
|
||||
{
|
||||
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBDownTopic));
|
||||
f.WithAtMostOnceQoS();
|
||||
})
|
||||
.WithTopicFilter(
|
||||
f =>
|
||||
{
|
||||
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBUploadTopic));
|
||||
f.WithAtMostOnceQoS();
|
||||
})
|
||||
.Build();
|
||||
_mqttClient = mqttFactory.CreateMqttClient();
|
||||
_mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
|
||||
_mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
|
||||
await TryMqttClientAsync(CancellationToken.None);
|
||||
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_clientLogger.LogError(ex, "初始化失败");
|
||||
}
|
||||
|
||||
}
|
||||
private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
|
||||
{
|
||||
if (args.ApplicationMessage.Topic == GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBUploadTopic))
|
||||
@@ -776,43 +831,8 @@ public class ManageGatewayWorker : BackgroundService
|
||||
await _mqttClient.PublishAsync(variableMessage);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
#endregion
|
||||
private async Task<ManageMqttRpcResult> RpcWriteExecuteAsync(int timeOut, byte[] payload, string requestTopic, string key, CancellationToken token)
|
||||
{
|
||||
try
|
||||
{
|
||||
using WaitDataAsync<ManageMqttRpcResult> waitDataAsync = new();
|
||||
if (!_writerRpcResultWaitingCalls.TryAdd(key, waitDataAsync))
|
||||
{
|
||||
throw new InvalidOperationException();
|
||||
}
|
||||
waitDataAsync.SetCancellationToken(token);
|
||||
|
||||
//请求子网关的数据
|
||||
var message = new MqttApplicationMessageBuilder().WithTopic(requestTopic).WithPayload(payload).Build();
|
||||
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message), token);
|
||||
|
||||
var result = await waitDataAsync.WaitAsync(timeOut);
|
||||
switch (result)
|
||||
{
|
||||
case WaitDataStatus.SetRunning:
|
||||
return waitDataAsync.WaitResult;
|
||||
case WaitDataStatus.Overtime:
|
||||
throw new TimeoutException();
|
||||
case WaitDataStatus.Canceled:
|
||||
{
|
||||
throw new Exception("等待已终止。可能是客户端已掉线,或者被注销。");
|
||||
}
|
||||
case WaitDataStatus.Default:
|
||||
case WaitDataStatus.Disposed:
|
||||
default:
|
||||
throw new Exception(ThingsGatewayStatus.UnknownError.GetDescription());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writerRpcResultWaitingCalls.Remove(key);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1250,50 +1250,50 @@
|
||||
</summary>
|
||||
<returns></returns>
|
||||
</member>
|
||||
<member name="T:ThingsGateway.Application.TGAPPInfo">
|
||||
<member name="T:ThingsGateway.Application.APPInfo">
|
||||
<inheritdoc/>
|
||||
</member>
|
||||
<member name="P:ThingsGateway.Application.TGAPPInfo.Environment">
|
||||
<member name="P:ThingsGateway.Application.APPInfo.Environment">
|
||||
<summary>
|
||||
主机环境
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:ThingsGateway.Application.TGAPPInfo.FrameworkDescription">
|
||||
<member name="P:ThingsGateway.Application.APPInfo.FrameworkDescription">
|
||||
<summary>
|
||||
NET框架
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:ThingsGateway.Application.TGAPPInfo.HostName">
|
||||
<member name="P:ThingsGateway.Application.APPInfo.HostName">
|
||||
<summary>
|
||||
主机名称
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:ThingsGateway.Application.TGAPPInfo.OsArchitecture">
|
||||
<member name="P:ThingsGateway.Application.APPInfo.OsArchitecture">
|
||||
<summary>
|
||||
系统架构
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:ThingsGateway.Application.TGAPPInfo.RemoteIp">
|
||||
<member name="P:ThingsGateway.Application.APPInfo.RemoteIp">
|
||||
<summary>
|
||||
外网地址
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:ThingsGateway.Application.TGAPPInfo.Stage">
|
||||
<member name="P:ThingsGateway.Application.APPInfo.Stage">
|
||||
<summary>
|
||||
Stage环境
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:ThingsGateway.Application.TGAPPInfo.SystemOs">
|
||||
<member name="P:ThingsGateway.Application.APPInfo.SystemOs">
|
||||
<summary>
|
||||
操作系统
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:ThingsGateway.Application.TGAPPInfo.UpdateTime">
|
||||
<member name="P:ThingsGateway.Application.APPInfo.UpdateTime">
|
||||
<summary>
|
||||
更新时间
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:ThingsGateway.Application.TGAPPInfo.DriveInfo">
|
||||
<member name="P:ThingsGateway.Application.APPInfo.DriveInfo">
|
||||
<summary>
|
||||
当前磁盘信息
|
||||
</summary>
|
||||
|
||||
Reference in New Issue
Block a user