Compare commits

..

22 Commits

Author SHA1 Message Date
Kimdiego2098
f7dc943fa3 2.1.0.11 2023-09-11 09:35:24 +08:00
Kimdiego2098
bfbd2693ec feat:ManageGatewayWorker part 2023-09-11 09:31:26 +08:00
Kimdiego2098
819e71c993 feat:ManageGatewayWorker part 2023-09-11 09:09:48 +08:00
Kimdiego2098
9fd0b489a2 feat:ManageGatewayWorker part 2023-09-11 09:09:03 +08:00
Kimdiego2098
f5fe9f8dae TGAPPInfp更名APPInfo 2023-09-11 08:57:13 +08:00
Kimdiego2098
f9ffc18145 Merge branch 'master' of https://gitee.com/dotnetchina/ThingsGateway 2023-09-11 08:54:01 +08:00
Kimdiego2098
08db5b983a feat:ManageGatewayWorker part 2023-09-11 08:53:52 +08:00
Diego2098
5b3b4c8c50 !7 修复无法修改变量值问题
Merge pull request !7 from 如阳如木/master
2023-09-07 10:14:45 +00:00
如阳如木
73f914ffc4 删除&& LastSetValue?.ToString() != data?.ToString()
Signed-off-by: 如阳如木 <970143933@qq.com>
2023-09-07 08:42:49 +00:00
Kimdiego2098
d6bdd73ed6 ManageGatewayConfig.json 2023-09-06 17:29:58 +08:00
Kimdiego2098
7370ee7349 fix:_mqttServer null error 2023-09-06 17:27:36 +08:00
Kimdiego2098
4574596bac 2.1.0.10 2023-09-06 17:16:28 +08:00
Kimdiego2098
4d16855e36 从数据库取原属性 2023-09-06 17:14:24 +08:00
Kimdiego2098
13a0d4d282 更改变量 最近一次值 为 上次值 2023-09-06 16:44:58 +08:00
Kimdiego2098
b9cd06b829 更改 变量最近一次值 为 上次值 2023-09-06 16:44:42 +08:00
Kimdiego2098
5b460e8fa2 2.1.0.9 2023-09-06 16:30:50 +08:00
Kimdiego2098
41087edf17 fix:串口断连/拔出/断电等情况,重新连接 2023-09-06 16:29:48 +08:00
Kimdiego2098
2afcc38e38 feat:ManageGatewayWorker part 2023-09-06 16:15:38 +08:00
Kimdiego2098
e59ccce25f feat:ManageGatewayWorker part 2023-09-06 16:10:29 +08:00
Kimdiego2098
d7425890e8 feat:ManageGatewayWorker part 2023-09-05 23:59:18 +08:00
Kimdiego2098
a989a837fb feat:ManageGatewayWorker part 2023-09-05 23:37:02 +08:00
Kimdiego2098
db1221da50 feat:ManageGatewayWorker part 2023-09-05 23:33:02 +08:00
34 changed files with 1402 additions and 224 deletions

View File

@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<Version>2.1.0.8</Version>
<Version>2.1.0.11</Version>
<Authors>Diego</Authors>
<Product>ThingsGateway</Product>
<Copyright>© 2023-present Diego</Copyright>

View File

@@ -51,7 +51,6 @@ public class FileController : IDynamicApiController
public async Task<IActionResult> DownloadOperateLogAsync([FromQuery] OperateLogInput input)
{
var memoryStream = await _operateLogService.ExportFileAsync(input);
memoryStream.Seek(0, SeekOrigin.Begin);
var data = new FileStreamResult(memoryStream, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
{
FileDownloadName = $"operateLog{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx"
@@ -66,7 +65,6 @@ public class FileController : IDynamicApiController
public async Task<IActionResult> DownloadVisitLogAsync([FromQuery] VisitLogInput input)
{
var memoryStream = await _visitLogService.ExportFileAsync(input);
memoryStream.Seek(0, SeekOrigin.Begin);
var data = new FileStreamResult(memoryStream, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
{
FileDownloadName = $"operateLog{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx"

View File

@@ -88,8 +88,7 @@ public class AuthService : IAuthService
}
var password = DESCEncryption.Decrypt(input.Password, DESCKeyConst.DESCKey); // 解密
var userInfo = await _userService.GetUserByAccountAsync(input.Account);//获取用户信息
if (userInfo == null) throw Oops.Bah("用户不存在");//用户不存在
var userInfo = await _userService.GetUserByAccountAsync(input.Account) ?? throw Oops.Bah("用户不存在");//获取用户信息
if (userInfo.Password != password) throw Oops.Bah("账号密码错误");//账号密码错误
return await LoginAsync(userInfo, input.Device);
}

View File

@@ -66,6 +66,7 @@ public class OperateLogService : DbRepository<SysOperateLog>, IOperateLogService
var memoryStream = new MemoryStream();
await memoryStream.SaveAsAsync(sheets);
memoryStream.Seek(0, SeekOrigin.Begin);
return memoryStream;
}

View File

@@ -92,6 +92,7 @@ public class VisitLogService : DbRepository<SysVisitLog>, IVisitLogService
var memoryStream = new MemoryStream();
await memoryStream.SaveAsAsync(sheets);
memoryStream.Seek(0, SeekOrigin.Begin);
return memoryStream;
}

View File

@@ -61,7 +61,6 @@ public class FileController : IDynamicApiController
public async Task<IActionResult> DownloadRpcLogAsync([FromQuery] RpcLogInput input)
{
var memoryStream = await _rpcLogService.ExportFileAsync(input);
memoryStream.Seek(0, SeekOrigin.Begin);
var data = new FileStreamResult(memoryStream, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
{
FileDownloadName = $"rpcLog{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx"
@@ -76,7 +75,6 @@ public class FileController : IDynamicApiController
public async Task<IActionResult> DownloadBackendLogAsync([FromQuery] BackendLogInput input)
{
var memoryStream = await _backendLogService.ExportFileAsync(input);
memoryStream.Seek(0, SeekOrigin.Begin);
var data = new FileStreamResult(memoryStream, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
{
FileDownloadName = $"backendLog{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx"
@@ -93,7 +91,6 @@ public class FileController : IDynamicApiController
public async Task<IActionResult> DownloadCollectDeviceAsync([FromQuery] CollectDeviceInput input)
{
var memoryStream = await _collectDeviceService.ExportFileAsync(input);
memoryStream.Seek(0, SeekOrigin.Begin);
var data = new FileStreamResult(memoryStream, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
{
FileDownloadName = $"collectDevice{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx"
@@ -108,7 +105,6 @@ public class FileController : IDynamicApiController
public async Task<IActionResult> DownloadUploadDeviceAsync([FromQuery] UploadDeviceInput input)
{
var memoryStream = await _uploadDeviceService.ExportFileAsync(input);
memoryStream.Seek(0, SeekOrigin.Begin);
var data = new FileStreamResult(memoryStream, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
{
FileDownloadName = $"uploadDevice{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx"
@@ -123,7 +119,6 @@ public class FileController : IDynamicApiController
public async Task<IActionResult> DownloadDeviceVariableAsync([FromQuery] DeviceVariableInput input)
{
var memoryStream = await _variableService.ExportFileAsync(input);
memoryStream.Seek(0, SeekOrigin.Begin);
var data = new FileStreamResult(memoryStream, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
{
FileDownloadName = $"deviceVariable{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx"
@@ -138,7 +133,6 @@ public class FileController : IDynamicApiController
public async Task<IActionResult> DownloadMemoryVariableAsync([FromQuery] MemoryVariableInput input)
{
var memoryStream = await _variableService.ExportFileAsync(input);
memoryStream.Seek(0, SeekOrigin.Begin);
var data = new FileStreamResult(memoryStream, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
{
FileDownloadName = $"memoryVariable{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx"

View File

@@ -97,11 +97,12 @@ public class HardwareInfoService : ISingleton
, TaskCreationOptions.LongRunning);
}
private TGAPPInfo appInfo = new();
private APPInfo appInfo = new();
/// <summary>
/// 运行信息获取
/// </summary>
public TGAPPInfo APPInfo => appInfo;
public APPInfo APPInfo => appInfo;
/// <summary>
@@ -128,7 +129,7 @@ public class HardwareInfoService : ISingleton
}
/// <inheritdoc/>
public class TGAPPInfo
public class APPInfo
{
/// <summary>
/// 主机环境

View File

@@ -11,6 +11,8 @@
#endregion
using System.ComponentModel;
using ThingsGateway.Foundation;
namespace ThingsGateway.Application;
@@ -23,82 +25,132 @@ public class ManageGatewayConfig
/// <summary>
/// 启用
/// </summary>
[Description("启用")]
public bool Enable { get; set; }
/// <summary>
/// MqttBrokerIP
/// </summary>
[Description("Mqtt-Tcp IP")]
public string MqttBrokerIP { get; set; }
/// <summary>
/// MqttBrokerPort
/// </summary>
[Description("Mqtt-Tcp 端口")]
public int MqttBrokerPort { get; set; }
/// <summary>
/// UserName
/// </summary>
[Description("Mqtt用户名")]
public string UserName { get; set; }
/// <summary>
/// Password
/// </summary>
[Description("Mqtt密码")]
public string Password { get; set; }
/// <summary>
/// DBDownTopicRpc返回为{DBDownTopic}/Return
/// WriteRpcTopicRpc返回为{WriteRpcTopic}/Return只有这个topic才开放外部订阅权限
/// </summary>
[Description("变量写入Rpc主题")]
public string WriteRpcTopic { get; set; }
/// <summary>
/// DBDownTopic
/// </summary>
[Description("配置下发Rpc主题")]
public string DBDownTopic { get; set; }
/// <summary>
/// DBUploadTopicRpc返回为{DBUploadTopic}/Return
/// DBUploadTopic
/// </summary>
[Description("配置上传Rpc主题")]
public string DBUploadTopic { get; set; }
/// <summary>
/// WriteRpcTopicRpc返回为{WriteRpcTopic}/Return
/// </summary>
public string WriteRpcTopic { get; set; }
}
/// <summary>
/// 用于Mqtt Json传输上传/下载配置信息
/// ClientGatewayConfig
/// </summary>
public class MqttDB
public class ClientGatewayConfig : ManageGatewayConfig
{
/// <summary>
/// 标识
/// </summary>
[Description("子网关标识ID")]
public string GatewayId { get; set; }
}
/// <summary>
/// 用于Mqtt Json传输上传/下载配置信息
/// </summary>
public class MqttDBUploadRpcResult
{
/// <summary>
/// 采集设备
/// </summary>
public List<CollectDevice> CollectDevices { get; set; }
/// <summary>
/// true=>删除全部后增加
/// </summary>
public bool IsCollectDevicesFullUp { get; set; }
public List<CollectDevice> CollectDevices { get; set; } = new();
/// <summary>
/// 上传设备
/// </summary>
public List<UploadDevice> UploadDevices { get; set; }
/// <summary>
/// true=>删除全部后增加
/// </summary>
public bool IsUploadDevicesFullUp { get; set; }
public List<UploadDevice> UploadDevices { get; set; } = new();
/// <summary>
/// 变量
/// </summary>
public List<DeviceVariable> DeviceVariables { get; set; }
public List<DeviceVariable> DeviceVariables { get; set; } = new();
}
/// <summary>
/// 用于Mqtt Json传输上传/下载配置信息
/// </summary>
public class MqttDBDownRpc
{
/// <summary>
/// 采集设备
/// </summary>
public byte[] CollectDevices { get; set; }
/// <summary>
/// 上传设备
/// </summary>
public byte[] UploadDevices { get; set; }
/// <summary>
/// 变量
/// </summary>
public byte[] DeviceVariables { get; set; }
/// <summary>
/// true=>删除全部后增加
/// </summary>
public bool IsDeviceVariablesFullUp { get; set; }
[Description("是否删除原采集设备表")]
public bool IsCollectDevicesFullUp { get; set; }
/// <summary>
/// 配置项
/// true=>删除全部后增加
/// </summary>
public List<SysConfig> SysConfigs { get; set; }
[Description("是否删除原上传设备表")]
public bool IsUploadDevicesFullUp { get; set; }
/// <summary>
/// true=>删除全部后增加
/// </summary>
[Description("是否删除原变量表")]
public bool IsDeviceVariablesFullUp { get; set; }
/// <summary>
/// 是否立即重启,使配置生效
/// </summary>
[Description("是否重启子网关线程")]
public bool IsRestart { get; set; }
}
@@ -111,6 +163,7 @@ public class ManageMqttRpcFrom
/// 标识
/// </summary>
public string GatewayId { get; set; }
/// <summary>
/// 标识
/// </summary>

View File

@@ -13,14 +13,20 @@
using Furion;
using Furion.Logging.Extensions;
using Mapster;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Internal;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using ThingsGateway.Foundation;
@@ -29,13 +35,24 @@ using TouchSocket.Core;
namespace ThingsGateway.Application;
/// <summary>
/// 设备采集报警后台服务
/// ManageGatewayWorker
/// </summary>
public class ManageGatewayWorker : BackgroundService
{
private readonly ILogger _clientLogger;
private readonly ILogger _logger;
private readonly ILogger _manageLogger;
private readonly ILogger _clientLogger;
/// <summary>
/// 全部重启锁
/// </summary>
private readonly EasyLock restartLock = new();
private IMqttClient _mqttClient;
private MqttServer _mqttServer;
private MqttClientSubscribeOptions _mqttSubscribeOptions;
/// <inheritdoc cref="ManageGatewayWorker"/>
public ManageGatewayWorker(ILoggerFactory loggerFactory)
{
@@ -46,19 +63,12 @@ public class ManageGatewayWorker : BackgroundService
/// <summary>
/// 服务状态
/// </summary>
public OperResult RealAlarmStatuString { get; set; } = new OperResult("初始化");
public OperResult ClientStatuString { get; set; } = new OperResult("初始化");
/// <summary>
/// 服务状态
/// </summary>
public OperResult HisAlarmStatuString { get; set; } = new OperResult("初始化");
/// <summary>
/// 服务状态
/// </summary>
public OperResult ReadAlarmStatuString { get; set; } = new OperResult("初始化");
private MqttServer _mqttServer;
private IMqttClient _mqttClient;
public OperResult ManageStatuString { get; set; } = new OperResult("初始化");
#region worker服务
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken token)
@@ -69,22 +79,81 @@ public class ManageGatewayWorker : BackgroundService
}
/// <inheritdoc/>
public override Task StopAsync(CancellationToken token)
public override async Task StopAsync(CancellationToken token)
{
_logger?.LogInformation("ManageGatewayWorker停止");
return base.StopAsync(token);
await StopAsync();
await base.StopAsync(token);
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Delay(5000, stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(60000, stoppingToken);
if (_mqttClient != null)
{
//持续重连
var result = await TryMqttClientAsync(stoppingToken);
if (result.IsSuccess)
{
_clientLogger.LogDebug("连接正常:" + result.Message);
ClientStatuString.ResultCode = ResultCode.Success;
ClientStatuString.Message = "连接正常:" + result.Message;
}
else
{
_clientLogger.LogWarning("连接错误:" + result.Message);
ClientStatuString.ResultCode = ResultCode.Fail;
ClientStatuString.Message = "连接错误:" + result.Message;
}
}
await Task.Delay(10000, stoppingToken);
//if (_mqttServer != null)
//{
// //TODO:test code
// var mqttClientStatuses = await _mqttServer.GetClientsAsync();
// if (mqttClientStatuses.FirstOrDefault() is MqttClientStatus mqttClientStatus)
// {
// //获取子网关信息
// var getClientGatewayDBResult = await GetClientGatewayDBAsync(mqttClientStatus.Id);
// //下发子网关配置
// var mqttDBDownRpc = new MqttDBDownRpc
// {
// IsRestart = true
// };
// var setClientGatewayDBResult = await SetClientGatewayDBAsync(mqttClientStatus.Id, mqttDBDownRpc);
// //下发子网关配置
// var manageMqttRpcFrom = new ManageMqttRpcFrom
// {
// WriteInfos = new Dictionary<string, string>()
// {
// {
// "test41","123"
// }
// },
// GatewayId = "GatewayId",
// RpcId = "123456",
// };
// var WriteVariableResult = await WriteVariableAsync(manageMqttRpcFrom);
// }
//}
}
catch (TaskCanceledException)
{
@@ -93,18 +162,36 @@ public class ManageGatewayWorker : BackgroundService
catch (ObjectDisposedException)
{
}
catch (Exception ex)
{
_logger.LogError(ex, ToString());
}
}
}
#endregion
#region
#region public
/// <summary>
/// 全部重启锁
/// 获取子网关的配置信息
/// </summary>
private readonly EasyLock restartLock = new();
/// <returns></returns>
public async Task<OperResult<MqttDBUploadRpcResult>> GetClientGatewayDBAsync(string gatewayId, int timeOut = 3000, CancellationToken token = default)
{
try
{
var buffer = Encoding.UTF8.GetBytes(string.Empty);
var response = await RpcDataExecuteAsync(gatewayId, ClientGatewayConfig.DBUploadTopic, buffer, timeOut, MqttQualityOfServiceLevel.AtMostOnce, token);
var data = Encoding.UTF8.GetString(response).FromJsonString<MqttDBUploadRpcResult>();
return OperResult.CreateSuccessResult(data);
}
catch (Exception ex)
{
return new OperResult<MqttDBUploadRpcResult>(ex);
}
}
/// <summary>
/// 重启
@@ -116,6 +203,171 @@ public class ManageGatewayWorker : BackgroundService
await StartAsync();
}
/// <summary>
/// 下载配置信息到子网关
/// </summary>
/// <returns></returns>
public async Task<OperResult> SetClientGatewayDBAsync(string gatewayId, MqttDBDownRpc mqttDBRpc, int timeOut = 3000, CancellationToken token = default)
{
try
{
var buffer = Encoding.UTF8.GetBytes(mqttDBRpc?.ToJsonString() ?? string.Empty);
var response = await RpcDataExecuteAsync(gatewayId, ClientGatewayConfig.DBDownTopic, buffer, timeOut, MqttQualityOfServiceLevel.AtMostOnce, token);
var data = Encoding.UTF8.GetString(response).FromJsonString<OperResult>();
return data;
}
catch (Exception ex)
{
return new OperResult(ex);
}
}
/// <summary>
/// 写入变量到子网关
/// </summary>
/// <returns></returns>
public async Task<OperResult<ManageMqttRpcResult>> WriteVariableAsync(ManageMqttRpcFrom manageMqttRpcFrom, int timeOut = 3000, CancellationToken token = default)
{
try
{
var payload = Encoding.UTF8.GetBytes(manageMqttRpcFrom?.ToJsonString() ?? string.Empty);
var requestTopic = ManageGatewayConfig.WriteRpcTopic;
var responseTopic = GetRpcReturnTopic(ManageGatewayConfig.WriteRpcTopic);
var key = GetRpcReturnIdTopic(manageMqttRpcFrom.GatewayId, requestTopic, manageMqttRpcFrom.RpcId);
ManageMqttRpcResult result = await RpcWriteExecuteAsync(timeOut, payload, requestTopic, key, token);
return OperResult.CreateSuccessResult(result);
}
catch (Exception ex)
{
return new OperResult<ManageMqttRpcResult>(ex);
}
}
/// <summary>
/// 获取子网关列表
/// </summary>
/// <returns></returns>
public async Task<List<MqttClientStatus>> GetClientGatewayAsync()
{
if (_mqttServer != null)
{
var data = await _mqttServer.GetClientsAsync();
return data.ToList();
}
else
{
return new List<MqttClientStatus>();
}
}
#endregion
#region RPC实现
readonly ConcurrentDictionary<string, WaitDataAsync<byte[]>> _waitingCalls = new();
readonly ConcurrentDictionary<string, WaitDataAsync<ManageMqttRpcResult>> _writerRpcResultWaitingCalls = new();
private readonly EasyLock clientLock = new();
private async Task<ManageMqttRpcResult> RpcWriteExecuteAsync(int timeOut, byte[] payload, string requestTopic, string key, CancellationToken token)
{
try
{
using WaitDataAsync<ManageMqttRpcResult> waitDataAsync = new();
if (!_writerRpcResultWaitingCalls.TryAdd(key, waitDataAsync))
{
throw new InvalidOperationException();
}
waitDataAsync.SetCancellationToken(token);
//请求子网关的数据
var message = new MqttApplicationMessageBuilder().WithTopic(requestTopic).WithPayload(payload).Build();
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message), token);
var result = await waitDataAsync.WaitAsync(timeOut);
switch (result)
{
case WaitDataStatus.SetRunning:
return waitDataAsync.WaitResult;
case WaitDataStatus.Overtime:
throw new TimeoutException();
case WaitDataStatus.Canceled:
{
throw new Exception("等待已终止。可能是客户端已掉线,或者被注销。");
}
case WaitDataStatus.Default:
case WaitDataStatus.Disposed:
default:
throw new Exception(ThingsGatewayStatus.UnknownError.GetDescription());
}
}
finally
{
_writerRpcResultWaitingCalls.Remove(key);
}
}
/// <summary>
/// RPC请求子网关并返回需要传入子网关ID作为Topic参数一部分
/// </summary>
/// <returns></returns>
private async Task<byte[]> RpcDataExecuteAsync(string gatewayId, string topic, byte[] payload, int timeOut, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken token = default)
{
var responseTopic = GetRpcReturnTopic(gatewayId, topic);
var requestTopic = GetRpcTopic(gatewayId, topic);
try
{
using WaitDataAsync<byte[]> waitDataAsync = new();
if (!_waitingCalls.TryAdd(responseTopic, waitDataAsync))
{
throw new InvalidOperationException();
}
waitDataAsync.SetCancellationToken(token);
//请求子网关的数据
var message = new MqttApplicationMessageBuilder().WithTopic(requestTopic).WithPayload(payload).Build();
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message), token);
var result = await waitDataAsync.WaitAsync(timeOut);
switch (result)
{
case WaitDataStatus.SetRunning:
return waitDataAsync.WaitResult;
case WaitDataStatus.Overtime:
throw new TimeoutException();
case WaitDataStatus.Canceled:
{
throw new Exception("等待已终止。可能是客户端已掉线,或者被注销。");
}
case WaitDataStatus.Default:
case WaitDataStatus.Disposed:
default:
throw new Exception(ThingsGatewayStatus.UnknownError.GetDescription());
}
}
finally
{
_waitingCalls.Remove(responseTopic);
}
}
#endregion
#region
internal async Task StartAsync()
{
try
@@ -163,7 +415,6 @@ public class ManageGatewayWorker : BackgroundService
restartLock.Release();
}
}
/// <summary>
/// 初始化
/// </summary>
@@ -171,78 +422,81 @@ public class ManageGatewayWorker : BackgroundService
{
try
{
var manageGatewayConfig = App.GetConfig<ManageGatewayConfig>("ManageGatewayConfig");
if (manageGatewayConfig?.Enable != true)
ManageGatewayConfig = App.GetConfig<ManageGatewayConfig>("ManageGatewayConfig");
if (ManageGatewayConfig?.Enable != true)
{
HisAlarmStatuString = new OperResult($"已退出:不启用管理功能");
return;
ManageStatuString = new OperResult($"已退出:不启用管理功能");
_manageLogger.LogWarning("已退出:不启用管理功能");
}
else
{
var mqttFactory = new MqttFactory(new MqttNetLogger(_manageLogger));
var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder()
.WithDefaultEndpointBoundIPAddress(string.IsNullOrEmpty(manageGatewayConfig.MqttBrokerIP) ? null : IPAddress.Parse(manageGatewayConfig.MqttBrokerIP))
.WithDefaultEndpointPort(manageGatewayConfig.MqttBrokerPort)
.WithDefaultEndpointBoundIPAddress(string.IsNullOrEmpty(ManageGatewayConfig.MqttBrokerIP) ? null : IPAddress.Parse(ManageGatewayConfig.MqttBrokerIP))
.WithDefaultEndpointPort(ManageGatewayConfig.MqttBrokerPort)
.WithDefaultEndpoint()
.Build();
_mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
if (_mqttServer != null)
{
_mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;
_mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
_mqttServer.LoadingRetainedMessageAsync += MqttServer_LoadingRetainedMessageAsync;
_mqttServer.InterceptingSubscriptionAsync += MqttServer_InterceptingSubscriptionAsync; ;
_mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;//认证
_mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;//消息
await _mqttServer.StartAsync();
}
ManageStatuString = OperResult.CreateSuccessResult();
}
}
catch (Exception ex)
{
_manageLogger.LogError(ex, "初始化失败");
ManageStatuString = new($"初始化失败-{ex.Message}");
}
try
{
var clientGatewayConfig = App.GetConfig<ManageGatewayConfig>("ClientGatewayConfig");
if (clientGatewayConfig?.Enable != true)
ClientGatewayConfig = App.GetConfig<ClientGatewayConfig>("ClientGatewayConfig");
if (ClientGatewayConfig?.Enable != true)
{
RealAlarmStatuString = new OperResult($"已退出:不启用子网关功能");
return;
ClientStatuString = new OperResult($"已退出:不启用子网关功能");
_clientLogger.LogWarning("已退出:不启用子网关功能");
}
else
{
var mqttFactory = new MqttFactory(new MqttNetLogger(_clientLogger));
var _mqttClientOptions = mqttFactory.CreateClientOptionsBuilder()
.WithCredentials(clientGatewayConfig.UserName, clientGatewayConfig.Password)//账密
.WithTcpServer(clientGatewayConfig.MqttBrokerIP, clientGatewayConfig.MqttBrokerPort)//服务器
.WithCleanSession(true)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(120.0))
.WithoutThrowOnNonSuccessfulConnectResponse()
.Build();
var _mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic(clientGatewayConfig.WriteRpcTopic);
f.WithAtMostOnceQoS();
})
.WithTopicFilter(
f =>
{
f.WithTopic(clientGatewayConfig.DBDownTopic);
f.WithAtMostOnceQoS();
})
.WithTopicFilter(
f =>
{
f.WithTopic(clientGatewayConfig.DBUploadTopic);
f.WithAtMostOnceQoS();
})
.Build();
_mqttClientOptions = mqttFactory.CreateClientOptionsBuilder()
.WithCredentials(ClientGatewayConfig.UserName, ClientGatewayConfig.Password)//账密
.WithTcpServer(ClientGatewayConfig.MqttBrokerIP, ClientGatewayConfig.MqttBrokerPort)//服务器
.WithClientId(ClientGatewayConfig.GatewayId)
.WithCleanSession(true)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(120.0))
.WithoutThrowOnNonSuccessfulConnectResponse()
.Build();
_mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic(ClientGatewayConfig.WriteRpcTopic);
f.WithAtMostOnceQoS();
})
.WithTopicFilter(
f =>
{
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBDownTopic));
f.WithAtMostOnceQoS();
})
.WithTopicFilter(
f =>
{
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBUploadTopic));
f.WithAtMostOnceQoS();
})
.Build();
_mqttClient = mqttFactory.CreateMqttClient();
_mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
await TryMqttClientAsync(CancellationToken.None);
}
}
@@ -251,39 +505,334 @@ public class ManageGatewayWorker : BackgroundService
_clientLogger.LogError(ex, "初始化失败");
}
}
private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
/// <summary>
/// ClientGatewayConfig
/// </summary>
public ClientGatewayConfig ClientGatewayConfig;
/// <summary>
/// ManageGatewayConfig
/// </summary>
public ManageGatewayConfig ManageGatewayConfig;
private MqttClientOptions _mqttClientOptions;
RpcSingletonService _rpcCore;
private async Task DBDownTopicMethod(MqttApplicationMessageReceivedEventArgs args)
{
throw new NotImplementedException();
var mqttDBRpc = args.ApplicationMessage.PayloadSegment.Count > 0 ? Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment).FromJsonString<MqttDBDownRpc>() : null;
if (mqttDBRpc != null)
{
OperResult result = new();
var collectDeviceService = App.GetService<CollectDeviceService>();
var variableService = App.GetService<VariableService>();
var uploadDeviceService = App.GetService<UploadDeviceService>();
collectDeviceService.Context = variableService.Context = uploadDeviceService.Context;
var itenant = collectDeviceService.Context.AsTenant();
//事务
var dbResult = await itenant.UseTranAsync(async () =>
{
if (mqttDBRpc.IsCollectDevicesFullUp)
{
await collectDeviceService.AsDeleteable().ExecuteCommandAsync();
}
var collectDevices = new List<CollectDevice>();
if (mqttDBRpc.CollectDevices != null && mqttDBRpc.CollectDevices.Length > 0)
{
using MemoryStream stream = new(mqttDBRpc.CollectDevices);
var previewResult = await collectDeviceService.PreviewAsync(stream);
if (previewResult.FirstOrDefault().Value.HasError)
{
throw new(previewResult.Select(a => a.Value.Results.Where(a => !a.isSuccess).ToList()).ToList().ToJsonString());
}
foreach (var item in previewResult)
{
if (item.Key == ExportHelpers.CollectDeviceSheetName)
{
var collectDeviceImports = ((ImportPreviewOutput<CollectDevice>)item.Value).Data;
collectDevices = collectDeviceImports.Values.Adapt<List<CollectDevice>>();
break;
}
}
await collectDeviceService.ImportAsync(previewResult);
}
if (mqttDBRpc.IsUploadDevicesFullUp)
{
await uploadDeviceService.AsDeleteable().ExecuteCommandAsync();
}
var uploadDevices = new List<UploadDevice>();
if (mqttDBRpc.UploadDevices != null && mqttDBRpc.UploadDevices.Length > 0)
{
using MemoryStream stream1 = new(mqttDBRpc.UploadDevices);
var previewResult1 = await uploadDeviceService.PreviewAsync(stream1);
if (previewResult1.FirstOrDefault().Value.HasError)
{
throw new(previewResult1.Select(a => a.Value.Results.Where(a => !a.isSuccess).ToList()).ToList().ToJsonString());
}
foreach (var item in previewResult1)
{
if (item.Key == ExportHelpers.UploadDeviceSheetName)
{
var uploadDeviceImports = ((ImportPreviewOutput<UploadDevice>)item.Value).Data;
uploadDevices = uploadDeviceImports.Values.Adapt<List<UploadDevice>>();
break;
}
}
await uploadDeviceService.ImportAsync(previewResult1);
}
if (mqttDBRpc.IsDeviceVariablesFullUp)
{
await variableService.AsDeleteable().ExecuteCommandAsync();
}
if (mqttDBRpc.DeviceVariables != null && mqttDBRpc.DeviceVariables.Length > 0)
{
using MemoryStream stream2 = new(mqttDBRpc.DeviceVariables);
var previewResult2 = await variableService.PreviewAsync(stream2, collectDevices, uploadDevices);
if (previewResult2.FirstOrDefault().Value.HasError)
{
throw new(previewResult2.Select(a => a.Value.Results.Where(a => !a.isSuccess).ToList()).ToList().ToJsonString());
}
await variableService.ImportAsync(previewResult2);
}
});
CacheStatic.Cache.Remove(ThingsGatewayCacheConst.Cache_CollectDevice);//cache删除
CacheStatic.Cache.Remove(ThingsGatewayCacheConst.Cache_UploadDevice);//cache删除
if (dbResult.IsSuccess)//如果成功了
{
_clientLogger.LogInformation("子网关接收配置,并保存至数据库-执行成功");
result = OperResult.CreateSuccessResult();
if (mqttDBRpc.IsRestart)
{
_clientLogger.LogInformation("子网关接收配置,并重启");
await ServiceHelper.GetBackgroundService<CollectDeviceWorker>().RestartDeviceThreadAsync();
}
}
else
{
//写日志
result.Message = dbResult.ErrorMessage;
}
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic(GetRpcReturnTopic(args.ApplicationMessage.Topic))
.WithPayload(result.ToJsonString()).Build();
if (_mqttClient.IsConnected)
await _mqttClient.PublishAsync(variableMessage);
}
}
private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs args)
private async Task DBUploadTopicMethod(MqttApplicationMessageReceivedEventArgs args)
{
throw new NotImplementedException();
MqttDBUploadRpcResult result = new();
var collectDeviceService = App.GetService<CollectDeviceService>();
var variableService = App.GetService<VariableService>();
var uploadDeviceService = App.GetService<UploadDeviceService>();
result.CollectDevices = collectDeviceService.GetCacheList(false);
result.DeviceVariables = await variableService.GetListAsync();
result.UploadDevices = uploadDeviceService.GetCacheList(false);
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic(GetRpcReturnTopic(args.ApplicationMessage.Topic))
.WithPayload(result.ToJsonString()).Build();
if (_mqttClient.IsConnected)
await _mqttClient.PublishAsync(variableMessage);
}
private Task MqttServer_InterceptingSubscriptionAsync(InterceptingSubscriptionEventArgs args)
private string GetRpcReturnIdTopic(string gatewayId, string topic, string rpcId)
{
throw new NotImplementedException();
var responseTopic = $"{gatewayId}/{topic}/rpc/Return/rpcId";
return responseTopic;
}
private Task MqttServer_LoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs args)
private string GetRpcReturnTopic(string gatewayId, string topic)
{
throw new NotImplementedException();
var responseTopic = $"{gatewayId}/{topic}/rpc/Return";
return responseTopic;
}
private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs args)
private string GetRpcReturnTopic(string requestTopic)
{
throw new NotImplementedException();
var responseTopic = $"{requestTopic}/Return";
return responseTopic;
}
private Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs args)
private string GetRpcTopic(string gatewayId, string topic)
{
throw new NotImplementedException();
var requestTopic = $"{gatewayId}/{topic}/rpc";
return requestTopic;
}
private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
{
if (args.ApplicationMessage.Topic == GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBUploadTopic))
{
_clientLogger.LogInformation("子网关配置上传");
await DBUploadTopicMethod(args);
return;
}
if (args.ApplicationMessage.Topic == GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBDownTopic))
{
_clientLogger.LogInformation("子网关接收配置,并保存至数据库");
await DBDownTopicMethod(args);
return;
}
if (args.ApplicationMessage.Topic == ClientGatewayConfig.WriteRpcTopic)
{
await WriteRpcTopicMethod(args);
return;
}
}
private async Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs args)
{
var subResult = await _mqttClient.SubscribeAsync(_mqttSubscribeOptions);
if (subResult.Items.Any(a => a.ResultCode > (MqttClientSubscribeResultCode)10))
{
_clientLogger?.LogWarning("订阅失败-" + subResult.Items
.Where(a => a.ResultCode > (MqttClientSubscribeResultCode)10)
.Select(a =>
new
{
Topic = a.TopicFilter.Topic,
ResultCode = a.ResultCode.ToString()
}
)
.ToJsonString()
);
}
}
private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs eventArgs)
{
if (eventArgs.ApplicationMessage.Topic == GetRpcReturnTopic(ManageGatewayConfig.WriteRpcTopic))
{
if (!_writerRpcResultWaitingCalls.IsEmpty)
{
var payloadBuffer = eventArgs.ApplicationMessage.PayloadSegment.ToArray();
var manageMqttRpcResult = Encoding.UTF8.GetString(payloadBuffer).FromJsonString<ManageMqttRpcResult>();
var key = GetRpcReturnIdTopic(manageMqttRpcResult.GatewayId, ManageGatewayConfig.WriteRpcTopic, manageMqttRpcResult.RpcId);
if (!_writerRpcResultWaitingCalls.TryRemove(key, out var writeRpcResultAsync))
{
return CompletedTask.Instance;
}
writeRpcResultAsync.Set(manageMqttRpcResult);
}
}
else
{
if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var awaitable))
{
return CompletedTask.Instance;
}
var payloadBuffer = eventArgs.ApplicationMessage.PayloadSegment.ToArray();
awaitable.Set(payloadBuffer);
}
return CompletedTask.Instance;
}
private Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
if (ManageGatewayConfig.UserName != arg.UserName)
{
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return CompletedTask.Instance;
}
if (ManageGatewayConfig.Password != arg.Password)
{
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return CompletedTask.Instance;
}
_manageLogger?.LogInformation(ToString() + "-" + arg.ClientId + "-客户端已连接成功");
return CompletedTask.Instance;
}
private async Task<OperResult> TryMqttClientAsync(CancellationToken token)
{
if (_mqttClient?.IsConnected == true)
return OperResult.CreateSuccessResult();
return await Cilent();
async Task<OperResult> Cilent()
{
if (_mqttClient?.IsConnected == true)
return OperResult.CreateSuccessResult();
try
{
await clientLock.WaitAsync();
if (_mqttClient?.IsConnected == true)
return OperResult.CreateSuccessResult();
using var timeoutToken = new CancellationTokenSource(TimeSpan.FromMilliseconds(5000));
using CancellationTokenSource StoppingToken = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutToken.Token);
if (_mqttClient?.IsConnected == true)
return OperResult.CreateSuccessResult();
if (_mqttClient == null)
{
return new OperResult("未初始化");
}
var result = await _mqttClient?.ConnectAsync(_mqttClientOptions, StoppingToken.Token);
if (result.ResultCode == MqttClientConnectResultCode.Success)
{
return OperResult.CreateSuccessResult();
}
else
{
return new OperResult(result.ReasonString);
}
}
catch (Exception ex)
{
return new OperResult(ex);
}
finally
{
clientLock.Release();
}
}
}
private async Task WriteRpcTopicMethod(MqttApplicationMessageReceivedEventArgs args)
{
var manageMqttRpcFrom = args.ApplicationMessage.PayloadSegment.Count > 0 ? Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment).FromJsonString<ManageMqttRpcFrom>() : null;
if (manageMqttRpcFrom != null && manageMqttRpcFrom.GatewayId == ClientGatewayConfig.GatewayId)
{
ManageMqttRpcResult mqttRpcResult = new() { RpcId = manageMqttRpcFrom.RpcId, GatewayId = manageMqttRpcFrom.GatewayId };
_rpcCore ??= App.GetService<RpcSingletonService>();
var result = await _rpcCore.InvokeDeviceMethodAsync("子网关RPC" + "-" + args.ClientId,
manageMqttRpcFrom.WriteInfos.Where(
a => !mqttRpcResult.Message.Any(b => b.Key == a.Key)).ToDictionary(a => a.Key, a => a.Value));
mqttRpcResult.Message.AddRange(result);
mqttRpcResult.Success = !mqttRpcResult.Message.Any(a => !a.Value.IsSuccess);
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic(GetRpcReturnTopic(args.ApplicationMessage.Topic))
.WithPayload(mqttRpcResult.ToJsonString()).Build();
if (_mqttClient.IsConnected)
await _mqttClient.PublishAsync(variableMessage);
}
}
#endregion
}

View File

@@ -1,4 +1,4 @@
#region copyright
#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
@@ -71,9 +71,9 @@ public class DeviceVariableRunTime : DeviceVariable
[DataTable(Order = 3, IsShow = true, Sortable = false, CellClass = " table-text-truncate ")]
public object Value { get => _value; private set => _value = value; }
/// <summary>
/// 最近一次值
/// 次值
/// </summary>
[Description("最近一次值")]
[Description("次值")]
[DataTable(Order = 3, IsShow = true, Sortable = false, CellClass = " table-text-truncate ")]
public object LastSetValue { get; private set; }
@@ -137,14 +137,17 @@ public class DeviceVariableRunTime : DeviceVariable
}
CollectTime = time;
{
if ((data?.ToString() != _value?.ToString() && LastSetValue?.ToString() != data?.ToString()) || isOnlineChanged)
if ((data?.ToString() != _value?.ToString() ) || isOnlineChanged)
{
ChangeTime = time;
LastSetValue = _value;
if (IsOnline)
{
_value = data;
}
LastSetValue = data;
VariableValueChange?.Invoke(this);
}
}

View File

@@ -95,6 +95,23 @@
"UpdateUser": "superAdmin",
"UpdateUserId": "212725263002001"
},
{
"Id": "200001904",
"Title": "管理网关",
"Icon": "mdi-database-sync-outline",
"Component": "/gatewayconfig/manage",
"Category": "MENU",
"ParentId": "200001",
"SortCode": "3",
"TargetType": "SELF",
"CreateTime": "2023-02-26 01:02:12.089",
"CreateUser": "superAdmin",
"CreateUserId": "212725263002001",
"IsDelete": false,
"UpdateTime": "2023-03-03 18:01:49.2309339",
"UpdateUser": "superAdmin",
"UpdateUserId": "212725263002001"
},
{
"Id": "200001004",
"Title": "运行状态",

View File

@@ -360,6 +360,8 @@ public class CollectDeviceService : DbRepository<CollectDevice>, ICollectDeviceS
var memoryStream = new MemoryStream();
await memoryStream.SaveAsAsync(sheets);
memoryStream.Seek(0, SeekOrigin.Begin);
return memoryStream;
}
@@ -367,10 +369,17 @@ public class CollectDeviceService : DbRepository<CollectDevice>, ICollectDeviceS
public async Task<Dictionary<string, ImportPreviewOutputBase>> PreviewAsync(IBrowserFile file)
{
_fileService.ImportVerification(file);
using var fs = new MemoryStream();
using var stream = file.OpenReadStream(512000000);
await stream.CopyToAsync(fs);
var sheetNames = MiniExcel.GetSheetNames(fs);
using var stream = new MemoryStream();
using var fs = file.OpenReadStream(512000000);
await fs.CopyToAsync(stream);
return await PreviewAsync(stream);
}
/// <inheritdoc/>
public Task<Dictionary<string, ImportPreviewOutputBase>> PreviewAsync(MemoryStream stream)
{
var sheetNames = MiniExcel.GetSheetNames(stream);
var deviceDicts = GetCacheList(false).ToDictionary(a => a.Name);
var pluginDicts = _driverPluginService.GetCacheList(false).ToDictionary(a => a.AssembleName);
@@ -381,7 +390,7 @@ public class CollectDeviceService : DbRepository<CollectDevice>, ICollectDeviceS
foreach (var sheetName in sheetNames)
{
//单页数据
var rows = fs.Query(useHeaderRow: true, sheetName: sheetName).Cast<IDictionary<string, object>>();
var rows = stream.Query(useHeaderRow: true, sheetName: sheetName).Cast<IDictionary<string, object>>();
#region sheet
if (sheetName == ExportHelpers.CollectDeviceSheetName)
{
@@ -496,8 +505,7 @@ public class CollectDeviceService : DbRepository<CollectDevice>, ICollectDeviceS
}
return ImportPreviews;
return Task.FromResult(ImportPreviews);
}
/// <inheritdoc/>

View File

@@ -63,6 +63,7 @@ public class BackendLogService : DbRepository<BackendLog>, IBackendLogService
var memoryStream = new MemoryStream();
await memoryStream.SaveAsAsync(sheets);
memoryStream.Seek(0, SeekOrigin.Begin);
return memoryStream;
}

View File

@@ -90,6 +90,7 @@ public class RpcLogService : DbRepository<RpcLog>, IRpcLogService
var memoryStream = new MemoryStream();
await memoryStream.SaveAsAsync(sheets);
memoryStream.Seek(0, SeekOrigin.Begin);
return memoryStream;
}

View File

@@ -284,6 +284,7 @@ public class UploadDeviceService : DbRepository<UploadDevice>, IUploadDeviceServ
var memoryStream = new MemoryStream();
await memoryStream.SaveAsAsync(sheets);
memoryStream.Seek(0, SeekOrigin.Begin);
return memoryStream;
}
@@ -305,15 +306,19 @@ public class UploadDeviceService : DbRepository<UploadDevice>, IUploadDeviceServ
CacheStatic.Cache.Remove(ThingsGatewayCacheConst.Cache_UploadDevice);//cache删除
}
/// <inheritdoc/>
public async Task<Dictionary<string, ImportPreviewOutputBase>> PreviewAsync(IBrowserFile file)
{
_fileService.ImportVerification(file);
using var fs = new MemoryStream();
using var stream = file.OpenReadStream(512000000);
await stream.CopyToAsync(fs);
var sheetNames = MiniExcel.GetSheetNames(fs);
using var stream = new MemoryStream();
using var fs = file.OpenReadStream(512000000);
await fs.CopyToAsync(stream);
return await PreviewAsync(stream);
}
/// <inheritdoc/>
public Task<Dictionary<string, ImportPreviewOutputBase>> PreviewAsync(MemoryStream stream)
{
var sheetNames = MiniExcel.GetSheetNames(stream);
var deviceDicts = GetCacheList(false).ToDictionary(a => a.Name);
var pluginDicts = _driverPluginService.GetCacheList(false).ToDictionary(a => a.AssembleName);
@@ -324,7 +329,7 @@ public class UploadDeviceService : DbRepository<UploadDevice>, IUploadDeviceServ
foreach (var sheetName in sheetNames)
{
//单页数据
var rows = (fs.Query(useHeaderRow: true, sheetName: sheetName)).Cast<IDictionary<string, object>>();
var rows = (stream.Query(useHeaderRow: true, sheetName: sheetName)).Cast<IDictionary<string, object>>();
#region sheet
if (sheetName == ExportHelpers.UploadDeviceSheetName)
{
@@ -434,7 +439,7 @@ public class UploadDeviceService : DbRepository<UploadDevice>, IUploadDeviceServ
return ImportPreviews;
return Task.FromResult(ImportPreviews);
}
#endregion
}

View File

@@ -313,6 +313,7 @@ public class VariableService : DbRepository<DeviceVariable>, IVariableService
var memoryStream = new MemoryStream();
await memoryStream.SaveAsAsync(sheets);
memoryStream.Seek(0, SeekOrigin.Begin);
return memoryStream;
}
@@ -406,6 +407,7 @@ public class VariableService : DbRepository<DeviceVariable>, IVariableService
var memoryStream = new MemoryStream();
await memoryStream.SaveAsAsync(sheets);
memoryStream.Seek(0, SeekOrigin.Begin);
return memoryStream;
}
@@ -571,15 +573,21 @@ public class VariableService : DbRepository<DeviceVariable>, IVariableService
return ImportPreviews;
}
/// <inheritdoc/>
public async Task<Dictionary<string, ImportPreviewOutputBase>> PreviewAsync(IBrowserFile file)
{
_fileService.ImportVerification(file);
using var fs = new MemoryStream();
using var stream = file.OpenReadStream(512000000);
await stream.CopyToAsync(fs);
var sheetNames = MiniExcel.GetSheetNames(fs);
using var stream = new MemoryStream();
using var fs = file.OpenReadStream(512000000);
await fs.CopyToAsync(stream);
return await PreviewAsync(stream);
}
/// <inheritdoc/>
public async Task<Dictionary<string, ImportPreviewOutputBase>> PreviewAsync(MemoryStream stream, List<CollectDevice> memCollectDevices = null, List<UploadDevice> memUploadDevices = null)
{
var sheetNames = MiniExcel.GetSheetNames(stream);
var dbVariables = await Context.Queryable<DeviceVariable>().Select(it => new { it.Id, it.Name }).ToListAsync();
//转为字典,提高查找效率
@@ -591,7 +599,7 @@ public class VariableService : DbRepository<DeviceVariable>, IVariableService
foreach (var sheetName in sheetNames)
{
//单页数据
var rows = fs.Query(useHeaderRow: true, sheetName: sheetName, configuration: new OpenXmlConfiguration { EnableSharedStringCache = false })
var rows = stream.Query(useHeaderRow: true, sheetName: sheetName, configuration: new OpenXmlConfiguration { EnableSharedStringCache = false })
.Cast<IDictionary<string, object>>();
if (sheetName == ExportHelpers.DeviceVariableSheetName)
@@ -600,7 +608,10 @@ public class VariableService : DbRepository<DeviceVariable>, IVariableService
ImportPreviewOutput<DeviceVariable> importPreviewOutput = new();
ImportPreviews.Add(sheetName, importPreviewOutput);
deviceImportPreview = importPreviewOutput;
var cacheDeviceDicts = _collectDeviceService.GetCacheList(false).ToDictionary(a => a.Name);
var cacheDeviceDicts = memCollectDevices == null ? _collectDeviceService.GetCacheList(false).ToDictionary(a => a.Name) :
_collectDeviceService.GetCacheList(false).Concat(memCollectDevices).ToDictionary(a => a.Name)
;
//线程安全
var variables = new ConcurrentList<DeviceVariable>();
//并行注意线程安全
@@ -679,7 +690,9 @@ public class VariableService : DbRepository<DeviceVariable>, IVariableService
.Where(a => a.GetCustomAttribute<VariablePropertyAttribute>() != null)
.ToDictionary(a => a.FindDisplayAttribute(a => a.GetCustomAttribute<VariablePropertyAttribute>()?.Description));
var cacheUpdeviceDicts = _uploadDeviceService.GetCacheList(false).ToDictionary(a => a.Name);
var cacheUpdeviceDicts = memUploadDevices == null ? _uploadDeviceService.GetCacheList(false).ToDictionary(a => a.Name) :
_uploadDeviceService.GetCacheList(false).Concat(memUploadDevices).ToDictionary(a => a.Name)
;
rows.ParallelForEach(item =>
{
try

View File

@@ -1250,50 +1250,50 @@
</summary>
<returns></returns>
</member>
<member name="T:ThingsGateway.Application.TGAPPInfo">
<member name="T:ThingsGateway.Application.APPInfo">
<inheritdoc/>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.Environment">
<member name="P:ThingsGateway.Application.APPInfo.Environment">
<summary>
主机环境
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.FrameworkDescription">
<member name="P:ThingsGateway.Application.APPInfo.FrameworkDescription">
<summary>
NET框架
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.HostName">
<member name="P:ThingsGateway.Application.APPInfo.HostName">
<summary>
主机名称
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.OsArchitecture">
<member name="P:ThingsGateway.Application.APPInfo.OsArchitecture">
<summary>
系统架构
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.RemoteIp">
<member name="P:ThingsGateway.Application.APPInfo.RemoteIp">
<summary>
外网地址
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.Stage">
<member name="P:ThingsGateway.Application.APPInfo.Stage">
<summary>
Stage环境
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.SystemOs">
<member name="P:ThingsGateway.Application.APPInfo.SystemOs">
<summary>
操作系统
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.UpdateTime">
<member name="P:ThingsGateway.Application.APPInfo.UpdateTime">
<summary>
更新时间
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.DriveInfo">
<member name="P:ThingsGateway.Application.APPInfo.DriveInfo">
<summary>
当前磁盘信息
</summary>
@@ -1427,64 +1427,89 @@
Password
</summary>
</member>
<member name="P:ThingsGateway.Application.ManageGatewayConfig.WriteRpcTopic">
<summary>
WriteRpcTopicRpc返回为{WriteRpcTopic}/Return只有这个topic才开放外部订阅权限
</summary>
</member>
<member name="P:ThingsGateway.Application.ManageGatewayConfig.DBDownTopic">
<summary>
DBDownTopicRpc返回为{DBDownTopic}/Return
DBDownTopic
</summary>
</member>
<member name="P:ThingsGateway.Application.ManageGatewayConfig.DBUploadTopic">
<summary>
DBUploadTopicRpc返回为{DBUploadTopic}/Return
DBUploadTopic
</summary>
</member>
<member name="P:ThingsGateway.Application.ManageGatewayConfig.WriteRpcTopic">
<member name="T:ThingsGateway.Application.ClientGatewayConfig">
<summary>
WriteRpcTopicRpc返回为{WriteRpcTopic}/Return
ClientGatewayConfig
</summary>
</member>
<member name="T:ThingsGateway.Application.MqttDB">
<summary>
用于Mqtt Json传输上传/下载配置信息
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDB.GatewayId">
<member name="P:ThingsGateway.Application.ClientGatewayConfig.GatewayId">
<summary>
标识
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDB.CollectDevices">
<member name="T:ThingsGateway.Application.MqttDBUploadRpcResult">
<summary>
用于Mqtt Json传输上传/下载配置信息
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDBUploadRpcResult.CollectDevices">
<summary>
采集设备
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDB.IsCollectDevicesFullUp">
<summary>
true=>删除全部后增加
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDB.UploadDevices">
<member name="P:ThingsGateway.Application.MqttDBUploadRpcResult.UploadDevices">
<summary>
上传设备
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDB.IsUploadDevicesFullUp">
<summary>
true=>删除全部后增加
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDB.DeviceVariables">
<member name="P:ThingsGateway.Application.MqttDBUploadRpcResult.DeviceVariables">
<summary>
变量
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDB.IsDeviceVariablesFullUp">
<member name="T:ThingsGateway.Application.MqttDBDownRpc">
<summary>
用于Mqtt Json传输上传/下载配置信息
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDBDownRpc.CollectDevices">
<summary>
采集设备
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDBDownRpc.UploadDevices">
<summary>
上传设备
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDBDownRpc.DeviceVariables">
<summary>
变量
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDBDownRpc.IsCollectDevicesFullUp">
<summary>
true=>删除全部后增加
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDB.SysConfigs">
<member name="P:ThingsGateway.Application.MqttDBDownRpc.IsUploadDevicesFullUp">
<summary>
配置项
true=>删除全部后增加
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDBDownRpc.IsDeviceVariablesFullUp">
<summary>
true=>删除全部后增加
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttDBDownRpc.IsRestart">
<summary>
是否立即重启,使配置生效
</summary>
</member>
<member name="T:ThingsGateway.Application.ManageMqttRpcFrom">
@@ -1534,23 +1559,23 @@
</member>
<member name="T:ThingsGateway.Application.ManageGatewayWorker">
<summary>
设备采集报警后台服务
ManageGatewayWorker
</summary>
</member>
<member name="F:ThingsGateway.Application.ManageGatewayWorker.restartLock">
<summary>
全部重启锁
</summary>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.#ctor(Microsoft.Extensions.Logging.ILoggerFactory)">
<inheritdoc cref="T:ThingsGateway.Application.ManageGatewayWorker"/>
</member>
<member name="P:ThingsGateway.Application.ManageGatewayWorker.RealAlarmStatuString">
<member name="P:ThingsGateway.Application.ManageGatewayWorker.ClientStatuString">
<summary>
服务状态
</summary>
</member>
<member name="P:ThingsGateway.Application.ManageGatewayWorker.HisAlarmStatuString">
<summary>
服务状态
</summary>
</member>
<member name="P:ThingsGateway.Application.ManageGatewayWorker.ReadAlarmStatuString">
<member name="P:ThingsGateway.Application.ManageGatewayWorker.ManageStatuString">
<summary>
服务状态
</summary>
@@ -1564,10 +1589,11 @@
<member name="M:ThingsGateway.Application.ManageGatewayWorker.ExecuteAsync(System.Threading.CancellationToken)">
<inheritdoc/>
</member>
<member name="F:ThingsGateway.Application.ManageGatewayWorker.restartLock">
<member name="M:ThingsGateway.Application.ManageGatewayWorker.GetClientGatewayDBAsync(System.String,System.Int32,System.Threading.CancellationToken)">
<summary>
全部重启锁
获取子网关的配置信息
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.RestartAsync">
<summary>
@@ -1575,11 +1601,45 @@
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.SetClientGatewayDBAsync(System.String,ThingsGateway.Application.MqttDBDownRpc,System.Int32,System.Threading.CancellationToken)">
<summary>
下载配置信息到子网关
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.WriteVariableAsync(ThingsGateway.Application.ManageMqttRpcFrom,System.Int32,System.Threading.CancellationToken)">
<summary>
写入变量到子网关
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.GetClientGatewayAsync">
<summary>
获取子网关列表
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.RpcDataExecuteAsync(System.String,System.String,System.Byte[],System.Int32,MQTTnet.Protocol.MqttQualityOfServiceLevel,System.Threading.CancellationToken)">
<summary>
RPC请求子网关并返回需要传入子网关ID作为Topic参数一部分
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.InitAsync">
<summary>
初始化
</summary>
</member>
<member name="F:ThingsGateway.Application.ManageGatewayWorker.ClientGatewayConfig">
<summary>
ClientGatewayConfig
</summary>
</member>
<member name="F:ThingsGateway.Application.ManageGatewayWorker.ManageGatewayConfig">
<summary>
ManageGatewayConfig
</summary>
</member>
<member name="T:ThingsGateway.Application.CollectDeviceRunTime">
<summary>
采集设备状态表示
@@ -1781,7 +1841,7 @@
</member>
<member name="P:ThingsGateway.Application.DeviceVariableRunTime.LastSetValue">
<summary>
最近一次值
次值
</summary>
</member>
<member name="M:ThingsGateway.Application.DeviceVariableRunTime.SetValue(System.Object,System.DateTime,System.Boolean)">
@@ -2545,6 +2605,9 @@
<member name="M:ThingsGateway.Application.CollectDeviceService.PreviewAsync(Microsoft.AspNetCore.Components.Forms.IBrowserFile)">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Application.CollectDeviceService.PreviewAsync(System.IO.MemoryStream)">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Application.CollectDeviceService.ImportAsync(System.Collections.Generic.Dictionary{System.String,ThingsGateway.Application.ImportPreviewOutputBase})">
<inheritdoc/>
</member>
@@ -3242,6 +3305,9 @@
<member name="M:ThingsGateway.Application.UploadDeviceService.PreviewAsync(Microsoft.AspNetCore.Components.Forms.IBrowserFile)">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Application.UploadDeviceService.PreviewAsync(System.IO.MemoryStream)">
<inheritdoc/>
</member>
<member name="T:ThingsGateway.Application.VariableAddInput">
<summary>
添加变量DTO
@@ -3498,6 +3564,9 @@
<member name="M:ThingsGateway.Application.VariableService.PreviewAsync(Microsoft.AspNetCore.Components.Forms.IBrowserFile)">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Application.VariableService.PreviewAsync(System.IO.MemoryStream,System.Collections.Generic.List{ThingsGateway.Application.CollectDevice},System.Collections.Generic.List{ThingsGateway.Application.UploadDevice})">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Application.VariableService.ImportAsync(System.Collections.Generic.Dictionary{System.String,ThingsGateway.Application.ImportPreviewOutputBase})">
<inheritdoc/>
</member>

View File

@@ -10,6 +10,7 @@
//------------------------------------------------------------------------------
#endregion
using Furion;
using Furion.Logging.Extensions;
using Mapster;
@@ -73,7 +74,7 @@ public class AlarmWorker : BackgroundService
/// <returns></returns>
public async Task<OperResult<SqlSugarClient>> GetAlarmDbAsync()
{
var ConfigService = ServiceHelper.Services.GetService<IConfigService>();
var ConfigService = App.GetService<IConfigService>();
var alarmEnable = (await ConfigService.GetByConfigKeyAsync(ThingsGatewayConfigConst.ThingGateway_AlarmConfig_Base, ThingsGatewayConfigConst.Config_Alarm_Enable))?.ConfigValue?.ToBoolean();
var alarmDbType = (await ConfigService.GetByConfigKeyAsync(ThingsGatewayConfigConst.ThingGateway_AlarmConfig_Base, ThingsGatewayConfigConst.Config_Alarm_DbType))?.ConfigValue;
var alarmConnstr = (await ConfigService.GetByConfigKeyAsync(ThingsGatewayConfigConst.ThingGateway_AlarmConfig_Base, ThingsGatewayConfigConst.Config_Alarm_ConnStr))?.ConfigValue;

View File

@@ -10,6 +10,7 @@
//------------------------------------------------------------------------------
#endregion
using Furion;
using Furion.FriendlyException;
using Furion.Logging.Extensions;
@@ -79,7 +80,7 @@ public class CollectDeviceCore
{
_pluginService = ServiceHelper.Services.GetService<PluginSingletonService>();
GlobalDeviceData = ServiceHelper.Services.GetService<GlobalDeviceData>();
DriverPluginService = ServiceHelper.Services.GetService<IDriverPluginService>();
DriverPluginService = App.GetService<IDriverPluginService>();
}
/// <summary>

View File

@@ -10,6 +10,7 @@
//------------------------------------------------------------------------------
#endregion
using Furion;
using Furion.FriendlyException;
using Furion.Logging.Extensions;
@@ -38,7 +39,7 @@ public class CollectDeviceWorker : BackgroundService
ServiceHelper.Services = serviceProvider;
_logger = logger;
_pluginService = ServiceHelper.Services.GetService<PluginSingletonService>();
_collectDeviceService = ServiceHelper.Services.GetService<ICollectDeviceService>();
_collectDeviceService = App.GetService<ICollectDeviceService>();
}
/// <summary>
/// 读取未停止的采集设备List
@@ -414,7 +415,7 @@ public class CollectDeviceWorker : BackgroundService
/// <returns></returns>
public Type GetDebugUI(long driverId)
{
var driverPluginService = ServiceHelper.Services.GetService<IDriverPluginService>();
var driverPluginService = App.GetService<IDriverPluginService>();
var driverPlugin = driverPluginService.GetDriverPluginById(driverId);
var driver = _pluginService.GetDriver(driverPlugin);
driver?.SafeDispose();
@@ -428,7 +429,7 @@ public class CollectDeviceWorker : BackgroundService
/// <returns></returns>
public List<string> GetDeviceMethods(long devId)
{
var driverPluginService = ServiceHelper.Services.GetService<IDriverPluginService>();
var driverPluginService = App.GetService<IDriverPluginService>();
var driverId = _collectDeviceService.GetDeviceById(devId).PluginId;
var driverPlugin = driverPluginService.GetDriverPluginById(driverId);
var driver = (CollectBase)_pluginService.GetDriver(driverPlugin);
@@ -445,14 +446,14 @@ public class CollectDeviceWorker : BackgroundService
/// <returns></returns>
public List<DependencyProperty> GetDevicePropertys(long driverId, long devId = 0)
{
var driverPluginService = ServiceHelper.Services.GetService<IDriverPluginService>();
var driverPluginService = App.GetService<IDriverPluginService>();
var driverPlugin = driverPluginService.GetDriverPluginById(driverId);
var driver = _pluginService.GetDriver(driverPlugin);
var Propertys = _pluginService.GetDriverProperties(driver);
if (devId != 0)
{
var devcore = CollectDeviceCores.FirstOrDefault(it => it.Device.Id == devId);
devcore?.Device?.DevicePropertys?.ForEach(it =>
var devcore = App.GetService<CollectDeviceService>().GetDeviceById(devId);
devcore?.DevicePropertys?.ForEach(it =>
{
var dependencyProperty = Propertys.FirstOrDefault(a => a.PropertyName == it.PropertyName);
if (dependencyProperty != null)

View File

@@ -10,6 +10,7 @@
//------------------------------------------------------------------------------
#endregion
using Furion;
using Furion.Logging.Extensions;
using Mapster;
@@ -59,7 +60,7 @@ public class HistoryValueWorker : BackgroundService
/// <returns></returns>
public async Task<OperResult<SqlSugarClient>> GetHisDbAsync()
{
var ConfigService = ServiceHelper.Services.GetService<IConfigService>();
var ConfigService = App.GetService<IConfigService>();
var hisEnable = (await ConfigService.GetByConfigKeyAsync(ThingsGatewayConfigConst.ThingGateway_HisConfig_Base, ThingsGatewayConfigConst.Config_His_Enable))?.ConfigValue?.ToBoolean();
var hisDbType = (await ConfigService.GetByConfigKeyAsync(ThingsGatewayConfigConst.ThingGateway_HisConfig_Base, ThingsGatewayConfigConst.Config_His_DbType))?.ConfigValue;
var hisConnstr = (await ConfigService.GetByConfigKeyAsync(ThingsGatewayConfigConst.ThingGateway_HisConfig_Base, ThingsGatewayConfigConst.Config_His_ConnStr))?.ConfigValue;

View File

@@ -10,6 +10,7 @@
//------------------------------------------------------------------------------
#endregion
using Furion;
using Furion.Logging.Extensions;
using Microsoft.Extensions.DependencyInjection;
@@ -100,7 +101,7 @@ public class MemoryVariableWorker : BackgroundService
_logger?.LogInformation($"中间变量计算线程开始");
try
{
var variableService = ServiceHelper.Services.GetService<IVariableService>();
var variableService = App.GetService<IVariableService>();
var data = await variableService.GetMemoryVariableRuntimeAsync();
_globalDeviceData.MemoryVariables = new(data);
StatuString = OperResult.CreateSuccessResult();

View File

@@ -10,6 +10,7 @@
//------------------------------------------------------------------------------
#endregion
using Furion;
using Furion.FriendlyException;
using Furion.Logging.Extensions;
@@ -57,7 +58,7 @@ public class UploadDeviceCore
public UploadDeviceCore()
{
_pluginService = ServiceHelper.Services.GetService<PluginSingletonService>();
DriverPluginService = ServiceHelper.Services.GetService<IDriverPluginService>();
DriverPluginService = App.GetService<IDriverPluginService>();
}
/// <summary>

View File

@@ -10,6 +10,7 @@
//------------------------------------------------------------------------------
#endregion
using Furion;
using Furion.FriendlyException;
using Furion.Logging.Extensions;
@@ -41,7 +42,7 @@ public class UploadDeviceWorker : BackgroundService
_logger = logger;
_pluginService = ServiceHelper.Services.GetService<PluginSingletonService>();
_uploadDeviceService = ServiceHelper.Services.GetService<IUploadDeviceService>();
_uploadDeviceService = App.GetService<IUploadDeviceService>();
}
/// <summary>
/// 上传设备List
@@ -294,15 +295,15 @@ public class UploadDeviceWorker : BackgroundService
/// <returns></returns>
public List<DependencyProperty> GetDevicePropertys(long driverId, long devId = 0)
{
var driverPluginService = ServiceHelper.Services.GetService<IDriverPluginService>();
var driverPluginService = App.GetService<IDriverPluginService>();
var driverPlugin = driverPluginService.GetDriverPluginById(driverId);
var driver = _pluginService.GetDriver(driverPlugin);
var Propertys = _pluginService.GetDriverProperties(driver);
if (devId != 0)
{
var devcore = UploadDeviceCores.FirstOrDefault(it => it.Device.Id == devId);
devcore?.Device?.DevicePropertys?.ForEach(it =>
var devcore = App.GetService<UploadDeviceService>().GetDeviceById(devId);
devcore?.DevicePropertys?.ForEach(it =>
{
var dependencyProperty = Propertys.FirstOrDefault(a => a.PropertyName == it.PropertyName);
if (dependencyProperty != null)
@@ -325,7 +326,7 @@ public class UploadDeviceWorker : BackgroundService
/// <returns></returns>
public List<DependencyProperty> GetVariablePropertys(long driverId, List<DependencyProperty> dependencyProperties = null)
{
var driverPluginService = ServiceHelper.Services.GetService<IDriverPluginService>();
var driverPluginService = App.GetService<IDriverPluginService>();
var driverPlugin = driverPluginService.GetDriverPluginById(driverId);
var driver = (UpLoadBase)_pluginService.GetDriver(driverPlugin);
var Propertys = _pluginService.GetDriverVariableProperties(driver);

View File

@@ -170,7 +170,7 @@ public abstract class DriverDebugUIBase : ComponentBase, IDisposable
isDownExport = true;
StateHasChanged();
using var memoryStream = new MemoryStream();
StreamWriter writer = new(memoryStream);
using StreamWriter writer = new(memoryStream);
foreach (var item in values)
{
writer.WriteLine(item);
@@ -198,7 +198,6 @@ public abstract class DriverDebugUIBase : ComponentBase, IDisposable
isDownExport = true;
StateHasChanged();
using var memoryStream = await App.GetService<CollectDeviceService>().ExportFileAsync(new List<CollectDevice>() { data });
memoryStream.Seek(0, SeekOrigin.Begin);
using var streamRef = new DotNetStreamReference(stream: memoryStream);
_helper ??= await JS.InvokeAsync<IJSObjectReference>("import", $"/_content/ThingsGateway.Admin.Blazor.Core/js/downloadFileFromStream.js");
await _helper.InvokeVoidAsync("downloadFileFromStream", $"设备导出{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx", streamRef);
@@ -220,7 +219,6 @@ public abstract class DriverDebugUIBase : ComponentBase, IDisposable
isDownExport = true;
StateHasChanged();
using var memoryStream = await App.GetService<VariableService>().ExportFileAsync(data, devName);
memoryStream.Seek(0, SeekOrigin.Begin);
using var streamRef = new DotNetStreamReference(stream: memoryStream);
_helper ??= await JS.InvokeAsync<IJSObjectReference>("import", $"/_content/ThingsGateway.Admin.Blazor.Core/js/downloadFileFromStream.js");
await _helper.InvokeVoidAsync("downloadFileFromStream", $"变量导出{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx", streamRef);

View File

@@ -155,7 +155,7 @@ public partial class DeviceStatusPage : IDisposable
isDownExport = true;
StateHasChanged();
using var memoryStream = new MemoryStream();
StreamWriter writer = new(memoryStream);
using StreamWriter writer = new(memoryStream);
foreach (var item in values)
{
writer.WriteLine(item);

View File

@@ -0,0 +1,198 @@
@*
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
*@
@page "/gatewayconfig/manage"
@namespace ThingsGateway.Blazor
@using System.Linq.Expressions;
@using BlazorComponent;
@using MQTTnet.Server;
@using Mapster;
@using Masa.Blazor.Presets;
@using System.IO;
@using Masa.Blazor;
@using Microsoft.AspNetCore.Authorization;
@using ThingsGateway.Admin.Blazor.Core;
@using ThingsGateway.Admin.Blazor;
@using ThingsGateway.Admin.Core;
@using ThingsGateway.Application;
@using TouchSocket.Core;
@attribute [Authorize]
@inject MasaBlazor MasaBlazor
@inherits BaseComponentBase
@inject UserResoures UserResoures
@inject NavigationManager NavigationManager
@layout MainLayout
<MRow NoGutters>
<MExpansionPanels @bind-Values="Panel"
Multiple>
<MExpansionPanel Value="1">
<MExpansionPanelHeader>
@($"子网关服务信息-{ManageGatewayWorker.ClientStatuString.Message}")
</MExpansionPanelHeader>
<MExpansionPanelContent>
@{
var config = ManageGatewayWorker.ClientGatewayConfig;
}
@if (config != null)
{
<MDescriptions Title="子网关服务配置信息" Bordered="true">
<MDescriptionsItem Label=@config.Description(a=>a.Enable)>@config.Enable</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.GatewayId)>@config.GatewayId</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.MqttBrokerIP)>@config.MqttBrokerIP</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.MqttBrokerPort)>@config.MqttBrokerPort</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.UserName)>@config.UserName</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.Password)>@config.Password</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.DBDownTopic)>@config.DBDownTopic</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.DBUploadTopic)>@config.DBUploadTopic</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.WriteRpcTopic)>@config.WriteRpcTopic</MDescriptionsItem>
</MDescriptions>
}
</MExpansionPanelContent>
</MExpansionPanel>
<MExpansionPanel Value="2">
<MExpansionPanelHeader>
@($"管理服务信息-{ManageGatewayWorker.ManageStatuString.Message}")
</MExpansionPanelHeader>
<MExpansionPanelContent>
@{
var config = ManageGatewayWorker.ManageGatewayConfig;
}
@if (config != null)
{
<MDescriptions Title="管理服务配置信息" Bordered="true">
<MDescriptionsItem Label=@config.Description(a=>a.Enable)>@config.Enable</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.MqttBrokerIP)>@config.MqttBrokerIP</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.MqttBrokerPort)>@config.MqttBrokerPort</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.UserName)>@config.UserName</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.Password)>@config.Password</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.DBDownTopic)>@config.DBDownTopic</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.DBUploadTopic)>@config.DBUploadTopic</MDescriptionsItem>
<MDescriptionsItem Label=@config.Description(a=>a.WriteRpcTopic)>@config.WriteRpcTopic</MDescriptionsItem>
</MDescriptions>
}
<MCard Flat Class="ma-0" Style="min-height:1000px">
<div class="m-descriptions-header__title my-2">
当前服务下的子网关
</div>
<MRow NoGutters>
<MCol Md=3>
<MTreeview Dense TItem="MqttClientStatus"
TKey="MqttClientStatus" OpenOnClick @bind-Active=CurClients
Items="MqttClientStatuses" ItemText=@(r=>r.Id) ItemChildren="r=> null"
Activatable ItemKey=@(r=>r)>
<LabelContent>
<span title=@context.Item.Id>
@(context.Item.Id + "-" + context.Item.Endpoint)
</span>
</LabelContent>
</MTreeview>
</MCol>
<MCol Md=9>
@if (CurClients != null && CurClients.Count > 0)
{
var CurClient = CurClients.FirstOrDefault();
<MCard Flat Class="ml-4">
<MDescriptions Title="当前选择的子网关" Bordered="true">
<MDescriptionsItem Label=@CurClient.Description(a=>a.Id)>@CurClient.Id</MDescriptionsItem>
<MDescriptionsItem Label=@CurClient.Description(a=>a.Endpoint)>@CurClient.Endpoint</MDescriptionsItem>
</MDescriptions>
<MDivider></MDivider>
<MRow>
<MCol Cols="12" Md="12">
<div class="m-descriptions-header__title my-2">
导出子网关配置信息
</div>
</MCol>
<MCol Cols="12" Md="12">
<MButton Loading=isDownExport Disabled=@(!UserResoures.IsHasButtonWithRole("gatewaydevicepause")) Class="ma-2"
OnClick=@(()=>DBUpload(CurClient))>
导出
</MButton>
</MCol>
</MRow>
<MDivider></MDivider>
<MRow>
<MCol Cols="12" Md="12">
<div class="m-descriptions-header__title my-2">
下发子网关配置信息
</div>
</MCol>
<MCol Cols="12" Md="12">
<MFileInput Label="采集设备Excel" @bind-Value="_importCollectDevicesFile" Style="width:60%;" ShowSize></MFileInput>
<MSwitch Label=@typeof(MqttDBDownRpc).GetDescription(nameof(MqttDBDownRpc.IsCollectDevicesFullUp)) @bind-Value=@IsCollectDevicesFullUp />
</MCol>
<MCol Cols="12" Md="12">
<MFileInput Label="上传设备Excel" @bind-Value="_importUploadDevicesFile" Style="width:60%;" ShowSize></MFileInput>
<MSwitch Label=@typeof(MqttDBDownRpc).GetDescription(nameof(MqttDBDownRpc.IsUploadDevicesFullUp)) @bind-Value=@IsUploadDevicesFullUp />
</MCol>
<MCol Cols="12" Md="12">
<MFileInput Label="变量Excel" @bind-Value="_importDeviceVariablesFile" Style="width:60%;" ShowSize></MFileInput>
<MSwitch Label=@typeof(MqttDBDownRpc).GetDescription(nameof(MqttDBDownRpc.IsDeviceVariablesFullUp)) @bind-Value=@IsDeviceVariablesFullUp />
</MCol>
<MCol Cols="12" Md="12">
<MSwitch Label=@typeof(MqttDBDownRpc).GetDescription(nameof(MqttDBDownRpc.IsRestart)) @bind-Value=@IsRestart />
</MCol>
<MCol Cols="12" Md="12">
<MButton Loading=isDownExport Disabled=@(!UserResoures.IsHasButtonWithRole("gatewaydevicepause")) Class="ma-2"
OnClick=@(()=>DBDown(CurClient))>
下发
</MButton>
</MCol>
</MRow>
<MDivider></MDivider>
</MCard>
}
</MCol>
</MRow>
</MCard>
</MExpansionPanelContent>
</MExpansionPanel>
</MExpansionPanels>
</MRow>

View File

@@ -0,0 +1,209 @@
#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
#endregion
using BlazorComponent;
using Furion;
using Masa.Blazor;
using Microsoft.AspNetCore.Components;
using Microsoft.AspNetCore.Components.Forms;
using Microsoft.JSInterop;
using MQTTnet.Server;
using System.IO;
using System.Threading;
using ThingsGateway.Admin.Core;
using ThingsGateway.Application;
namespace ThingsGateway.Blazor;
/// <summary>
/// ManageGatewayPage
/// </summary>
public partial class ManageGatewayPage
{
List<StringNumber> Panel { get; set; } = new();
readonly PeriodicTimer _periodicTimer = new(TimeSpan.FromSeconds(5));
ManageGatewayWorker ManageGatewayWorker { get; set; }
/// <summary>
/// <inheritdoc/>
/// </summary>
protected override void OnInitialized()
{
ManageGatewayWorker = ServiceHelper.GetBackgroundService<ManageGatewayWorker>();
Panel.Add("2");
_ = RunTimerAsync();
base.OnInitialized();
}
private async Task RunTimerAsync()
{
await RefreshAsync();
while (await _periodicTimer.WaitForNextTickAsync())
{
try
{
await RefreshAsync();
await InvokeAsync(StateHasChanged);
}
catch
{
}
}
}
List<MqttClientStatus> CurClients { get; set; }
List<MqttClientStatus> MqttClientStatuses { get; set; } = new();
private async Task RefreshAsync()
{
MqttClientStatuses = await ManageGatewayWorker.GetClientGatewayAsync();
}
/// <inheritdoc/>
public override void Dispose()
{
_periodicTimer?.Dispose();
base.Dispose();
}
IJSObjectReference _helper;
[Inject]
IJSRuntime JS { get; set; }
private bool isDownExport;
private bool IsCollectDevicesFullUp;
private bool IsUploadDevicesFullUp;
private bool IsDeviceVariablesFullUp;
private bool IsRestart;
IBrowserFile _importCollectDevicesFile;
IBrowserFile _importUploadDevicesFile;
IBrowserFile _importDeviceVariablesFile;
/// <summary>
/// 获取子网关配置导出excel
/// </summary>
/// <param name="mqttClientStatus"></param>
/// <returns></returns>
private async Task DBUpload(MqttClientStatus mqttClientStatus)
{
var data = await ManageGatewayWorker.GetClientGatewayDBAsync(mqttClientStatus.Id);
if (data.IsSuccess)
{
isDownExport = true;
await InvokeAsync(StateHasChanged);
if (data.Content.CollectDevices.Count > 0)
{
using var devices = await App.GetService<CollectDeviceService>().ExportFileAsync(data.Content.CollectDevices);
using var streamRef = new DotNetStreamReference(stream: devices);
_helper ??= await JS.InvokeAsync<IJSObjectReference>("import", $"/_content/ThingsGateway.Admin.Blazor.Core/js/downloadFileFromStream.js");
await _helper.InvokeVoidAsync("downloadFileFromStream", $"子网关{mqttClientStatus.Id}采集设备导出{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx", streamRef);
}
else
{
await PopupService.EnqueueSnackbarAsync("无采集设备", AlertTypes.None);
}
if (data.Content.UploadDevices.Count > 0)
{
using var devices = await App.GetService<UploadDeviceService>().ExportFileAsync(data.Content.UploadDevices);
using var streamRef = new DotNetStreamReference(stream: devices);
_helper ??= await JS.InvokeAsync<IJSObjectReference>("import", $"/_content/ThingsGateway.Admin.Blazor.Core/js/downloadFileFromStream.js");
await _helper.InvokeVoidAsync("downloadFileFromStream", $"子网关{mqttClientStatus.Id}上传设备导出{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx", streamRef);
}
else
{
await PopupService.EnqueueSnackbarAsync("无上传设备", AlertTypes.None);
}
if (data.Content.DeviceVariables.Count > 0)
{
using var devices = await App.GetService<VariableService>().ExportFileAsync(data.Content.DeviceVariables);
using var streamRef = new DotNetStreamReference(stream: devices);
_helper ??= await JS.InvokeAsync<IJSObjectReference>("import", $"/_content/ThingsGateway.Admin.Blazor.Core/js/downloadFileFromStream.js");
await _helper.InvokeVoidAsync("downloadFileFromStream", $"子网关{mqttClientStatus.Id}变量导出{SysDateTimeExtensions.CurrentDateTime.ToFileDateTimeFormat()}.xlsx", streamRef);
}
else
{
await PopupService.EnqueueSnackbarAsync("无采集变量", AlertTypes.None);
}
await PopupService.EnqueueSnackbarAsync("上传成功", AlertTypes.Success);
}
else
{
await PopupService.EnqueueSnackbarAsync(data.Message, AlertTypes.Error);
}
isDownExport = false;
}
/// <summary>
/// 下发子网关配置
/// </summary>
/// <returns></returns>
private async Task DBDown(MqttClientStatus mqttClientStatus)
{
MqttDBDownRpc rpc = new()
{
IsCollectDevicesFullUp = IsCollectDevicesFullUp,
IsDeviceVariablesFullUp = IsDeviceVariablesFullUp,
IsUploadDevicesFullUp = IsUploadDevicesFullUp,
IsRestart = IsRestart
};
if (_importCollectDevicesFile != null)
{
using var fs1 = new MemoryStream();
using var stream1 = _importCollectDevicesFile.OpenReadStream(512000000);
await stream1.CopyToAsync(fs1);
rpc.CollectDevices = fs1.ToArray();
}
if (_importUploadDevicesFile != null)
{
using var fs2 = new MemoryStream();
using var stream2 = _importUploadDevicesFile.OpenReadStream(512000000);
await stream2.CopyToAsync(fs2);
rpc.UploadDevices = fs2.ToArray();
}
if (_importDeviceVariablesFile != null)
{
using var fs3 = new MemoryStream();
using var stream3 = _importDeviceVariablesFile.OpenReadStream(512000000);
await stream3.CopyToAsync(fs3);
rpc.DeviceVariables = fs3.ToArray();
}
var data = await ManageGatewayWorker.SetClientGatewayDBAsync(mqttClientStatus.Id, rpc);
if (data.IsSuccess)
{
await PopupService.EnqueueSnackbarAsync("下发成功", AlertTypes.Success);
}
else
{
await PopupService.EnqueueSnackbarAsync(data.Message, AlertTypes.Error);
}
}
}

View File

@@ -346,6 +346,32 @@
</summary>
<returns></returns>
</member>
<member name="T:ThingsGateway.Blazor.ManageGatewayPage">
<summary>
ManageGatewayPage
</summary>
</member>
<member name="M:ThingsGateway.Blazor.ManageGatewayPage.OnInitialized">
<summary>
<inheritdoc/>
</summary>
</member>
<member name="M:ThingsGateway.Blazor.ManageGatewayPage.Dispose">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Blazor.ManageGatewayPage.DBUpload(MQTTnet.Server.MqttClientStatus)">
<summary>
获取子网关配置导出excel
</summary>
<param name="mqttClientStatus"></param>
<returns></returns>
</member>
<member name="M:ThingsGateway.Blazor.ManageGatewayPage.DBDown(MQTTnet.Server.MqttClientStatus)">
<summary>
下发子网关配置
</summary>
<returns></returns>
</member>
<member name="T:ThingsGateway.Blazor.MemoryVariablePage">
<summary>
内存变量页面

View File

@@ -20,8 +20,8 @@ internal class WaitingClientEx<TClient> : DisposableObject, IWaitingClient<TClie
{
private readonly Func<ResponsedData, bool> m_func;
private readonly EasyLock easyLock = new();
private readonly WaitData<ResponsedData> m_waitData = new WaitData<ResponsedData>();
private readonly WaitDataAsync<ResponsedData> m_waitDataAsync = new WaitDataAsync<ResponsedData>();
private readonly WaitData<ResponsedData> m_waitData = new();
private readonly WaitDataAsync<ResponsedData> m_waitDataAsync = new();
private volatile bool m_breaked;

View File

@@ -77,7 +77,7 @@ public class SerialSessionBase : BaseSerial, ISerialSession
private DelaySender m_delaySender;
private long m_bufferRate = 1;
private volatile bool m_online;
private bool m_online => MainSerialPort?.IsOpen == true;
ValueCounter m_receiveCounter;
ValueCounter m_sendCounter;
@@ -277,7 +277,6 @@ public class SerialSessionBase : BaseSerial, ISerialSession
{
this.PrivateOnDisconnecting(new DisconnectEventArgs(true, msg));
this.m_online = false;
this.MainSerialPort.TryClose();
this.MainSerialPort.SafeDispose();
@@ -294,7 +293,7 @@ public class SerialSessionBase : BaseSerial, ISerialSession
{
if (this.m_online)
{
this.m_online = false;
this.MainSerialPort.SafeDispose();
this.m_delaySender.SafeDispose();
this.DataHandlingAdapter.SafeDispose();
@@ -313,7 +312,7 @@ public class SerialSessionBase : BaseSerial, ISerialSession
{
if (this.m_online)
{
this.m_online = false;
this.MainSerialPort.TryClose();
this.PrivateOnDisconnecting(new DisconnectEventArgs(true, $"{nameof(Dispose)}主动断开"));
@@ -357,7 +356,7 @@ public class SerialSessionBase : BaseSerial, ISerialSession
this.PrivateOnConnecting(args);
serialPort.Open();
this.m_online = true;
this.SetSerialPort(serialPort);

View File

@@ -1,8 +1,7 @@
<Project>
<PropertyGroup>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<Version>2.1.0.8</Version>
<Version>2.1.0.7</Version>
<Version>2.1.0.11</Version>
<Authors>Diego</Authors>
<Product>ThingsGateway</Product>
<Copyright>© 2023-present Diego</Copyright>

View File

@@ -394,19 +394,19 @@ public class MqttClient : UpLoadBase
_collectDeviceRunTimes.Enqueue(collectDeviceRunTime.Adapt<DeviceData>());
}
private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
{
if (arg.ApplicationMessage.Topic == driverPropertys.QuestRpcTopic && arg.ApplicationMessage.PayloadSegment.Count > 0)
if (args.ApplicationMessage.Topic == driverPropertys.QuestRpcTopic && args.ApplicationMessage.PayloadSegment.Count > 0)
{
await AllPublishAsync(CancellationToken.None);
return;
}
if (!driverPropertys.DeviceRpcEnable || string.IsNullOrEmpty(arg.ClientId))
if (!driverPropertys.DeviceRpcEnable || string.IsNullOrEmpty(args.ClientId))
return;
if (arg.ApplicationMessage.Topic != driverPropertys.RpcWriteTopic)
if (args.ApplicationMessage.Topic != driverPropertys.RpcWriteTopic)
return;
var rpcDatas = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment).FromJsonString<MqttRpcNameVaueWithId>();
var rpcDatas = Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment).FromJsonString<MqttRpcNameVaueWithId>();
if (rpcDatas == null)
return;
@@ -437,7 +437,7 @@ public class MqttClient : UpLoadBase
}
}
var result = await _rpcCore.InvokeDeviceMethodAsync(ToString() + "-" + arg.ClientId,
var result = await _rpcCore.InvokeDeviceMethodAsync(ToString() + "-" + args.ClientId,
rpcDatas.WriteInfos.Where(
a => !mqttRpcResult.Message.Any(b => b.Key == a.Key)).ToDictionary(a => a.Key, a => a.Value));
@@ -470,7 +470,15 @@ public class MqttClient : UpLoadBase
{
LogMessage?.Warning("订阅失败-" + subResult.Items
.Where(a => a.ResultCode > (MqttClientSubscribeResultCode)10)
.ToJsonString());
.Select(a =>
new
{
Topic = a.TopicFilter.Topic,
ResultCode = a.ResultCode.ToString()
}
)
.ToJsonString()
);
}
}
/// <summary>

View File

@@ -1,4 +1,25 @@
{
"$schema": "https://gitee.com/dotnetchina/Furion/raw/net6/schemas/v3/furion-schema.json",
"ManageGatewayConfig": {
"Enable": true, //是否启用管理服务
"MqttBrokerIP": "127.0.0.1", //管理服务IP
"MqttBrokerPort": 7300, //管理服务端口
"UserName": "admin", //管理服务用户名
"Password": "111111", //管理服务端口
"DBDownTopic": "DBDownTopic", //下发网关配置的主题
"DBUploadTopic": "DBUploadTopic", //子网关上传配置信息的主题
"WriteRpcTopic": "WriteRpcTopic"
},
"ClientGatewayConfig": {
"GatewayId": "GatewayId", //子网关ID需要唯一也用于MqttClientID
"Enable": false, //是否连接管理服务
"MqttBrokerIP": "127.0.0.1", //管理服务IP
"MqttBrokerPort": 7300, //管理服务端口
"UserName": "admin", //管理服务用户名
"Password": "111111", //管理服务端口
"DBDownTopic": "DBDownTopic", //下发网关配置的主题
"DBUploadTopic": "DBUploadTopic", //子网关上传配置信息的主题
"WriteRpcTopic": "WriteRpcTopic"
}
}