Compare commits

...

21 Commits

Author SHA1 Message Date
Kimdiego2098
27fae9ebaa 2.1.0.13 2023-09-15 13:23:59 +08:00
Kimdiego2098
b103f25c94 add dispose() 2023-09-15 13:23:39 +08:00
Kimdiego2098
abff450274 2.1.0.12 2023-09-15 13:20:29 +08:00
Kimdiego2098
c260736a11 更新多个依赖包版本 2023-09-15 13:19:04 +08:00
Kimdiego2098
166ac2307a 修复因重复注册cancellationToken.Register导致的内存暴涨! 2023-09-15 13:17:36 +08:00
Kimdiego2098
b21a4e1a4d CancellationToken代替CancellationTokenSource传入 2023-09-15 13:16:56 +08:00
Kimdiego2098
f7dc943fa3 2.1.0.11 2023-09-11 09:35:24 +08:00
Kimdiego2098
bfbd2693ec feat:ManageGatewayWorker part 2023-09-11 09:31:26 +08:00
Kimdiego2098
819e71c993 feat:ManageGatewayWorker part 2023-09-11 09:09:48 +08:00
Kimdiego2098
9fd0b489a2 feat:ManageGatewayWorker part 2023-09-11 09:09:03 +08:00
Kimdiego2098
f5fe9f8dae TGAPPInfp更名APPInfo 2023-09-11 08:57:13 +08:00
Kimdiego2098
f9ffc18145 Merge branch 'master' of https://gitee.com/dotnetchina/ThingsGateway 2023-09-11 08:54:01 +08:00
Kimdiego2098
08db5b983a feat:ManageGatewayWorker part 2023-09-11 08:53:52 +08:00
Diego2098
5b3b4c8c50 !7 修复无法修改变量值问题
Merge pull request !7 from 如阳如木/master
2023-09-07 10:14:45 +00:00
如阳如木
73f914ffc4 删除&& LastSetValue?.ToString() != data?.ToString()
Signed-off-by: 如阳如木 <970143933@qq.com>
2023-09-07 08:42:49 +00:00
Kimdiego2098
d6bdd73ed6 ManageGatewayConfig.json 2023-09-06 17:29:58 +08:00
Kimdiego2098
7370ee7349 fix:_mqttServer null error 2023-09-06 17:27:36 +08:00
Kimdiego2098
4574596bac 2.1.0.10 2023-09-06 17:16:28 +08:00
Kimdiego2098
4d16855e36 从数据库取原属性 2023-09-06 17:14:24 +08:00
Kimdiego2098
13a0d4d282 更改变量 最近一次值 为 上次值 2023-09-06 16:44:58 +08:00
Kimdiego2098
b9cd06b829 更改 变量最近一次值 为 上次值 2023-09-06 16:44:42 +08:00
25 changed files with 709 additions and 294 deletions

View File

@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<Version>2.1.0.9</Version>
<Version>2.1.0.13</Version>
<Authors>Diego</Authors>
<Product>ThingsGateway</Product>
<Copyright>© 2023-present Diego</Copyright>

View File

@@ -14,8 +14,8 @@
<ItemGroup>
<PackageReference Include="Masa.Blazor" Version="1.0.3" />
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="7.0.10" />
<PackageReference Include="Masa.Blazor" Version="1.0.4" />
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="7.0.11" />
</ItemGroup>

View File

@@ -9,10 +9,10 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Furion.Extras.Authentication.JwtBearer" Version="4.8.8.42" />
<PackageReference Include="Furion.Extras.ObjectMapper.Mapster" Version="4.8.8.42" />
<PackageReference Include="Furion.Pure" Version="4.8.8.42" />
<PackageReference Include="SqlSugarCore" Version="5.1.4.104" />
<PackageReference Include="Furion.Extras.Authentication.JwtBearer" Version="4.8.8.43" />
<PackageReference Include="Furion.Extras.ObjectMapper.Mapster" Version="4.8.8.43" />
<PackageReference Include="Furion.Pure" Version="4.8.8.43" />
<PackageReference Include="SqlSugarCore" Version="5.1.4.105" />
<PackageReference Include="UAParser" Version="3.1.47" />
<PackageReference Include="Yitter.IdGenerator" Version="1.0.14" />
<PackageReference Include="MiniExcel" Version="1.31.2" />

View File

@@ -97,11 +97,12 @@ public class HardwareInfoService : ISingleton
, TaskCreationOptions.LongRunning);
}
private TGAPPInfo appInfo = new();
private APPInfo appInfo = new();
/// <summary>
/// 运行信息获取
/// </summary>
public TGAPPInfo APPInfo => appInfo;
public APPInfo APPInfo => appInfo;
/// <summary>
@@ -128,7 +129,7 @@ public class HardwareInfoService : ISingleton
}
/// <inheritdoc/>
public class TGAPPInfo
public class APPInfo
{
/// <summary>
/// 主机环境

View File

@@ -171,6 +171,8 @@ public class ManageGatewayWorker : BackgroundService
#endregion
#region public
/// <summary>
/// 获取子网关的配置信息
/// </summary>
@@ -201,11 +203,125 @@ public class ManageGatewayWorker : BackgroundService
await StartAsync();
}
/// <summary>
/// 下载配置信息到子网关
/// </summary>
/// <returns></returns>
public async Task<OperResult> SetClientGatewayDBAsync(string gatewayId, MqttDBDownRpc mqttDBRpc, int timeOut = 3000, CancellationToken token = default)
{
try
{
var buffer = Encoding.UTF8.GetBytes(mqttDBRpc?.ToJsonString() ?? string.Empty);
var response = await RpcDataExecuteAsync(gatewayId, ClientGatewayConfig.DBDownTopic, buffer, timeOut, MqttQualityOfServiceLevel.AtMostOnce, token);
var data = Encoding.UTF8.GetString(response).FromJsonString<OperResult>();
return data;
}
catch (Exception ex)
{
return new OperResult(ex);
}
}
/// <summary>
/// 写入变量到子网关
/// </summary>
/// <returns></returns>
public async Task<OperResult<ManageMqttRpcResult>> WriteVariableAsync(ManageMqttRpcFrom manageMqttRpcFrom, int timeOut = 3000, CancellationToken token = default)
{
try
{
var payload = Encoding.UTF8.GetBytes(manageMqttRpcFrom?.ToJsonString() ?? string.Empty);
var requestTopic = ManageGatewayConfig.WriteRpcTopic;
var responseTopic = GetRpcReturnTopic(ManageGatewayConfig.WriteRpcTopic);
var key = GetRpcReturnIdTopic(manageMqttRpcFrom.GatewayId, requestTopic, manageMqttRpcFrom.RpcId);
ManageMqttRpcResult result = await RpcWriteExecuteAsync(timeOut, payload, requestTopic, key, token);
return OperResult.CreateSuccessResult(result);
}
catch (Exception ex)
{
return new OperResult<ManageMqttRpcResult>(ex);
}
}
/// <summary>
/// 获取子网关列表
/// </summary>
/// <returns></returns>
public async Task<List<MqttClientStatus>> GetClientGatewayAsync()
{
if (_mqttServer != null)
{
var data = await _mqttServer.GetClientsAsync();
return data.ToList();
}
else
{
return new List<MqttClientStatus>();
}
}
#endregion
#region RPC实现
readonly ConcurrentDictionary<string, WaitDataAsync<byte[]>> _waitingCalls = new();
readonly ConcurrentDictionary<string, WaitDataAsync<ManageMqttRpcResult>> _writerRpcResultWaitingCalls = new();
private readonly EasyLock clientLock = new();
private async Task<ManageMqttRpcResult> RpcWriteExecuteAsync(int timeOut, byte[] payload, string requestTopic, string key, CancellationToken token)
{
try
{
using WaitDataAsync<ManageMqttRpcResult> waitDataAsync = new();
if (!_writerRpcResultWaitingCalls.TryAdd(key, waitDataAsync))
{
throw new InvalidOperationException();
}
waitDataAsync.SetCancellationToken(token);
//请求子网关的数据
var message = new MqttApplicationMessageBuilder().WithTopic(requestTopic).WithPayload(payload).Build();
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message), token);
var result = await waitDataAsync.WaitAsync(timeOut);
switch (result)
{
case WaitDataStatus.SetRunning:
return waitDataAsync.WaitResult;
case WaitDataStatus.Overtime:
throw new TimeoutException();
case WaitDataStatus.Canceled:
{
throw new Exception("等待已终止。可能是客户端已掉线,或者被注销。");
}
case WaitDataStatus.Default:
case WaitDataStatus.Disposed:
default:
throw new Exception(ThingsGatewayStatus.UnknownError.GetDescription());
}
}
finally
{
_writerRpcResultWaitingCalls.Remove(key);
}
}
/// <summary>
/// RPC请求子网关并返回需要传入子网关ID作为Topic参数一部分
/// </summary>
/// <returns></returns>
public async Task<byte[]> RpcDataExecuteAsync(string gatewayId, string topic, byte[] payload, int timeOut, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken token = default)
private async Task<byte[]> RpcDataExecuteAsync(string gatewayId, string topic, byte[] payload, int timeOut, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken token = default)
{
var responseTopic = GetRpcReturnTopic(gatewayId, topic);
var requestTopic = GetRpcTopic(gatewayId, topic);
@@ -247,60 +363,10 @@ public class ManageGatewayWorker : BackgroundService
}
}
/// <summary>
/// 下载配置信息到子网关
/// </summary>
/// <returns></returns>
public async Task<OperResult<OperResult>> SetClientGatewayDBAsync(string gatewayId, MqttDBDownRpc mqttDBRpc, int timeOut = 3000, CancellationToken token = default)
{
try
{
var buffer = Encoding.UTF8.GetBytes(mqttDBRpc?.ToJsonString() ?? string.Empty);
var response = await RpcDataExecuteAsync(gatewayId, ClientGatewayConfig.DBDownTopic, buffer, timeOut, MqttQualityOfServiceLevel.AtMostOnce, token);
var data = Encoding.UTF8.GetString(response).FromJsonString<OperResult>();
return OperResult.CreateSuccessResult(data);
}
catch (Exception ex)
{
return new OperResult<OperResult>(ex);
}
}
/// <summary>
/// 写入变量到子网关
/// </summary>
/// <returns></returns>
public async Task<OperResult<ManageMqttRpcResult>> WriteVariableAsync(ManageMqttRpcFrom manageMqttRpcFrom, int timeOut = 3000, CancellationToken token = default)
{
try
{
var payload = Encoding.UTF8.GetBytes(manageMqttRpcFrom?.ToJsonString() ?? string.Empty);
var requestTopic = ManageGatewayConfig.WriteRpcTopic;
var responseTopic = GetRpcReturnTopic(ManageGatewayConfig.WriteRpcTopic);
var key = GetRpcReturnIdTopic(manageMqttRpcFrom.GatewayId, requestTopic, manageMqttRpcFrom.RpcId);
ManageMqttRpcResult result = await RpcWriteExecuteAsync(timeOut, payload, requestTopic, key, token);
return OperResult.CreateSuccessResult(result);
}
catch (Exception ex)
{
return new OperResult<ManageMqttRpcResult>(ex);
}
}
/// <summary>
/// 获取子网关列表
/// </summary>
/// <returns></returns>
public async Task<List<MqttClientStatus>> GetClientGatewayAsync()
{
var data = await _mqttServer.GetClientsAsync();
return data.ToList();
}
#endregion
#region
internal async Task StartAsync()
{
@@ -349,12 +415,98 @@ public class ManageGatewayWorker : BackgroundService
restartLock.Release();
}
}
/// <summary>
/// 初始化
/// </summary>
private async Task InitAsync()
{
try
{
ManageGatewayConfig = App.GetConfig<ManageGatewayConfig>("ManageGatewayConfig");
if (ManageGatewayConfig?.Enable != true)
{
ManageStatuString = new OperResult($"已退出:不启用管理功能");
_manageLogger.LogWarning("已退出:不启用管理功能");
}
else
{
var mqttFactory = new MqttFactory(new MqttNetLogger(_manageLogger));
var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder()
.WithDefaultEndpointBoundIPAddress(string.IsNullOrEmpty(ManageGatewayConfig.MqttBrokerIP) ? null : IPAddress.Parse(ManageGatewayConfig.MqttBrokerIP))
.WithDefaultEndpointPort(ManageGatewayConfig.MqttBrokerPort)
.WithDefaultEndpoint()
.Build();
_mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
if (_mqttServer != null)
{
_mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;//认证
_mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;//消息
await _mqttServer.StartAsync();
}
ManageStatuString = OperResult.CreateSuccessResult();
}
}
catch (Exception ex)
{
_manageLogger.LogError(ex, "初始化失败");
ManageStatuString = new($"初始化失败-{ex.Message}");
}
try
{
ClientGatewayConfig = App.GetConfig<ClientGatewayConfig>("ClientGatewayConfig");
if (ClientGatewayConfig?.Enable != true)
{
ClientStatuString = new OperResult($"已退出:不启用子网关功能");
_clientLogger.LogWarning("已退出:不启用子网关功能");
}
else
{
var mqttFactory = new MqttFactory(new MqttNetLogger(_clientLogger));
_mqttClientOptions = mqttFactory.CreateClientOptionsBuilder()
.WithCredentials(ClientGatewayConfig.UserName, ClientGatewayConfig.Password)//账密
.WithTcpServer(ClientGatewayConfig.MqttBrokerIP, ClientGatewayConfig.MqttBrokerPort)//服务器
.WithClientId(ClientGatewayConfig.GatewayId)
.WithCleanSession(true)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(120.0))
.WithoutThrowOnNonSuccessfulConnectResponse()
.Build();
_mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic(ClientGatewayConfig.WriteRpcTopic);
f.WithAtMostOnceQoS();
})
.WithTopicFilter(
f =>
{
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBDownTopic));
f.WithAtMostOnceQoS();
})
.WithTopicFilter(
f =>
{
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBUploadTopic));
f.WithAtMostOnceQoS();
})
.Build();
_mqttClient = mqttFactory.CreateMqttClient();
_mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
await TryMqttClientAsync(CancellationToken.None);
}
}
catch (Exception ex)
{
_clientLogger.LogError(ex, "初始化失败");
}
}
#region
readonly ConcurrentDictionary<string, WaitDataAsync<byte[]>> _waitingCalls = new();
readonly ConcurrentDictionary<string, WaitDataAsync<ManageMqttRpcResult>> _writerRpcResultWaitingCalls = new();
private readonly EasyLock clientLock = new();
/// <summary>
/// ClientGatewayConfig
/// </summary>
@@ -391,7 +543,7 @@ public class ManageGatewayWorker : BackgroundService
if (mqttDBRpc.CollectDevices != null && mqttDBRpc.CollectDevices.Length > 0)
{
MemoryStream stream = new(mqttDBRpc.CollectDevices);
using MemoryStream stream = new(mqttDBRpc.CollectDevices);
var previewResult = await collectDeviceService.PreviewAsync(stream);
if (previewResult.FirstOrDefault().Value.HasError)
{
@@ -419,7 +571,7 @@ public class ManageGatewayWorker : BackgroundService
if (mqttDBRpc.UploadDevices != null && mqttDBRpc.UploadDevices.Length > 0)
{
MemoryStream stream1 = new(mqttDBRpc.UploadDevices);
using MemoryStream stream1 = new(mqttDBRpc.UploadDevices);
var previewResult1 = await uploadDeviceService.PreviewAsync(stream1);
if (previewResult1.FirstOrDefault().Value.HasError)
{
@@ -445,7 +597,7 @@ public class ManageGatewayWorker : BackgroundService
}
if (mqttDBRpc.DeviceVariables != null && mqttDBRpc.DeviceVariables.Length > 0)
{
MemoryStream stream2 = new(mqttDBRpc.DeviceVariables);
using MemoryStream stream2 = new(mqttDBRpc.DeviceVariables);
var previewResult2 = await variableService.PreviewAsync(stream2, collectDevices, uploadDevices);
if (previewResult2.FirstOrDefault().Value.HasError)
{
@@ -522,97 +674,7 @@ public class ManageGatewayWorker : BackgroundService
return requestTopic;
}
/// <summary>
/// 初始化
/// </summary>
private async Task InitAsync()
{
try
{
ManageGatewayConfig = App.GetConfig<ManageGatewayConfig>("ManageGatewayConfig");
if (ManageGatewayConfig?.Enable != true)
{
ManageStatuString = new OperResult($"已退出:不启用管理功能");
_manageLogger.LogWarning("已退出:不启用管理功能");
}
else
{
var mqttFactory = new MqttFactory(new MqttNetLogger(_manageLogger));
var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder()
.WithDefaultEndpointBoundIPAddress(string.IsNullOrEmpty(ManageGatewayConfig.MqttBrokerIP) ? null : IPAddress.Parse(ManageGatewayConfig.MqttBrokerIP))
.WithDefaultEndpointPort(ManageGatewayConfig.MqttBrokerPort)
.WithDefaultEndpoint()
.Build();
_mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
if (_mqttServer != null)
{
_mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;//认证
_mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;//认证
await _mqttServer.StartAsync();
}
ManageStatuString = OperResult.CreateSuccessResult();
}
}
catch (Exception ex)
{
_manageLogger.LogError(ex, "初始化失败");
ManageStatuString = new($"初始化失败-{ex.Message}");
}
try
{
ClientGatewayConfig = App.GetConfig<ClientGatewayConfig>("ClientGatewayConfig");
if (ClientGatewayConfig?.Enable != true)
{
ClientStatuString = new OperResult($"已退出:不启用子网关功能");
_clientLogger.LogWarning("已退出:不启用子网关功能");
}
else
{
var mqttFactory = new MqttFactory(new MqttNetLogger(_clientLogger));
_mqttClientOptions = mqttFactory.CreateClientOptionsBuilder()
.WithCredentials(ClientGatewayConfig.UserName, ClientGatewayConfig.Password)//账密
.WithTcpServer(ClientGatewayConfig.MqttBrokerIP, ClientGatewayConfig.MqttBrokerPort)//服务器
.WithClientId(ClientGatewayConfig.GatewayId)
.WithCleanSession(true)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(120.0))
.WithoutThrowOnNonSuccessfulConnectResponse()
.Build();
_mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic(ClientGatewayConfig.WriteRpcTopic);
f.WithAtMostOnceQoS();
})
.WithTopicFilter(
f =>
{
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBDownTopic));
f.WithAtMostOnceQoS();
})
.WithTopicFilter(
f =>
{
f.WithTopic(GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBUploadTopic));
f.WithAtMostOnceQoS();
})
.Build();
_mqttClient = mqttFactory.CreateMqttClient();
_mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
await TryMqttClientAsync(CancellationToken.None);
}
}
catch (Exception ex)
{
_clientLogger.LogError(ex, "初始化失败");
}
}
private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
{
if (args.ApplicationMessage.Topic == GetRpcTopic(ClientGatewayConfig.GatewayId, ClientGatewayConfig.DBUploadTopic))
@@ -661,7 +723,7 @@ public class ManageGatewayWorker : BackgroundService
{
if (eventArgs.ApplicationMessage.Topic == GetRpcReturnTopic(ManageGatewayConfig.WriteRpcTopic))
{
if (_writerRpcResultWaitingCalls.Count > 0)
if (!_writerRpcResultWaitingCalls.IsEmpty)
{
var payloadBuffer = eventArgs.ApplicationMessage.PayloadSegment.ToArray();
var manageMqttRpcResult = Encoding.UTF8.GetString(payloadBuffer).FromJsonString<ManageMqttRpcResult>();
@@ -769,43 +831,8 @@ public class ManageGatewayWorker : BackgroundService
await _mqttClient.PublishAsync(variableMessage);
}
}
#endregion
private async Task<ManageMqttRpcResult> RpcWriteExecuteAsync(int timeOut, byte[] payload, string requestTopic, string key, CancellationToken token)
{
try
{
using WaitDataAsync<ManageMqttRpcResult> waitDataAsync = new();
if (!_writerRpcResultWaitingCalls.TryAdd(key, waitDataAsync))
{
throw new InvalidOperationException();
}
waitDataAsync.SetCancellationToken(token);
//请求子网关的数据
var message = new MqttApplicationMessageBuilder().WithTopic(requestTopic).WithPayload(payload).Build();
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message), token);
var result = await waitDataAsync.WaitAsync(timeOut);
switch (result)
{
case WaitDataStatus.SetRunning:
return waitDataAsync.WaitResult;
case WaitDataStatus.Overtime:
throw new TimeoutException();
case WaitDataStatus.Canceled:
{
throw new Exception("等待已终止。可能是客户端已掉线,或者被注销。");
}
case WaitDataStatus.Default:
case WaitDataStatus.Disposed:
default:
throw new Exception(ThingsGatewayStatus.UnknownError.GetDescription());
}
}
finally
{
_writerRpcResultWaitingCalls.Remove(key);
}
}
}

View File

@@ -1,4 +1,4 @@
#region copyright
#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
@@ -71,9 +71,9 @@ public class DeviceVariableRunTime : DeviceVariable
[DataTable(Order = 3, IsShow = true, Sortable = false, CellClass = " table-text-truncate ")]
public object Value { get => _value; private set => _value = value; }
/// <summary>
/// 最近一次值
/// 次值
/// </summary>
[Description("最近一次值")]
[Description("次值")]
[DataTable(Order = 3, IsShow = true, Sortable = false, CellClass = " table-text-truncate ")]
public object LastSetValue { get; private set; }
@@ -137,14 +137,17 @@ public class DeviceVariableRunTime : DeviceVariable
}
CollectTime = time;
{
if ((data?.ToString() != _value?.ToString() && LastSetValue?.ToString() != data?.ToString()) || isOnlineChanged)
if ((data?.ToString() != _value?.ToString() ) || isOnlineChanged)
{
ChangeTime = time;
LastSetValue = _value;
if (IsOnline)
{
_value = data;
}
LastSetValue = data;
VariableValueChange?.Invoke(this);
}
}

View File

@@ -38,7 +38,7 @@
<PackageReference Include="CS-Script" Version="4.8.1" />
<!--CS-Script与Furion冲突直接安装覆盖版本-->
<PackageReference Include="Microsoft.CodeAnalysis.CSharp.Scripting" Version="4.7.0" />
<PackageReference Include="MQTTnet" Version="4.2.1.781" />
<PackageReference Include="MQTTnet" Version="4.3.1.873" />
</ItemGroup>
<ItemGroup>

View File

@@ -1250,50 +1250,50 @@
</summary>
<returns></returns>
</member>
<member name="T:ThingsGateway.Application.TGAPPInfo">
<member name="T:ThingsGateway.Application.APPInfo">
<inheritdoc/>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.Environment">
<member name="P:ThingsGateway.Application.APPInfo.Environment">
<summary>
主机环境
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.FrameworkDescription">
<member name="P:ThingsGateway.Application.APPInfo.FrameworkDescription">
<summary>
NET框架
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.HostName">
<member name="P:ThingsGateway.Application.APPInfo.HostName">
<summary>
主机名称
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.OsArchitecture">
<member name="P:ThingsGateway.Application.APPInfo.OsArchitecture">
<summary>
系统架构
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.RemoteIp">
<member name="P:ThingsGateway.Application.APPInfo.RemoteIp">
<summary>
外网地址
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.Stage">
<member name="P:ThingsGateway.Application.APPInfo.Stage">
<summary>
Stage环境
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.SystemOs">
<member name="P:ThingsGateway.Application.APPInfo.SystemOs">
<summary>
操作系统
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.UpdateTime">
<member name="P:ThingsGateway.Application.APPInfo.UpdateTime">
<summary>
更新时间
</summary>
</member>
<member name="P:ThingsGateway.Application.TGAPPInfo.DriveInfo">
<member name="P:ThingsGateway.Application.APPInfo.DriveInfo">
<summary>
当前磁盘信息
</summary>
@@ -1601,12 +1601,6 @@
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.RpcDataExecuteAsync(System.String,System.String,System.Byte[],System.Int32,MQTTnet.Protocol.MqttQualityOfServiceLevel,System.Threading.CancellationToken)">
<summary>
RPC请求子网关并返回需要传入子网关ID作为Topic参数一部分
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.SetClientGatewayDBAsync(System.String,ThingsGateway.Application.MqttDBDownRpc,System.Int32,System.Threading.CancellationToken)">
<summary>
下载配置信息到子网关
@@ -1625,6 +1619,17 @@
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.RpcDataExecuteAsync(System.String,System.String,System.Byte[],System.Int32,MQTTnet.Protocol.MqttQualityOfServiceLevel,System.Threading.CancellationToken)">
<summary>
RPC请求子网关并返回需要传入子网关ID作为Topic参数一部分
</summary>
<returns></returns>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.InitAsync">
<summary>
初始化
</summary>
</member>
<member name="F:ThingsGateway.Application.ManageGatewayWorker.ClientGatewayConfig">
<summary>
ClientGatewayConfig
@@ -1635,11 +1640,6 @@
ManageGatewayConfig
</summary>
</member>
<member name="M:ThingsGateway.Application.ManageGatewayWorker.InitAsync">
<summary>
初始化
</summary>
</member>
<member name="T:ThingsGateway.Application.CollectDeviceRunTime">
<summary>
采集设备状态表示
@@ -1841,7 +1841,7 @@
</member>
<member name="P:ThingsGateway.Application.DeviceVariableRunTime.LastSetValue">
<summary>
最近一次值
次值
</summary>
</member>
<member name="M:ThingsGateway.Application.DeviceVariableRunTime.SetValue(System.Object,System.DateTime,System.Boolean)">

View File

@@ -234,7 +234,7 @@ public class AlarmWorker : BackgroundService
/// <summary>
/// 循环线程取消标识
/// </summary>
public ConcurrentList<CancellationTokenSource> StoppingTokens = new();
private ConcurrentList<CancellationTokenSource> StoppingTokens = new();
/// <summary>
/// 全部重启锁
/// </summary>
@@ -498,24 +498,24 @@ public class AlarmWorker : BackgroundService
private async Task InitAsync()
{
CacheDb = new("HistoryAlarmCache");
CancellationTokenSource stoppingToken = StoppingTokens.Last();
var stoppingToken = StoppingTokens.Last().Token;
RealAlarmTask = await Task.Factory.StartNew(async () =>
{
_logger?.LogInformation($"实时报警线程开始");
while (!stoppingToken.Token.IsCancellationRequested)
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(500, stoppingToken.Token);
await Task.Delay(500, stoppingToken);
var list = DeviceVariables.ToListWithDequeue();
foreach (var item in list)
{
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
break;
if (!item.AlarmEnable) continue;
AlarmAnalysis(item);
}
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
break;
}
catch (TaskCanceledException)
@@ -540,7 +540,7 @@ public class AlarmWorker : BackgroundService
await Task.Yield();//返回线程控制,不再阻塞
try
{
await Task.Delay(500, stoppingToken.Token);
await Task.Delay(500, stoppingToken);
var result = await GetAlarmDbAsync();
if (!result.IsSuccess)
@@ -557,13 +557,13 @@ public class AlarmWorker : BackgroundService
/***创建/更新单个表***/
try
{
await sqlSugarClient.Queryable<HistoryAlarm>().FirstAsync(stoppingToken.Token);
await sqlSugarClient.Queryable<HistoryAlarm>().FirstAsync(stoppingToken);
isSuccess = true;
StatuString = OperResult.CreateSuccessResult();
}
catch (Exception)
{
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
{
IsExited = true;
return;
@@ -583,13 +583,13 @@ public class AlarmWorker : BackgroundService
}
}
while (!stoppingToken.Token.IsCancellationRequested)
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(500, stoppingToken.Token);
await Task.Delay(500, stoppingToken);
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
break;
//缓存值
@@ -599,7 +599,7 @@ public class AlarmWorker : BackgroundService
var data = cacheData.SelectMany(a => a.CacheStr.FromJsonString<List<HistoryAlarm>>()).ToList();
try
{
var count = await sqlSugarClient.Insertable(data).ExecuteCommandAsync(stoppingToken.Token);
var count = await sqlSugarClient.Insertable(data).ExecuteCommandAsync(stoppingToken);
await CacheDb.DeleteCacheData(cacheData.Select(a => a.Id).ToArray());
}
catch (Exception ex)
@@ -608,7 +608,7 @@ public class AlarmWorker : BackgroundService
_logger.LogWarning(ex, "写入历史报警失败");
}
}
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
break;
@@ -623,7 +623,7 @@ public class AlarmWorker : BackgroundService
//插入
try
{
await sqlSugarClient.Insertable(list).ExecuteCommandAsync(stoppingToken.Token);
await sqlSugarClient.Insertable(list).ExecuteCommandAsync(stoppingToken);
isSuccess = true;
}
catch (Exception ex)

View File

@@ -30,7 +30,7 @@ public class CollectDeviceThread : IAsyncDisposable
/// <summary>
/// CancellationTokenSources
/// </summary>
public ConcurrentList<CancellationTokenSource> StoppingTokens = new();
private ConcurrentList<CancellationTokenSource> StoppingTokens = new();
/// <summary>
/// 线程
@@ -165,7 +165,7 @@ public class CollectDeviceThread : IAsyncDisposable
/// </summary>
protected async Task InitTaskAsync()
{
CancellationTokenSource stoppingToken = StoppingTokens.Last();
var stoppingToken = StoppingTokens.Last().Token;
DeviceTask = await Task.Factory.StartNew(async () =>
{
var channelResult = CollectDeviceCores.FirstOrDefault().Driver.GetShareChannel();
@@ -184,11 +184,11 @@ public class CollectDeviceThread : IAsyncDisposable
device.IsShareChannel = CollectDeviceCores.Count > 1;
if (channelResult.IsSuccess)
{
await device.BeforeActionAsync(stoppingToken.Token, channelResult.Content);
await device.BeforeActionAsync(stoppingToken, channelResult.Content);
}
else
{
await device.BeforeActionAsync(stoppingToken.Token);
await device.BeforeActionAsync(stoppingToken);
}
}
@@ -206,7 +206,7 @@ public class CollectDeviceThread : IAsyncDisposable
//如果是共享通道类型,需要每次转换时切换适配器
if (device.IsShareChannel) device.Driver.InitDataAdapter();
var result = await device.RunActionAsync(stoppingToken.Token);
var result = await device.RunActionAsync(stoppingToken);
if (result == ThreadRunReturn.None)
{
await Task.Delay(CycleInterval);
@@ -224,7 +224,7 @@ public class CollectDeviceThread : IAsyncDisposable
}
else
{
await Task.Delay(1000, stoppingToken.Token);
await Task.Delay(1000, stoppingToken);
}
}
catch (TaskCanceledException)

View File

@@ -45,7 +45,7 @@ public class CollectDeviceWorker : BackgroundService
/// 读取未停止的采集设备List
/// </summary>
public List<CollectDeviceCore> CollectDeviceCores => CollectDeviceThreads
.Where(a => !a.StoppingTokens.Any(b => b.IsCancellationRequested))
.Where(a => a.CollectDeviceCores.Any(b => b.Device != null))
.SelectMany(a => a.CollectDeviceCores).ToList();
/// <summary>
@@ -452,8 +452,8 @@ public class CollectDeviceWorker : BackgroundService
var Propertys = _pluginService.GetDriverProperties(driver);
if (devId != 0)
{
var devcore = CollectDeviceCores.FirstOrDefault(it => it.Device.Id == devId);
devcore?.Device?.DevicePropertys?.ForEach(it =>
var devcore = App.GetService<CollectDeviceService>().GetDeviceById(devId);
devcore?.DevicePropertys?.ForEach(it =>
{
var dependencyProperty = Propertys.FirstOrDefault(a => a.PropertyName == it.PropertyName);
if (dependencyProperty != null)

View File

@@ -145,7 +145,7 @@ public class HistoryValueWorker : BackgroundService
/// <summary>
/// 循环线程取消标识
/// </summary>
public ConcurrentList<CancellationTokenSource> StoppingTokens = new();
private ConcurrentList<CancellationTokenSource> StoppingTokens = new();
/// <summary>
/// 全部重启锁
/// </summary>
@@ -164,7 +164,7 @@ public class HistoryValueWorker : BackgroundService
private async Task InitAsync()
{
CacheDb = new("HistoryValueCache");
CancellationTokenSource stoppingToken = StoppingTokens.Last();
var stoppingToken = StoppingTokens.Last().Token;
HistoryValueTask = await Task.Factory.StartNew(async () =>
{
_logger?.LogInformation($"历史数据线程开始");
@@ -186,13 +186,13 @@ public class HistoryValueWorker : BackgroundService
/***创建/更新单个表***/
try
{
await sqlSugarClient.Queryable<HistoryValue>().FirstAsync(stoppingToken.Token);
await sqlSugarClient.Queryable<HistoryValue>().FirstAsync(stoppingToken);
LastIsSuccess = true;
StatuString = OperResult.CreateSuccessResult();
}
catch (Exception)
{
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
{
IsExited = true;
return;
@@ -213,14 +213,14 @@ public class HistoryValueWorker : BackgroundService
}
IsExited = false;
while (!stoppingToken.Token.IsCancellationRequested)
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(500, stoppingToken.Token);
await Task.Delay(500, stoppingToken);
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
break;
//缓存值
@@ -228,7 +228,7 @@ public class HistoryValueWorker : BackgroundService
var data = cacheData.SelectMany(a => a.CacheStr.FromJsonString<List<HistoryValue>>()).ToList();
try
{
var count = await sqlSugarClient.Insertable(data).ExecuteCommandAsync(stoppingToken.Token);
var count = await sqlSugarClient.Insertable(data).ExecuteCommandAsync(stoppingToken);
await CacheDb.DeleteCacheData(cacheData.Select(a => a.Id).ToArray());
}
catch (Exception ex)
@@ -237,7 +237,7 @@ public class HistoryValueWorker : BackgroundService
_logger.LogWarning(ex, "写入历史数据失败");
}
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
break;
var collectList = DeviceVariables.ToListWithDequeue();
if (collectList.Count != 0)
@@ -248,7 +248,7 @@ public class HistoryValueWorker : BackgroundService
//插入
try
{
count = await sqlSugarClient.Insertable(collecthis).ExecuteCommandAsync(stoppingToken.Token);
count = await sqlSugarClient.Insertable(collecthis).ExecuteCommandAsync(stoppingToken);
LastIsSuccess = true;
}
catch (Exception ex)
@@ -264,7 +264,7 @@ public class HistoryValueWorker : BackgroundService
}
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
break;
var changeList = ChangeDeviceVariables.ToListWithDequeue();
if (changeList.Count != 0)
@@ -275,7 +275,7 @@ public class HistoryValueWorker : BackgroundService
//插入
try
{
count = await sqlSugarClient.Insertable(changehis).ExecuteCommandAsync(stoppingToken.Token);
count = await sqlSugarClient.Insertable(changehis).ExecuteCommandAsync(stoppingToken);
LastIsSuccess = true;
}
catch (Exception ex)

View File

@@ -84,7 +84,7 @@ public class MemoryVariableWorker : BackgroundService
/// <summary>
/// 循环线程取消标识
/// </summary>
public ConcurrentList<CancellationTokenSource> StoppingTokens = new();
private ConcurrentList<CancellationTokenSource> StoppingTokens = new();
private Task MemoryWorkerTask;
/// <summary>
/// 全部重启锁
@@ -95,7 +95,7 @@ public class MemoryVariableWorker : BackgroundService
/// </summary>
public async Task InitAsync()
{
CancellationTokenSource stoppingToken = StoppingTokens.Last();
var stoppingToken = StoppingTokens.Last().Token;
MemoryWorkerTask = await Task.Factory.StartNew(async () =>
{
_logger?.LogInformation($"中间变量计算线程开始");
@@ -105,13 +105,13 @@ public class MemoryVariableWorker : BackgroundService
var data = await variableService.GetMemoryVariableRuntimeAsync();
_globalDeviceData.MemoryVariables = new(data);
StatuString = OperResult.CreateSuccessResult();
while (!stoppingToken.Token.IsCancellationRequested)
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(500, stoppingToken.Token);
await Task.Delay(500, stoppingToken);
if (stoppingToken.Token.IsCancellationRequested)
if (stoppingToken.IsCancellationRequested)
break;
var isSuccess = true;
foreach (var item in _globalDeviceData.MemoryVariables)

View File

@@ -26,7 +26,7 @@ public class UploadDeviceThread : IAsyncDisposable
/// <summary>
/// CancellationTokenSources
/// </summary>
public ConcurrentList<CancellationTokenSource> StoppingTokens = new();
private ConcurrentList<CancellationTokenSource> StoppingTokens = new();
/// <summary>
/// 线程
@@ -154,7 +154,7 @@ public class UploadDeviceThread : IAsyncDisposable
/// </summary>
protected async Task InitTaskAsync()
{
CancellationTokenSource stoppingToken = StoppingTokens.Last();
var stoppingToken = StoppingTokens.Last().Token;
DeviceTask = await Task.Factory.StartNew(async () =>
{
LoggerGroup log = UploadDeviceCores.FirstOrDefault().Driver.LogMessage;
@@ -168,7 +168,7 @@ public class UploadDeviceThread : IAsyncDisposable
//添加通道报文到每个设备
var data = new EasyLogger(device.Driver.NewMessage) { LogLevel = TouchSocket.Core.LogLevel.Trace };
log.AddLogger(data);
await device.BeforeActionAsync(stoppingToken.Token);
await device.BeforeActionAsync(stoppingToken);
}
while (!stoppingToken.IsCancellationRequested)
@@ -183,7 +183,7 @@ public class UploadDeviceThread : IAsyncDisposable
if (device.IsInitSuccess)
{
var result = await device.RunActionAsync(stoppingToken.Token);
var result = await device.RunActionAsync(stoppingToken);
if (result == ThreadRunReturn.None)
{
await Task.Delay(CycleInterval);

View File

@@ -48,7 +48,7 @@ public class UploadDeviceWorker : BackgroundService
/// 上传设备List
/// </summary>
public List<UploadDeviceCore> UploadDeviceCores => UploadDeviceThreads
.Where(a => !a.StoppingTokens.Any(b => b.IsCancellationRequested))
.Where(a => a.UploadDeviceCores.Any(b => b.Device != null))
.SelectMany(a => a.UploadDeviceCores).ToList();
/// <summary>
/// 全部设备子线程
@@ -302,8 +302,8 @@ public class UploadDeviceWorker : BackgroundService
var Propertys = _pluginService.GetDriverProperties(driver);
if (devId != 0)
{
var devcore = UploadDeviceCores.FirstOrDefault(it => it.Device.Id == devId);
devcore?.Device?.DevicePropertys?.ForEach(it =>
var devcore = App.GetService<UploadDeviceService>().GetDeviceById(devId);
devcore?.DevicePropertys?.ForEach(it =>
{
var dependencyProperty = Propertys.FirstOrDefault(a => a.PropertyName == it.PropertyName);
if (dependencyProperty != null)

View File

@@ -170,7 +170,7 @@ public abstract class DriverDebugUIBase : ComponentBase, IDisposable
isDownExport = true;
StateHasChanged();
using var memoryStream = new MemoryStream();
StreamWriter writer = new(memoryStream);
using StreamWriter writer = new(memoryStream);
foreach (var item in values)
{
writer.WriteLine(item);

View File

@@ -155,7 +155,7 @@ public partial class DeviceStatusPage : IDisposable
isDownExport = true;
StateHasChanged();
using var memoryStream = new MemoryStream();
StreamWriter writer = new(memoryStream);
using StreamWriter writer = new(memoryStream);
foreach (var item in values)
{
writer.WriteLine(item);

View File

@@ -33,7 +33,7 @@
@layout MainLayout
<MRow NoGutters>
<MExpansionPanels @bind-Values="panel"
<MExpansionPanels @bind-Values="Panel"
Multiple>
<MExpansionPanel Value="1">
<MExpansionPanelHeader>

View File

@@ -35,7 +35,7 @@ namespace ThingsGateway.Blazor;
/// </summary>
public partial class ManageGatewayPage
{
List<StringNumber> panel { get; set; } = new();
List<StringNumber> Panel { get; set; } = new();
readonly PeriodicTimer _periodicTimer = new(TimeSpan.FromSeconds(5));
ManageGatewayWorker ManageGatewayWorker { get; set; }
@@ -45,7 +45,7 @@ public partial class ManageGatewayPage
protected override void OnInitialized()
{
ManageGatewayWorker = ServiceHelper.GetBackgroundService<ManageGatewayWorker>();
panel.Add("2");
Panel.Add("2");
_ = RunTimerAsync();
base.OnInitialized();
}
@@ -158,11 +158,13 @@ public partial class ManageGatewayPage
/// <returns></returns>
private async Task DBDown(MqttClientStatus mqttClientStatus)
{
MqttDBDownRpc rpc = new MqttDBDownRpc();
rpc.IsCollectDevicesFullUp = IsCollectDevicesFullUp;
rpc.IsDeviceVariablesFullUp = IsDeviceVariablesFullUp;
rpc.IsUploadDevicesFullUp = IsUploadDevicesFullUp;
rpc.IsRestart = IsRestart;
MqttDBDownRpc rpc = new()
{
IsCollectDevicesFullUp = IsCollectDevicesFullUp,
IsDeviceVariablesFullUp = IsDeviceVariablesFullUp,
IsUploadDevicesFullUp = IsUploadDevicesFullUp,
IsRestart = IsRestart
};
if (_importCollectDevicesFile != null)
{
@@ -193,11 +195,7 @@ public partial class ManageGatewayPage
var data = await ManageGatewayWorker.SetClientGatewayDBAsync(mqttClientStatus.Id, rpc);
if (data.IsSuccess)
{
if (data.Content.IsSuccess)
await PopupService.EnqueueSnackbarAsync("下发成功", AlertTypes.Success);
else
await PopupService.EnqueueSnackbarAsync(data.Content.Message, AlertTypes.Error);
await PopupService.EnqueueSnackbarAsync("下发成功", AlertTypes.Success);
}
else
{

View File

@@ -0,0 +1,142 @@
#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
#endregion
//------------------------------------------------------------------------------
// 此代码版权除特别声明或在XREF结尾的命名空间的代码归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议若本仓库没有设置则按MIT开源协议授权
// CSDN博客https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频https://space.bilibili.com/94253567
// Gitee源代码仓库https://gitee.com/RRQM_Home
// Github源代码仓库https://github.com/RRQM
// API首页http://rrqm_home.gitee.io/touchsocket/
// 交流QQ群234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
namespace TouchSocket.Core
{
/// <summary>
/// 等待数据对象
/// </summary>
/// <typeparam name="T"></typeparam>
public class WaitDataEx<T> : DisposableObject, IWaitData<T>
{
private readonly AutoResetEvent m_waitHandle;
private volatile WaitDataStatus m_status;
/// <summary>
/// WaitData
/// </summary>
public WaitDataEx()
{
this.m_waitHandle = new AutoResetEvent(false);
}
/// <inheritdoc/>
public WaitDataStatus Status { get => this.m_status; }
/// <inheritdoc/>
public T WaitResult { get; private set; }
/// <inheritdoc/>
public void Cancel()
{
this.m_status = WaitDataStatus.Canceled;
this.m_waitHandle.Set();
}
/// <inheritdoc/>
public void Reset()
{
this.m_status = WaitDataStatus.Default;
this.WaitResult = default;
this.m_waitHandle.Reset();
}
/// <inheritdoc/>
public bool Set()
{
this.m_status = WaitDataStatus.SetRunning;
return this.m_waitHandle.Set();
}
/// <inheritdoc/>
public bool Set(T waitResult)
{
this.WaitResult = waitResult;
this.m_status = WaitDataStatus.SetRunning;
return this.m_waitHandle.Set();
}
CancellationTokenRegistration registration = default;
/// <inheritdoc/>
public void SetCancellationToken(CancellationToken cancellationToken)
{
if (registration == default)
{
if (cancellationToken.CanBeCanceled)
registration = cancellationToken.Register(this.Cancel);
}
else
{
registration.Dispose();
if (cancellationToken.CanBeCanceled)
registration = cancellationToken.Register(this.Cancel);
}
}
/// <inheritdoc/>
public void SetResult(T result)
{
this.WaitResult = result;
}
/// <summary>
/// 等待指定时间
/// </summary>
/// <param name="timeSpan"></param>
public WaitDataStatus Wait(TimeSpan timeSpan)
{
return this.Wait((int)timeSpan.TotalMilliseconds);
}
/// <summary>
/// 等待指定毫秒
/// </summary>
/// <param name="millisecond"></param>
public WaitDataStatus Wait(int millisecond)
{
if (!this.m_waitHandle.WaitOne(millisecond))
{
this.m_status = WaitDataStatus.Overtime;
}
return this.m_status;
}
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
this.m_status = WaitDataStatus.Disposed;
this.WaitResult = default;
this.m_waitHandle.SafeDispose();
this.registration.Dispose();
base.Dispose(disposing);
}
}
/// <summary>
/// 等待数据对象
/// </summary>
public class WaitDataEx : WaitData<object>
{
}
}

View File

@@ -0,0 +1,133 @@
#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
#endregion
namespace TouchSocket.Core
{
/// <summary>
/// 等待数据对象
/// </summary>
/// <typeparam name="T"></typeparam>
public class WaitDataExAsync<T> : DisposableObject, IWaitData<T>
{
private readonly AsyncAutoResetEvent m_asyncWaitHandle;
private volatile WaitDataStatus m_status;
/// <summary>
/// 构造函数
/// </summary>
public WaitDataExAsync()
{
this.m_asyncWaitHandle = new AsyncAutoResetEvent(false);
}
/// <inheritdoc/>
public WaitDataStatus Status { get => this.m_status; }
/// <inheritdoc/>
public T WaitResult { get; private set; }
/// <inheritdoc/>
public void Cancel()
{
this.m_status = WaitDataStatus.Canceled;
this.m_asyncWaitHandle.Set();
}
/// <inheritdoc/>
public void Reset()
{
this.m_status = WaitDataStatus.Default;
this.WaitResult = default;
this.m_asyncWaitHandle.Reset();
}
/// <inheritdoc/>
public bool Set()
{
this.m_status = WaitDataStatus.SetRunning;
return this.m_asyncWaitHandle.Set();
}
/// <inheritdoc/>
public bool Set(T waitResult)
{
this.WaitResult = waitResult;
this.m_status = WaitDataStatus.SetRunning;
return this.m_asyncWaitHandle.Set();
}
CancellationTokenRegistration registration = default;
/// <inheritdoc/>
public void SetCancellationToken(CancellationToken cancellationToken)
{
if (registration == default)
{
if (cancellationToken.CanBeCanceled)
registration = cancellationToken.Register(this.Cancel);
}
else
{
registration.Dispose();
if (cancellationToken.CanBeCanceled)
registration = cancellationToken.Register(this.Cancel);
}
}
/// <inheritdoc/>
public void SetResult(T result)
{
this.WaitResult = result;
}
/// <summary>
/// 等待指定时间
/// </summary>
/// <param name="timeSpan"></param>
/// <returns></returns>
public async Task<WaitDataStatus> WaitAsync(TimeSpan timeSpan)
{
if (!await this.m_asyncWaitHandle.WaitOneAsync(timeSpan))
{
this.m_status = WaitDataStatus.Overtime;
}
return this.m_status;
}
/// <summary>
/// 等待指定毫秒
/// </summary>
/// <param name="millisecond"></param>
public Task<WaitDataStatus> WaitAsync(int millisecond)
{
return this.WaitAsync(TimeSpan.FromMilliseconds(millisecond));
}
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
this.m_status = WaitDataStatus.Disposed;
this.WaitResult = default;
this.m_asyncWaitHandle.SafeDispose();
this.registration.Dispose();
base.Dispose(disposing);
}
}
/// <summary>
/// 等待数据对象
/// </summary>
public class WaitDataExAsync : WaitDataAsync<object>
{
}
}

View File

@@ -20,8 +20,8 @@ internal class WaitingClientEx<TClient> : DisposableObject, IWaitingClient<TClie
{
private readonly Func<ResponsedData, bool> m_func;
private readonly EasyLock easyLock = new();
private readonly WaitData<ResponsedData> m_waitData = new();
private readonly WaitDataAsync<ResponsedData> m_waitDataAsync = new();
private readonly WaitDataEx<ResponsedData> m_waitData = new();
private readonly WaitDataExAsync<ResponsedData> m_waitDataAsync = new();
private volatile bool m_breaked;

View File

@@ -13,7 +13,7 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="TouchSocket" Version="2.0.0-beta.163" />
<PackageReference Include="TouchSocket" Version="2.0.0-beta.176" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'!='net45'">
<PackageReference Include="System.IO.Ports" Version="7.0.0" />

View File

@@ -2745,5 +2745,116 @@
<member name="M:ThingsGateway.Foundation.ThingsGatewayBitConverter.ToUInt64(System.Byte[],System.Int32,System.Int32)">
<inheritdoc/>
</member>
<member name="T:TouchSocket.Core.WaitDataEx`1">
<summary>
等待数据对象
</summary>
<typeparam name="T"></typeparam>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.#ctor">
<summary>
WaitData
</summary>
</member>
<member name="P:TouchSocket.Core.WaitDataEx`1.Status">
<inheritdoc/>
</member>
<member name="P:TouchSocket.Core.WaitDataEx`1.WaitResult">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.Cancel">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.Reset">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.Set">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.Set(`0)">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.SetCancellationToken(System.Threading.CancellationToken)">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.SetResult(`0)">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.Wait(System.TimeSpan)">
<summary>
等待指定时间
</summary>
<param name="timeSpan"></param>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.Wait(System.Int32)">
<summary>
等待指定毫秒
</summary>
<param name="millisecond"></param>
</member>
<member name="M:TouchSocket.Core.WaitDataEx`1.Dispose(System.Boolean)">
<inheritdoc/>
</member>
<member name="T:TouchSocket.Core.WaitDataEx">
<summary>
等待数据对象
</summary>
</member>
<member name="T:TouchSocket.Core.WaitDataExAsync`1">
<summary>
等待数据对象
</summary>
<typeparam name="T"></typeparam>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.#ctor">
<summary>
构造函数
</summary>
</member>
<member name="P:TouchSocket.Core.WaitDataExAsync`1.Status">
<inheritdoc/>
</member>
<member name="P:TouchSocket.Core.WaitDataExAsync`1.WaitResult">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.Cancel">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.Reset">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.Set">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.Set(`0)">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.SetCancellationToken(System.Threading.CancellationToken)">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.SetResult(`0)">
<inheritdoc/>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.WaitAsync(System.TimeSpan)">
<summary>
等待指定时间
</summary>
<param name="timeSpan"></param>
<returns></returns>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.WaitAsync(System.Int32)">
<summary>
等待指定毫秒
</summary>
<param name="millisecond"></param>
</member>
<member name="M:TouchSocket.Core.WaitDataExAsync`1.Dispose(System.Boolean)">
<inheritdoc/>
</member>
<member name="T:TouchSocket.Core.WaitDataExAsync">
<summary>
等待数据对象
</summary>
</member>
</members>
</doc>

View File

@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<Version>2.1.0.9</Version>
<Version>2.1.0.13</Version>
<Authors>Diego</Authors>
<Product>ThingsGateway</Product>
<Copyright>© 2023-present Diego</Copyright>