kafka插件增加超时选项

This commit is contained in:
2248356998 qq.com
2023-07-15 17:27:35 +08:00
parent 87ebb7b6c7
commit 6a863cd26a
3 changed files with 40 additions and 17 deletions

View File

@@ -29,6 +29,7 @@ namespace ThingsGateway.Kafka;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using ThingsGateway.Foundation.Extension.Enumerator;
using ThingsGateway.Foundation.Extension.Generic;
@@ -162,29 +163,50 @@ public class KafkaProducer : UpLoadBase
private async Task KafKaUp(string topic, string payLoad, CancellationToken cancellationToken)
{
var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = payLoad }, cancellationToken);
if (result.Status != PersistenceStatus.Persisted)
try
{
await CacheDb.AddCacheData(topic, payLoad, driverPropertys.CacheMaxCount);
}
else
{
//连接成功时补发缓存数据
var cacheData = await CacheDb.GetCacheData();
foreach (var item in cacheData)
using CancellationTokenSource cancellationTokenSource = new();
using CancellationTokenSource stoppingToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, cancellationToken);
Task<DeliveryResult<Null, string>> resultTask = producer.ProduceAsync(topic, new Message<Null, string> { Value = payLoad }, stoppingToken.Token);
var timeOutResult = await Task.WhenAny(resultTask, Task.Delay(driverPropertys.TimeOut, stoppingToken.Token));
if (timeOutResult == resultTask)
{
var cacheResult = await producer.ProduceAsync(item.Topic, new Message<Null, string> { Value = item.CacheStr }, cancellationToken);
if (cacheResult.Status == PersistenceStatus.Persisted)
var result = (timeOutResult as Task<DeliveryResult<Null, string>>).Result;
if (result.Status != PersistenceStatus.Persisted)
{
logMessage.Trace(LogMessageHeader + $"主题:{item.Topic}{Environment.NewLine}负载:{item.CacheStr}");
await CacheDb.AddCacheData(topic, payLoad, driverPropertys.CacheMaxCount);
}
else
{
//连接成功时补发缓存数据
var cacheData = await CacheDb.GetCacheData();
foreach (var item in cacheData)
{
var cacheResult = await producer.ProduceAsync(item.Topic, new Message<Null, string> { Value = item.CacheStr }, stoppingToken.Token);
if (cacheResult.Status == PersistenceStatus.Persisted)
{
logMessage.Trace(LogMessageHeader + $"主题:{item.Topic}{Environment.NewLine}负载:{item.CacheStr}");
await CacheDb.DeleteCacheData(item.Id);
}
}
logMessage.Trace(LogMessageHeader + $"主题:{topic}{Environment.NewLine}负载:{payLoad}");
await CacheDb.DeleteCacheData(item.Id);
}
}
logMessage.Trace(LogMessageHeader + $"主题:{topic}{Environment.NewLine}负载:{payLoad}");
else
{
stoppingToken.Cancel();
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, ToString());
await CacheDb.AddCacheData(topic, payLoad, driverPropertys.CacheMaxCount);
}
}
public override OperResult IsConnected()

View File

@@ -25,6 +25,7 @@ public class KafkaProducerProperty : DriverPropertyBase
[DeviceProperty("分组ID", "")] public string GroupId { get; set; } = "test-consumer-group";
[DeviceProperty("客户端ID", "")] public string ClientId { get; set; } = "test-consumer";
[DeviceProperty("线程循环间隔", "最小10ms")] public int CycleInterval { get; set; } = 1000;
[DeviceProperty("发布超时时间", "ms")] public int TimeOut { get; set; } = 5000;
[DeviceProperty("缓存最大条数", "默认2千条")] public int CacheMaxCount { get; set; } = 2000;
}

View File

@@ -51,7 +51,7 @@ public class ThingsGatewayNodeManager : CustomNodeManager2
_config.ForType<HistoryValue, DataValue>()
.Map(dest => dest.WrappedValue, (src) => new Variant(src.Value))
.Map(dest => dest.SourceTimestamp, (src) => DateTime.SpecifyKind(src.CollectTime, DateTimeKind.Utc))
.Map(dest => dest.StatusCode, (src) => src.Quality == 192 ? StatusCodes.Good : StatusCodes.Bad);
.Map(dest => dest.StatusCode, (src) => src.IsOnline ? StatusCodes.Good : StatusCodes.Bad);
}