Files
ThingsGateway/src/Plugin/ThingsGateway.Plugin.Mqtt/MqttCollect/MqttCollect.other.cs
2025-10-18 23:14:55 +08:00

189 lines
7.0 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://thingsgateway.cn/
// QQ群605534569
//------------------------------------------------------------------------------
using MQTTnet;
using PooledAwait;
#if NET6_0
using MQTTnet.Client;
#endif
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.Gateway.Application.Extensions;
using ThingsGateway.NewLife;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
namespace ThingsGateway.Plugin.Mqtt;
/// <summary>
/// MQTT collect plugin (partial): holds the MQTT client state and, unless the
/// <c>Management</c> symbol is defined, the client event handlers and the
/// connect helper.
/// </summary>
public partial class MqttCollect : CollectBase
{
    // MQTT client instance; assigned outside this part of the partial class.
    private IMqttClient _mqttClient;

    // Options passed to ConnectAsync when (re)connecting.
    private MqttClientOptions _mqttClientOptions;

    // Topic subscriptions issued after every successful connect; null means "nothing to subscribe".
    private MqttClientSubscribeOptions _mqttSubscribeOptions;

    // Serializes connection attempts so only one ConnectAsync runs at a time.
    private WaitLock ConnectLock = new(nameof(MqttCollect));
#if !Management
#region mqtt方法
    /// <summary>
    /// Handles an incoming MQTT message: finds the first entry of <c>TopicItemDict</c> whose
    /// topic filter matches the message topic, parses the payload as JSON and updates every
    /// item registered under that filter.
    /// </summary>
    /// <param name="args">Event data carrying the received application message.</param>
    /// <returns>A completed task; all work is done synchronously.</returns>
    /// <remarks>
    /// Exceptions never escape this handler. A failure for a single item is logged at trace
    /// level and the remaining items are still processed; any other failure (for example a
    /// payload that is not valid JSON) is logged as a warning.
    /// </remarks>
    private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
    {
        // The payload is exposed through a different member depending on the target framework.
#if NET8_0_OR_GREATER
        var payload = args.ApplicationMessage.Payload;
#else
        var payload = args.ApplicationMessage.PayloadSegment;
#endif
        try
        {
            // Only the first matching topic filter is used; default(KeyValuePair).Value is null when nothing matches.
            var tuples = TopicItemDict.FirstOrDefault(t => (MqttTopicFilterComparer.Compare(args.ApplicationMessage.Topic, t.Key) == MqttTopicFilterCompareResult.IsMatch)).Value;
            if (tuples != null)
            {
                var payloadText = Encoding.UTF8.GetString(payload);
                if (_driverPropertys.DetailLog)
                {
                    // Skip building the (potentially large) payload message unless trace logging is enabled.
                    if (LogMessage?.LogLevel <= TouchSocket.Core.LogLevel.Trace)
                        LogMessage?.LogTrace($"Topic{args.ApplicationMessage.Topic}{Environment.NewLine}PayLoad{payloadText}");
                }
                else
                {
                    LogMessage?.LogTrace($"Topic{args.ApplicationMessage.Topic}");
                }

                Newtonsoft.Json.Linq.JToken json = Newtonsoft.Json.Linq.JToken.Parse(payloadText);

                // One timestamp for every value extracted from this message.
                DateTime dateTime = DateTime.Now;
                foreach (var item in tuples)
                {
                    try
                    {
                        // Item2 is evaluated against the whole document and gates the update.
                        // NOTE(review): ToBoolean(true) presumably defaults to true when the result is not a boolean - confirm.
                        if (item.Item2.GetExpressionsResult(json).ToBoolean(true))
                        {
                            // Item1 is the JSON path of the value inside the document.
                            var jtoken = json.SelectToken(item.Item1);

                            // Unwrap primitive JSON values; objects/arrays (and a missing token, i.e. null) are passed through as-is.
                            object value;
                            if (jtoken is Newtonsoft.Json.Linq.JValue jValue)
                            {
                                value = jValue.Value;
                            }
                            else
                            {
                                value = jtoken;
                            }
                            item.Item3.SetValue(value, dateTime);
                        }
                    }
                    catch (Exception ex)
                    {
                        LogMessage?.LogTrace($"parse error: topic {Environment.NewLine}{args.ApplicationMessage.Topic} {Environment.NewLine} json {Environment.NewLine}{json} {Environment.NewLine} select: {item.Item1} {Environment.NewLine} {ex}");
                    }
                }
            }
        }
        catch (Exception ex)
        {
            LogMessage?.LogWarning($"parse error: topic {Environment.NewLine}{args.ApplicationMessage.Topic} {Environment.NewLine} {ex}");
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Subscribes to the configured topics once the connection is established and logs a
    /// warning listing every topic filter whose subscription was rejected.
    /// </summary>
    /// <param name="args">Connection event data (not used).</param>
    private async Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs args)
    {
        if (_mqttSubscribeOptions == null)
            return;

        var subResult = await _mqttClient.SubscribeAsync(_mqttSubscribeOptions).ConfigureAwait(false);

        // Result codes above 10 are treated as failures.
        // NOTE(review): granted-QoS codes are 0-2 and MQTT error reason codes start at 0x80 - confirm the threshold is intentional.
        var failures = subResult.Items
            .Where(a => a.ResultCode > (MqttClientSubscribeResultCode)10)
            .Select(a =>
                new
                {
                    Topic = a.TopicFilter.Topic,
                    ResultCode = a.ResultCode.ToString()
                }
            )
            .ToList();

        if (failures.Count > 0)
        {
            LogMessage?.LogWarning($"Subscribe fail {failures.ToSystemTextJsonString()}");
        }
    }

    /// <summary>
    /// Ensures the MQTT client is connected, connecting it when necessary.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts waiting for the lock and the connection attempt.</param>
    /// <returns>
    /// <c>OperResult.Success</c> when the client is (or becomes) connected; otherwise a failed
    /// <c>OperResult</c> describing the reason. Exceptions are converted to a failed result
    /// rather than thrown.
    /// </returns>
    private ValueTask<OperResult> TryMqttClientAsync(CancellationToken cancellationToken)
    {
        // Synchronous fast paths: no async state machine is created for them.
        if (DisposedValue || _mqttClient == null) return TouchSocket.Core.EasyValueTask.FromResult(new OperResult("MqttClient is disposed"));
        if (_mqttClient?.IsConnected == true)
            return TouchSocket.Core.EasyValueTask.FromResult(OperResult.Success);
        return Client(this, cancellationToken);

        // Slow path: serialize connection attempts through ConnectLock.
        static async PooledValueTask<OperResult> Client(MqttCollect @this, CancellationToken cancellationToken)
        {
            if (@this._mqttClient?.IsConnected == true)
                return OperResult.Success;

            // Tracks whether the lock was actually acquired, so a cancelled or failed wait
            // never releases a lock that is held by another caller.
            var lockTaken = false;
            try
            {
                await @this.ConnectLock.WaitAsync(cancellationToken).ConfigureAwait(false);
                lockTaken = true;

                await Task.Delay(100, cancellationToken).ConfigureAwait(false);

                // Read the field once: another caller may have connected while this one was
                // waiting, or the client may have been cleared in the meantime.
                var client = @this._mqttClient;
                if (client == null)
                {
                    return new OperResult("mqttClient is null");
                }
                if (client.IsConnected)
                    return OperResult.Success;

                // Abort the attempt on either caller cancellation or the configured connect timeout.
                using var timeoutToken = new CancellationTokenSource(TimeSpan.FromMilliseconds(@this._driverPropertys.ConnectTimeout));
                using CancellationTokenSource stoppingToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutToken.Token);

                var result = await client.ConnectAsync(@this._mqttClientOptions, stoppingToken.Token).ConfigureAwait(false);
                if (client.IsConnected)
                {
                    return OperResult.Success;
                }
                else
                {
                    if (timeoutToken.IsCancellationRequested)
                        return new OperResult($"Connect timeout");
                    else
                        return new OperResult($"Connect fail {result.ReasonString}");
                }
            }
            catch (Exception ex)
            {
                return new OperResult(ex);
            }
            finally
            {
                if (lockTaken)
                    @this.ConnectLock.Release();
            }
        }
    }
#endregion mqtt方法
#endif
}