mirror of
https://gitee.com/ThingsGateway/ThingsGateway.git
synced 2025-10-30 15:13:59 +08:00
feat: 支持字节数组上传
This commit is contained in:
@@ -46,7 +46,7 @@ public partial class KafkaProducer : BusinessBaseWithCacheIntervalScript<Variabl
|
||||
_producerconfig.SaslPassword = _driverPropertys.SaslPassword;
|
||||
|
||||
//2、创建生产者
|
||||
_producerBuilder = new ProducerBuilder<Null, string>(_producerconfig);
|
||||
_producerBuilder = new ProducerBuilder<Null, byte[]>(_producerconfig);
|
||||
//3、错误日志监视
|
||||
_producerBuilder.SetErrorHandler((p, msg) =>
|
||||
{
|
||||
@@ -75,6 +75,14 @@ public partial class KafkaProducer : BusinessBaseWithCacheIntervalScript<Variabl
|
||||
/// <inheritdoc/>
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
try
|
||||
{
|
||||
_producer?.Flush(TimeSpan.FromSeconds(10));
|
||||
}
|
||||
catch
|
||||
{
|
||||
|
||||
}
|
||||
_producer?.SafeDispose();
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
@@ -24,8 +24,8 @@ namespace ThingsGateway.Plugin.Kafka;
|
||||
/// </summary>
|
||||
public partial class KafkaProducer : BusinessBaseWithCacheIntervalScript<VariableBasicData, DeviceBasicData, AlarmVariable>
|
||||
{
|
||||
private IProducer<Null, string> _producer;
|
||||
private ProducerBuilder<Null, string> _producerBuilder;
|
||||
private IProducer<Null, byte[]> _producer;
|
||||
private ProducerBuilder<Null, byte[]> _producerBuilder;
|
||||
private ProducerConfig _producerconfig;
|
||||
private volatile bool producerSuccess = true;
|
||||
|
||||
@@ -127,7 +127,7 @@ public partial class KafkaProducer : BusinessBaseWithCacheIntervalScript<Variabl
|
||||
|
||||
#region private
|
||||
|
||||
private async ValueTask<OperResult> Update(List<TopicJson> topicJsonList, int count, CancellationToken cancellationToken)
|
||||
private async ValueTask<OperResult> Update(List<TopicArray> topicJsonList, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
foreach (var topicJson in topicJsonList)
|
||||
{
|
||||
@@ -150,19 +150,19 @@ public partial class KafkaProducer : BusinessBaseWithCacheIntervalScript<Variabl
|
||||
|
||||
private async ValueTask<OperResult> UpdateAlarmModel(IEnumerable<AlarmVariable> item, CancellationToken cancellationToken)
|
||||
{
|
||||
List<TopicJson> topicJsonList = GetAlarms(item);
|
||||
var topicJsonList = GetAlarmTopicArrays(item);
|
||||
return await Update(topicJsonList, item.Count(), cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async ValueTask<OperResult> UpdateDevModel(IEnumerable<DeviceBasicData> item, CancellationToken cancellationToken)
|
||||
{
|
||||
List<TopicJson> topicJsonList = GetDeviceData(item);
|
||||
var topicJsonList = GetDeviceTopicArray(item);
|
||||
return await Update(topicJsonList, item.Count(), cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async ValueTask<OperResult> UpdateVarModel(IEnumerable<VariableBasicData> item, CancellationToken cancellationToken)
|
||||
{
|
||||
List<TopicJson> topicJsonList = GetVariableBasicData(item);
|
||||
var topicJsonList = GetVariableBasicDataTopicArray(item);
|
||||
return await Update(topicJsonList, item.Count(), cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -203,42 +203,36 @@ public partial class KafkaProducer : BusinessBaseWithCacheIntervalScript<Variabl
|
||||
/// <summary>
|
||||
/// kafka上传,返回上传结果
|
||||
/// </summary>
|
||||
public async ValueTask<OperResult> KafKaUpAsync(string topic, string payLoad, int count, CancellationToken cancellationToken)
|
||||
public async ValueTask<OperResult> KafKaUpAsync(string topic, byte[] payLoad, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
using CancellationTokenSource cancellationTokenSource = new();
|
||||
using CancellationTokenSource cancellationTokenSource = new(_driverPropertys.Timeout);
|
||||
using CancellationTokenSource stoppingToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, cancellationToken);
|
||||
Task<DeliveryResult<Null, string>> resultTask = _producer.ProduceAsync(topic, new Message<Null, string> { Value = payLoad }, stoppingToken.Token);
|
||||
var timeOutResult = await Task.WhenAny(resultTask, Task.Delay(_driverPropertys.Timeout, stoppingToken.Token)).ConfigureAwait(false);
|
||||
if (timeOutResult == resultTask)
|
||||
var result = await _producer.ProduceAsync(topic, new Message<Null, byte[]> { Value = payLoad }, stoppingToken.Token).ConfigureAwait(false);
|
||||
if (result.Status != PersistenceStatus.Persisted)
|
||||
{
|
||||
var result = await resultTask.ConfigureAwait(false);
|
||||
if (result.Status != PersistenceStatus.Persisted)
|
||||
{
|
||||
return new OperResult("Upload fail");
|
||||
}
|
||||
else
|
||||
{
|
||||
if (_driverPropertys.DetailLog)
|
||||
{
|
||||
if (LogMessage.LogLevel <= TouchSocket.Core.LogLevel.Trace)
|
||||
LogMessage.LogTrace($"Topic:{topic}{Environment.NewLine}PayLoad:{payLoad} {Environment.NewLine} VarModelQueue:{_memoryVarModelQueue.Count}");
|
||||
}
|
||||
else
|
||||
{
|
||||
LogMessage.LogTrace($"Topic:{topic}{Environment.NewLine}Count:{count} {Environment.NewLine} VarModelQueue:{_memoryVarModelQueue.Count}");
|
||||
|
||||
}
|
||||
return OperResult.Success;
|
||||
}
|
||||
return new OperResult("Upload fail");
|
||||
}
|
||||
else
|
||||
{
|
||||
stoppingToken.Cancel();
|
||||
return new OperResult("Upload timeout");
|
||||
if (_driverPropertys.DetailLog)
|
||||
{
|
||||
if (LogMessage.LogLevel <= TouchSocket.Core.LogLevel.Trace)
|
||||
LogMessage.LogTrace(GetString(topic, payLoad, _memoryVarModelQueue.Count));
|
||||
}
|
||||
else
|
||||
{
|
||||
LogMessage.LogTrace($"Topic:{topic}{Environment.NewLine}Count:{count} {Environment.NewLine} VarModelQueue:{_memoryVarModelQueue.Count}");
|
||||
|
||||
}
|
||||
return OperResult.Success;
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return new OperResult("Timeout");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new OperResult(ex);
|
||||
|
||||
Reference in New Issue
Block a user