MqttClient增加间隔上传选项;更改线程循环间隔说明定义

This commit is contained in:
2248356998 qq.com
2023-05-12 16:31:57 +08:00
parent 58d5801fb5
commit d0a2cea772
7 changed files with 168 additions and 68 deletions

View File

@@ -28,7 +28,7 @@ public class KafkaProducerProperty : DriverPropertyBase
[DeviceProperty("变量主题", "")] public string VariableTopic { get; set; } = "test2";
[DeviceProperty("分组ID", "")] public string GroupId { get; set; } = "test-consumer-group";
[DeviceProperty("客户端ID", "")] public string ClientId { get; set; } = "test-consumer";
[DeviceProperty("循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
[DeviceProperty("线程循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
}
/// <summary>
@@ -242,9 +242,9 @@ public class KafkaProducer : UpLoadBase
_logger?.LogWarning(ex, ToString());
}
if (driverPropertys.CycleInterval > 500 + 50)
if (driverPropertys.CycleInterval > UploadDeviceThread.CycleInterval + 50)
{
await Task.Delay(driverPropertys.CycleInterval - 500);
await Task.Delay(driverPropertys.CycleInterval - UploadDeviceThread.CycleInterval);
}
else
{

View File

@@ -175,9 +175,9 @@ public class IotSharpClient : UpLoadBase
_logger?.LogWarning(ex, ToString());
}
if (driverPropertys.CycleInterval > 500 + 50)
if (driverPropertys.CycleInterval > UploadDeviceThread.CycleInterval + 50)
{
await Task.Delay(driverPropertys.CycleInterval - 500);
await Task.Delay(driverPropertys.CycleInterval - UploadDeviceThread.CycleInterval);
}
else
{
@@ -499,7 +499,7 @@ public class IotSharpClientProperty : UpDriverPropertyBase
[DeviceProperty("Accesstoken", "")] public string Accesstoken { get; set; } = "Accesstoken";
[DeviceProperty("连接超时时间", "")] public int ConnectTimeOut { get; set; } = 3000;
[DeviceProperty("允许Rpc写入", "")] public bool DeviceRpcEnable { get; set; }
[DeviceProperty("循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
[DeviceProperty("线程循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
}
public class IotSharpClientVariableProperty : VariablePropertyBase
{

View File

@@ -21,9 +21,8 @@ namespace ThingsGateway.Mqtt;
public class MqttClientProperty : UpDriverPropertyBase
{
[DeviceProperty("是否间隔上传", "False时为变化检测上传")] public bool IsInterval { get; set; } = false;
[DeviceProperty("上传间隔时间", "最小1000ms")] public int UploadInterval { get; set; } = 1000;
[DeviceProperty("IP", "")] public string IP { get; set; } = "127.0.0.1";
@@ -34,7 +33,7 @@ public class MqttClientProperty : UpDriverPropertyBase
[DeviceProperty("连接超时时间", "")] public int ConnectTimeOut { get; set; } = 3000;
[DeviceProperty("循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
[DeviceProperty("线程循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
[DeviceProperty("允许Rpc写入", "")] public bool DeviceRpcEnable { get; set; }
@@ -59,6 +58,7 @@ public class MqttClientVariableProperty : VariablePropertyBase
}
public class MqttClient : UpLoadBase
{
private List<CollectDeviceRunTime> _collectDevice;
private UploadDevice _curDevice;
private GlobalCollectDeviceData _globalCollectDeviceData;
@@ -77,6 +77,7 @@ public class MqttClient : UpLoadBase
private ConcurrentQueue<VariableData> CollectVariableRunTimes = new();
private MqttClientProperty driverPropertys = new();
private TimerTick exTimerTick;
private EasyLock lockobj = new();
private MqttClientVariableProperty variablePropertys = new();
@@ -94,6 +95,7 @@ public class MqttClient : UpLoadBase
public override VariablePropertyBase VariablePropertys => variablePropertys;
public override async Task BeforStartAsync()
{
if (_mqttClient != null)
{
var result = await TryMqttClientAsync();
@@ -124,42 +126,91 @@ public class MqttClient : UpLoadBase
}
}
public override async Task ExecuteAsync(CancellationToken cancellationToken)
{
try
{
////变化推送
var varList = CollectVariableRunTimes.ToListWithDequeue();
if (varList?.Count != 0)
if (!driverPropertys.IsInterval)
{
//分解List避免超出mqtt字节大小限制
var varData = varList.ChunkTrivialBetter(500);
foreach (var item in varData)
////变化推送
var varList = CollectVariableRunTimes.ToListWithDequeue();
if (varList?.Count != 0)
{
//分解List避免超出mqtt字节大小限制
var varData = varList.ChunkTrivialBetter(500);
foreach (var item in varData)
{
try
{
if (!cancellationToken.IsCancellationRequested)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.VariableTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptVariableModel)).Build();
var isConnect = await TryMqttClientAsync();
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage, cancellationToken);
}
else
{
break;
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, ToString());
}
}
}
}
else
{
if (exTimerTick.IsTickHappen())
{
try
{
if (!cancellationToken.IsCancellationRequested)
var varList = _uploadVariables;
if (varList?.Count != 0)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.VariableTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptVariableModel)).Build();
var isConnect = await TryMqttClientAsync();
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage, cancellationToken);
}
else
{
break;
}
//分解List避免超出mqtt字节大小限制
var varData = varList.ChunkTrivialBetter(500);
foreach (var item in varData)
{
try
{
if (!cancellationToken.IsCancellationRequested)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.VariableTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptVariableModel)).Build();
var isConnect = await TryMqttClientAsync();
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage, cancellationToken);
}
else
{
break;
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, ToString());
}
}
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, ToString());
_logger?.LogWarning(ex, ToString());
}
}
}
}
catch (Exception ex)
@@ -168,46 +219,87 @@ public class MqttClient : UpLoadBase
}
try
{
////变化推送
var devList = CollectDeviceRunTimes.ToListWithDequeue();
if (devList?.Count != 0)
if (!driverPropertys.IsInterval)
{
//分解List避免超出mqtt字节大小限制
var devData = devList.ChunkTrivialBetter(500);
foreach (var item in devData)
////变化推送
var devList = CollectDeviceRunTimes.ToListWithDequeue();
if (devList?.Count != 0)
{
try
//分解List避免超出mqtt字节大小限制
var devData = devList.ChunkTrivialBetter(500);
foreach (var item in devData)
{
if (!cancellationToken.IsCancellationRequested)
try
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.DeviceTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptDeviceModel)).Build();
var isConnect = await TryMqttClientAsync();
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage);
if (!cancellationToken.IsCancellationRequested)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.DeviceTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptDeviceModel)).Build();
var isConnect = await TryMqttClientAsync();
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage);
}
else
{
break;
}
}
else
catch (Exception ex)
{
break;
_logger.LogWarning(ex, ToString());
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, ToString());
}
}
}
else
{
var devList = _collectDevice;
if (devList?.Count != 0)
{
//分解List避免超出mqtt字节大小限制
var devData = devList.ChunkTrivialBetter(500);
foreach (var item in devData)
{
try
{
if (!cancellationToken.IsCancellationRequested)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.DeviceTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptDeviceModel)).Build();
var isConnect = await TryMqttClientAsync();
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage);
}
else
{
break;
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, ToString());
}
}
}
}
}
catch (Exception ex)
{
_logger?.LogWarning(ex, ToString());
}
if (driverPropertys.CycleInterval > 500 + 50)
if (driverPropertys.CycleInterval > UploadDeviceThread.CycleInterval + 50)
{
await Task.Delay(driverPropertys.CycleInterval - 500);
await Task.Delay(driverPropertys.CycleInterval - UploadDeviceThread.CycleInterval);
}
else
{
@@ -280,7 +372,8 @@ public class MqttClient : UpLoadBase
_uploadVariables = tags;
_globalCollectDeviceData.CollectDevices.Where(a => _uploadVariables.Select(b => b.DeviceId).Contains(a.Id)).ForEach(a =>
_collectDevice= _globalCollectDeviceData.CollectDevices.Where(a => _uploadVariables.Select(b => b.DeviceId).Contains(a.Id)).ToList();
_collectDevice.ForEach(a =>
{
a.DeviceStatusCahnge += DeviceStatusCahnge;
});
@@ -288,11 +381,10 @@ public class MqttClient : UpLoadBase
{
a.VariableValueChange += VariableValueChange;
});
if (driverPropertys.UploadInterval <= 1000) driverPropertys.UploadInterval = 1000;
exTimerTick = new(driverPropertys.UploadInterval);
}
private async Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
if (arg.ApplicationMessage.Topic == driverPropertys.QuestRpcTopic && arg.ApplicationMessage.PayloadSegment.Count > 0)

View File

@@ -27,12 +27,11 @@ namespace ThingsGateway.Mqtt;
public class MqttServerProperty : UpDriverPropertyBase
{
[DeviceProperty("IP", "留空则全部监听")] public string IP { get; set; } = "";
[DeviceProperty("端口", "")] public int Port { get; set; } = 1883;
[DeviceProperty("允许连接的ID(前缀)", "")] public string StartWithId { get; set; } = "ThingsGatewayId";
[DeviceProperty("允许Rpc写入", "")] public bool DeviceRpcEnable { get; set; }
[DeviceProperty("循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
[DeviceProperty("线程循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
[DeviceProperty("设备Topic", "")] public string DeviceTopic { get; set; } = "ThingsGateway/Device";
[DeviceProperty("变量Topic", "")] public string VariableTopic { get; set; } = "ThingsGateway/Variable";
[DeviceProperty("Rpc返回Topic", "")] public string RpcSubTopic { get; set; } = "ThingsGateway/RpcSub";
@@ -177,9 +176,9 @@ public class MqttServer : UpLoadBase
{
_logger.LogWarning(ex, ToString());
}
if (driverPropertys.CycleInterval > 500 + 50)
if (driverPropertys.CycleInterval > UploadDeviceThread.CycleInterval + 50)
{
await Task.Delay(driverPropertys.CycleInterval - 500);
await Task.Delay(driverPropertys.CycleInterval - UploadDeviceThread.CycleInterval);
}
else
{

View File

@@ -28,7 +28,7 @@ public class RabbitMQClientProperty : UpDriverPropertyBase
//[DeviceProperty("交换机名称", "")] public string ExchangeName { get; set; } = "RM";
[DeviceProperty("变量队列名称", "")] public string VariableQueueName { get; set; } = "ThingsGateway/Variable";
[DeviceProperty("设备队列名称", "")] public string DeviceQueueName { get; set; } = "ThingsGateway/Device";
[DeviceProperty("循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
[DeviceProperty("线程循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
[DeviceProperty("设备实体脚本", "查看文档说明,为空时不起作用")] public string BigTextScriptDeviceModel { get; set; }
@@ -234,9 +234,9 @@ public class RabbitMQClient : UpLoadBase
_logger?.LogWarning(ex, ToString());
}
if (driverPropertys.CycleInterval > 100 + 50)
if (driverPropertys.CycleInterval > UploadDeviceThread.CycleInterval + 50)
{
await Task.Delay(driverPropertys.CycleInterval - 100);
await Task.Delay(driverPropertys.CycleInterval - UploadDeviceThread.CycleInterval);
}
else
{

View File

@@ -34,8 +34,12 @@ public class CollectDeviceThread : IDisposable
/// </summary>
public ConcurrentList<CancellationTokenSource> StoppingTokens = new();
/// <summary>
/// 初始化
/// 默认等待间隔时间
/// </summary>
public static int CycleInterval { get; } = 50;
/// <summary>
/// 初始化
/// </summary>
protected void InitTask()
{
CancellationTokenSource StoppingToken = StoppingTokens.Last();
@@ -82,11 +86,11 @@ public class CollectDeviceThread : IDisposable
var result = await device.RunActionAsync(StoppingToken);
if (result == ThreadRunReturn.None)
{
await Task.Delay(20);
await Task.Delay(CycleInterval);
}
else if (result == ThreadRunReturn.Continue)
{
await Task.Delay(20);
await Task.Delay(1000);
}
else if (result == ThreadRunReturn.Break)
{

View File

@@ -27,6 +27,11 @@ public class UploadDeviceThread : IDisposable
/// CancellationTokenSources
/// </summary>
public ConcurrentList<CancellationTokenSource> StoppingTokens = new();
/// <summary>
/// 默认等待间隔时间
/// </summary>
public static int CycleInterval { get; } =500;
/// <summary>
/// 初始化
/// </summary>
@@ -65,11 +70,11 @@ public class UploadDeviceThread : IDisposable
var result = await device.RunActionAsync(StoppingToken);
if (result == ThreadRunReturn.None)
{
await Task.Delay(20);
await Task.Delay(CycleInterval);
}
else if (result == ThreadRunReturn.Continue)
{
await Task.Delay(20);
await Task.Delay(1000);
}
else if (result == ThreadRunReturn.Break)
{