Files
KinginfoGateway/framework/Plugin/ThingsGateway.Plugin.Kafka/KafkaProducer.cs
2023-09-30 23:05:53 +08:00

413 lines
15 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
#endregion
using Confluent.Kafka;
using Mapster;
namespace ThingsGateway.Plugin.Kafka;
using Furion;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using ThingsGateway.Foundation.Extension.ConcurrentQueue;
using ThingsGateway.Foundation.Extension.Generic;
using ThingsGateway.Foundation.Extension.String;
/// <summary>
/// Kafka message producer: an upload plugin that publishes collected variable and
/// device data to Kafka topics, either on change or on a fixed upload interval
/// (selected by <c>driverPropertys.IsInterval</c>).
/// </summary>
public class KafkaProducer : UpLoadBase
{
    // Change-driven queues, filled by the DeviceStatusChange / VariableValueChange handlers
    // and drained by ExecuteAsync when interval mode is off.
    private readonly ConcurrentQueue<DeviceData> _collectDeviceRunTimes = new();
    private readonly ConcurrentQueue<VariableData> _collectVariableRunTimes = new();
    private readonly KafkaProducerProperty driverPropertys = new();
    private readonly KafkaProducerVariableProperty variablePropertys = new();
    private GlobalDeviceData _globalDeviceData;

    // Collect devices that own at least one uploaded variable; assigned in Init.
    private List<CollectDeviceRunTime> _collectDevice;
    private List<DeviceVariableRunTime> _uploadVariables = new();

    // Last known delivery state; set to false by the producer error handler and by failed sends.
    private bool isSuccess = true;
    private IProducer<Null, string> producer;
    private ProducerBuilder<Null, string> producerBuilder;
    private ProducerConfig producerconfig;
    private TimerTick exVariableTimerTick;
    private TimerTick exDeviceTimerTick;

    /// <inheritdoc/>
    public override Type DriverDebugUIType => null;

    /// <inheritdoc/>
    public override DriverPropertyBase DriverPropertys => driverPropertys;

    /// <inheritdoc/>
    public override List<DeviceVariableRunTime> UploadVariables => _uploadVariables;

    /// <inheritdoc/>
    public override VariablePropertyBase VariablePropertys => variablePropertys;

    /// <inheritdoc/>
    public override Task AfterStopAsync()
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc/>
    public override Task BeforStartAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Runs one upload cycle: first variable data, then device data. In change mode the
    /// pending queue is drained; in interval mode a full snapshot is sent whenever the
    /// corresponding timer tick has happened. Failures are logged and never propagate.
    /// </summary>
    /// <param name="cancellationToken">Token that stops sending and aborts the pacing delay.</param>
    public override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        try
        {
            if (!driverPropertys.IsInterval)
            {
                // Change-driven push: send whatever the change handlers queued since the last cycle.
                await UploadInChunksAsync(_collectVariableRunTimes.ToListWithDequeue(), driverPropertys.VariableTopic, cancellationToken);
            }
            else if (exVariableTimerTick.IsTickHappen())
            {
                await UploadInChunksAsync(_uploadVariables.Adapt<List<VariableData>>(), driverPropertys.VariableTopic, cancellationToken);
            }
        }
        catch (Exception ex)
        {
            LogMessage?.LogWarning(ex, ToString());
        }

        try
        {
            if (!driverPropertys.IsInterval)
            {
                // Change-driven push for device status.
                await UploadInChunksAsync(_collectDeviceRunTimes.ToListWithDequeue(), driverPropertys.DeviceTopic, cancellationToken);
            }
            else if (exDeviceTimerTick.IsTickHappen())
            {
                await UploadInChunksAsync(_collectDevice.Adapt<List<DeviceData>>(), driverPropertys.DeviceTopic, cancellationToken);
            }
        }
        catch (Exception ex)
        {
            LogMessage?.LogWarning(ex, ToString());
        }

        // Pace the loop only when the configured cycle is noticeably longer than the
        // hosting thread's own cycle interval.
        if (driverPropertys.CycleInterval > UploadDeviceThread.CycleInterval + 50)
        {
            try
            {
                await Task.Delay(driverPropertys.CycleInterval - UploadDeviceThread.CycleInterval, cancellationToken);
            }
            catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException)
            {
                // Cancellation (or an already-disposed token source) just ends the wait early.
            }
        }
    }

    /// <summary>
    /// <inheritdoc/>
    /// </summary>
    /// <returns>The last known delivery state (<c>isSuccess</c>).</returns>
    public override bool IsConnected() => isSuccess;

    /// <inheritdoc/>
    protected override void Dispose(bool disposing)
    {
        // Both lists may be null when Init failed part-way or Dispose is called twice;
        // guard them so the producer and the base class are always released.
        _collectDevice?.ForEach(a =>
        {
            a.DeviceStatusChange -= DeviceStatusChange;
        });
        _uploadVariables?.ForEach(a =>
        {
            a.VariableValueChange -= VariableValueChange;
        });
        producer?.Dispose();
        _uploadVariables = null;
        _collectDeviceRunTimes.Clear();
        _collectVariableRunTimes.Clear();
        base.Dispose(disposing);
    }

    /// <summary>
    /// Initializes the plugin: builds the Kafka producer, selects the variables enabled
    /// for this upload device, and subscribes to their change notifications.
    /// </summary>
    /// <param name="device">The upload device this plugin instance belongs to.</param>
    protected override void Init(UploadDeviceRunTime device)
    {
        #region Kafka
        // 1. Producer configuration.
        producerconfig = new ProducerConfig
        {
            BootstrapServers = driverPropertys.BootStrapServers,
            ClientId = driverPropertys.ClientId,
        };
        // 2. Producer builder.
        producerBuilder = new ProducerBuilder<Null, string>(producerconfig);
        // 3. Error monitoring: any client error marks the connection as failed.
        producerBuilder.SetErrorHandler((p, msg) =>
        {
            isSuccess = false;
            LogMessage?.LogWarning(msg.Reason);
        });
        try
        {
            producer = producerBuilder.Build();
        }
        catch (DllNotFoundException)
        {
            // The native librdkafka library was not resolved automatically; load it from
            // the plugin's own runtimes folder and retry once.
            LoadNativeLibrary();
            producer = producerBuilder.Build();
        }
        #endregion

        _globalDeviceData = App.GetService<GlobalDeviceData>();
        _uploadVariables = _globalDeviceData.AllVariables
            .Where(a => a.VariablePropertys.ContainsKey(device.Id))
            .Where(b => GetPropertyValue(b, nameof(variablePropertys.Enable)).GetBoolValue())
            .ToList();

        // Build the id set once instead of re-projecting the variable list for every device.
        var deviceIds = _uploadVariables.Select(b => b.DeviceId).ToHashSet();
        _collectDevice = _globalDeviceData.CollectDevices.Where(a => deviceIds.Contains(a.Id)).ToList();

        _collectDevice.ForEach(a =>
        {
            a.DeviceStatusChange += DeviceStatusChange;
        });
        _uploadVariables.ForEach(a =>
        {
            a.VariableValueChange += VariableValueChange;
        });

        // Enforce a minimum upload interval of 1000 (same unit as UploadInterval).
        if (driverPropertys.UploadInterval <= 1000) driverPropertys.UploadInterval = 1000;
        exVariableTimerTick = new(driverPropertys.UploadInterval);
        exDeviceTimerTick = new(driverPropertys.UploadInterval);
    }

    /// <summary>
    /// Loads librdkafka from <c>Plugins/ThingsGateway.Plugin.Kafka/runtimes/{os}-{arch}/native</c>
    /// under the application base directory, unless the library is already loaded.
    /// </summary>
    private static void LoadNativeLibrary()
    {
        if (Library.IsLoaded)
        {
            return;
        }

        string osPrefix;
        string fileExtension;
        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            osPrefix = "win-";
            fileExtension = ".dll";
        }
        else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
        {
            osPrefix = "linux-";
            fileExtension = ".so";
        }
        else
        {
            // Every other platform is treated as macOS.
            osPrefix = "osx-";
            fileExtension = ".dylib";
        }

        string runtimeFolder = osPrefix + RuntimeInformation.ProcessArchitecture.ToString().ToLowerInvariant();
        string pathToLibrd = System.IO.Path.Combine(AppContext.BaseDirectory, "Plugins", "ThingsGateway.Plugin.Kafka", "runtimes", runtimeFolder, "native", $"librdkafka{fileExtension}");
        Library.Load(pathToLibrd);
    }

    /// <summary>
    /// Sends <paramref name="data"/> to <paramref name="topic"/> in chunks of
    /// <c>driverPropertys.SplitSize</c> items, then flushes the producer if the last
    /// known delivery state is successful. Does nothing for a null or empty collection.
    /// </summary>
    /// <param name="data">Items to upload; may be null.</param>
    /// <param name="topic">Destination Kafka topic.</param>
    /// <param name="cancellationToken">Stops sending further chunks once cancelled.</param>
    private async Task UploadInChunksAsync<T>(ICollection<T> data, string topic, CancellationToken cancellationToken)
    {
        if (data == null || data.Count == 0)
        {
            return;
        }

        // Split the list so a single message stays below the payload size limit.
        foreach (var chunk in data.ChunkTrivialBetter(driverPropertys.SplitSize))
        {
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            try
            {
                await KafKaUp(topic, chunk.ToJsonString(), cancellationToken);
            }
            catch (Exception ex)
            {
                LogMessage?.LogWarning(ex, ToString());
            }
        }

        if (isSuccess)
        {
            producer.Flush(cancellationToken);
        }
    }

    /// <summary>
    /// Queues the device snapshot for upload when change-driven mode is active.
    /// </summary>
    private void DeviceStatusChange(CollectDeviceRunTime collectDeviceRunTime)
    {
        if (driverPropertys?.IsInterval != true)
            _collectDeviceRunTimes.Enqueue(collectDeviceRunTime.Adapt<DeviceData>());
    }

    /// <summary>
    /// Produces one message and waits for its delivery report for at most
    /// <c>driverPropertys.TimeOut</c>. A persisted delivery marks the connection healthy
    /// and triggers a replay of cached data; a non-persisted delivery or an exception
    /// stores the payload in the cache; a timeout marks the connection as failed.
    /// </summary>
    /// <param name="topic">Destination Kafka topic.</param>
    /// <param name="payLoad">Message value.</param>
    /// <param name="cancellationToken">Host cancellation token.</param>
    private async Task KafKaUp(string topic, string payLoad, CancellationToken cancellationToken)
    {
        try
        {
            using CancellationTokenSource produceSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            // Separate source for the timeout timer so it can be stopped as soon as the
            // delivery report arrives, without cancelling the replay that follows.
            using CancellationTokenSource delaySource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

            Task<DeliveryResult<Null, string>> resultTask = producer.ProduceAsync(topic, new Message<Null, string> { Value = payLoad }, produceSource.Token);
            Task finishedTask = await Task.WhenAny(resultTask, Task.Delay(driverPropertys.TimeOut, delaySource.Token));

            if (finishedTask != resultTask)
            {
                // Timed out (or the host cancelled the wait): stop observing the delivery.
                // NOTE(review): the payload is not cached on this path — confirm that is intended.
                isSuccess = false;
                produceSource.Cancel();
                return;
            }

            delaySource.Cancel();

            // Await instead of .Result so failures surface as the original exception.
            var result = await resultTask;
            if (result.Status != PersistenceStatus.Persisted)
            {
                isSuccess = false;
                await CacheDb.AddCacheData(topic, payLoad, driverPropertys.CacheMaxCount);
                return;
            }

            isSuccess = true;
            // Delivery works again: re-send whatever was cached while it did not.
            await ReplayCachedDataAsync(produceSource.Token);
            LogMessage?.Trace($"{FoundationConst.LogMessageHeader}主题:{topic}{Environment.NewLine}负载:{payLoad}");
        }
        catch (Exception ex)
        {
            LogMessage?.LogWarning(ex, ToString());
            await CacheDb.AddCacheData(topic, payLoad, driverPropertys.CacheMaxCount);
        }
    }

    /// <summary>
    /// Re-sends every cached message and deletes each one that is reported as persisted.
    /// Failures are logged here so that they cannot cause the caller to cache a payload
    /// that has already been delivered.
    /// </summary>
    /// <param name="cancellationToken">Token passed to each produce call.</param>
    private async Task ReplayCachedDataAsync(CancellationToken cancellationToken)
    {
        try
        {
            var cacheData = await CacheDb.GetCacheData();
            foreach (var item in cacheData)
            {
                var cacheResult = await producer.ProduceAsync(item.Topic, new Message<Null, string> { Value = item.CacheStr }, cancellationToken);
                if (cacheResult.Status == PersistenceStatus.Persisted)
                {
                    LogMessage?.Trace($"{FoundationConst.LogMessageHeader}主题:{item.Topic}{Environment.NewLine}负载:{item.CacheStr}");
                    await CacheDb.DeleteCacheData(item.Id);
                }
            }
        }
        catch (Exception ex)
        {
            LogMessage?.LogWarning(ex, ToString());
        }
    }

    /// <summary>
    /// Queues the variable snapshot for upload when change-driven mode is active.
    /// </summary>
    private void VariableValueChange(DeviceVariableRunTime collectVariableRunTime)
    {
        if (driverPropertys?.IsInterval != true)
            _collectVariableRunTimes.Enqueue(collectVariableRunTime.Adapt<VariableData>());
    }
}