上传插件添加循环间隔属性

This commit is contained in:
2248356998 qq.com
2023-03-17 17:30:06 +08:00
parent 3bf6e1befc
commit b08282b0c1
3 changed files with 51 additions and 28 deletions

View File

@@ -50,6 +50,7 @@ namespace ThingsGateway.Mqtt
[DeviceProperty("变量Topic", "")] public string VariableTopic { get; set; } = "ThingsGateway/Variable";
[DeviceProperty("设备Topic", "")] public string DeviceTopic { get; set; } = "ThingsGateway/Device";
[DeviceProperty("循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
public override async Task BeforStart()
@@ -172,17 +173,17 @@ namespace ThingsGateway.Mqtt
private GlobalCollectDeviceData _globalCollectDeviceData;
private IntelligentConcurrentQueue<CollectVariableRunTime> CollectVariableRunTimes { get; set; } = new(10000);
private IntelligentConcurrentQueue<CollectDeviceRunTime> CollectDeviceRunTimes { get; set; } = new(10000);
private IntelligentConcurrentQueue<VariableData> CollectVariableRunTimes { get; set; } = new(10000);
private IntelligentConcurrentQueue<DeviceData> CollectDeviceRunTimes { get; set; } = new(10000);
private void DeviceStatusCahnge(CollectDeviceRunTime collectDeviceRunTime)
{
CollectDeviceRunTimes.Enqueue(collectDeviceRunTime);
CollectDeviceRunTimes.Enqueue(collectDeviceRunTime.Adapt<DeviceData>());
}
private void VariableValueChange(CollectVariableRunTime collectVariableRunTime)
{
CollectVariableRunTimes.Enqueue(collectVariableRunTime);
CollectVariableRunTimes.Enqueue(collectVariableRunTime.Adapt<VariableData>());
}
private EasyLock lockobj { get; set; } = new();
private async Task<OperResult> TryMqttClient(bool reconnect = false)
@@ -302,7 +303,7 @@ namespace ThingsGateway.Mqtt
if (varList?.Count != 0)
{
//分解List避免超出mqtt字节大小限制
var varData = varList.Adapt<List<VariableData>>().ChunkTrivialBetter(500);
var varData = varList.ChunkTrivialBetter(500);
foreach (var item in varData)
{
try
@@ -332,7 +333,7 @@ namespace ThingsGateway.Mqtt
if (devList?.Count != 0)
{
//分解List避免超出mqtt字节大小限制
var devData = devList.Adapt<List<DeviceData>>().ChunkTrivialBetter(500);
var devData = devList.ChunkTrivialBetter(500);
foreach (var item in devData)
{
try
@@ -355,7 +356,15 @@ namespace ThingsGateway.Mqtt
{
_logger?.LogError(ex, ToString());
}
await Task.Delay(1000);
if (CycleInterval > 500 + 50)
{
await Task.Delay(CycleInterval - 500);
}
else
{
}
}
}

View File

@@ -40,6 +40,7 @@ namespace ThingsGateway.Mqtt
[DeviceProperty("变量Topic", "")] public string VariableTopic { get; set; } = "ThingsGateway/Variable";
[DeviceProperty("设备Topic", "")] public string DeviceTopic { get; set; } = "ThingsGateway/Device";
[DeviceProperty("循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
public override string ToString()
{
@@ -128,17 +129,17 @@ namespace ThingsGateway.Mqtt
a.VariableValueChange += VariableValueChange;
});
}
private IntelligentConcurrentQueue<CollectVariableRunTime> CollectVariableRunTimes { get; set; } = new(10000);
private IntelligentConcurrentQueue<CollectDeviceRunTime> CollectDeviceRunTimes { get; set; } = new(10000);
private IntelligentConcurrentQueue<VariableData> CollectVariableRunTimes { get; set; } = new(10000);
private IntelligentConcurrentQueue<DeviceData> CollectDeviceRunTimes { get; set; } = new(10000);
private void DeviceStatusCahnge(CollectDeviceRunTime collectDeviceRunTime)
{
CollectDeviceRunTimes.Enqueue(collectDeviceRunTime);
CollectDeviceRunTimes.Enqueue(collectDeviceRunTime.Adapt<DeviceData>());
}
private void VariableValueChange(CollectVariableRunTime collectVariableRunTime)
{
CollectVariableRunTimes.Enqueue(collectVariableRunTime);
CollectVariableRunTimes.Enqueue(collectVariableRunTime.Adapt<VariableData>());
}
ConcurrentDictionary<string, string> IdWithName = new();
private async Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
@@ -209,7 +210,7 @@ namespace ThingsGateway.Mqtt
if (varList?.Count != 0)
{
//分解List避免超出mqtt字节大小限制
var varData = varList.Adapt<List<VariableData>>().ChunkTrivialBetter(500);
var varData = varList.ChunkTrivialBetter(500);
foreach (var item in varData)
{
try
@@ -241,7 +242,7 @@ namespace ThingsGateway.Mqtt
if (devList?.Count != 0)
{
//分解List避免超出mqtt字节大小限制
var varData = devList.Adapt<List<DeviceData>>().ChunkTrivialBetter(500);
var varData = devList.ChunkTrivialBetter(500);
foreach (var item in varData)
{
try
@@ -266,8 +267,14 @@ namespace ThingsGateway.Mqtt
{
_logger.LogError(ex, ToString());
}
await Task.Delay(1000);
if (CycleInterval > 500 + 50)
{
await Task.Delay(CycleInterval - 500);
}
else
{
}
}
public override OperResult Success()

View File

@@ -47,6 +47,7 @@ namespace ThingsGateway.RabbitMQ
[DeviceProperty("设备队列名称", "")] public string DeviceQueueName { get; set; } = "ThingsGateway/Device";
[DeviceProperty("是否发布List", "")] public bool IsList { get; set; } = false;
[DeviceProperty("是否声明队列", "")] public bool IsQueueDeclare { get; set; } = false;
[DeviceProperty("循环间隔", "最小500ms")] public int CycleInterval { get; set; } = 1000;
public string ExchangeName { get; set; } = "";
@@ -106,7 +107,7 @@ namespace ThingsGateway.RabbitMQ
_globalCollectDeviceData.CollectVariables.ForEach(a =>
{
a.VariableValueChange += VariableValueChange;
CollectVariableRunTimes.Enqueue(a);
VariableValueChange(a);
});
@@ -122,17 +123,17 @@ namespace ThingsGateway.RabbitMQ
private GlobalCollectDeviceData _globalCollectDeviceData;
private IntelligentConcurrentQueue<CollectVariableRunTime> CollectVariableRunTimes { get; set; } = new(10000);
private IntelligentConcurrentQueue<CollectDeviceRunTime> CollectDeviceRunTimes { get; set; } = new(10000);
private IntelligentConcurrentQueue<VariableData> CollectVariableRunTimes { get; set; } = new(10000);
private IntelligentConcurrentQueue<DeviceData> CollectDeviceRunTimes { get; set; } = new(10000);
private void DeviceStatusCahnge(CollectDeviceRunTime collectDeviceRunTime)
{
CollectDeviceRunTimes.Enqueue(collectDeviceRunTime);
CollectDeviceRunTimes.Enqueue(collectDeviceRunTime.Adapt<DeviceData>());
}
private void VariableValueChange(CollectVariableRunTime collectVariableRunTime)
{
CollectVariableRunTimes.Enqueue(collectVariableRunTime);
CollectVariableRunTimes.Enqueue(collectVariableRunTime.Adapt<VariableData>());
}
public override async Task ExecuteAsync(CancellationToken cancellationToken)
@@ -145,13 +146,12 @@ namespace ThingsGateway.RabbitMQ
{
if (IsList)
{
var list = varList.Adapt<List<VariableData>>();
var listChunk = list.ChunkTrivialBetter(500);
var listChunk = varList.ChunkTrivialBetter(500);
foreach (var variables in listChunk)
{
try
{
var data = Encoding.UTF8.GetBytes(variables.Adapt<List<VariableData>>().ToJson());
var data = Encoding.UTF8.GetBytes(variables.ToJson());
// 设置消息持久化
IBasicProperties properties = _model.CreateBasicProperties();
properties.Persistent = true;
@@ -170,7 +170,7 @@ namespace ThingsGateway.RabbitMQ
{
try
{
var data = Encoding.UTF8.GetBytes(variable.Adapt<VariableData>().ToJson());
var data = Encoding.UTF8.GetBytes(variable.ToJson());
// 设置消息持久化
IBasicProperties properties = _model.CreateBasicProperties();
properties.Persistent = true;
@@ -198,13 +198,12 @@ namespace ThingsGateway.RabbitMQ
{
if (IsList)
{
var list = devList.Adapt<List<DeviceData>>();
var listChunk = list.ChunkTrivialBetter(500);
var listChunk = devList.ChunkTrivialBetter(500);
foreach (var devices in listChunk)
{
try
{
var data = Encoding.UTF8.GetBytes(devices.Adapt<List<DeviceData>>().ToJson());
var data = Encoding.UTF8.GetBytes(devices.ToJson());
// 设置消息持久化
IBasicProperties properties = _model.CreateBasicProperties();
properties.Persistent = true;
@@ -223,7 +222,7 @@ namespace ThingsGateway.RabbitMQ
{
try
{
var data = Encoding.UTF8.GetBytes(devices.Adapt<List<DeviceData>>().ToJson());
var data = Encoding.UTF8.GetBytes(devices.ToJson());
// 设置消息持久化
IBasicProperties properties = _model.CreateBasicProperties();
properties.Persistent = true;
@@ -244,7 +243,15 @@ namespace ThingsGateway.RabbitMQ
{
_logger?.LogError(ex, ToString());
}
await Task.Delay(1000);
if( CycleInterval>500+50)
{
await Task.Delay(CycleInterval-500);
}
else
{
}
}
}