添加mqttClient离线缓存

This commit is contained in:
2248356998 qq.com
2023-06-11 17:45:46 +08:00
parent 562093c468
commit ad2c9f585a
2 changed files with 142 additions and 24 deletions

View File

@@ -20,6 +20,8 @@ using MQTTnet.Client;
using NewLife.Serialization;
using SqlSugar;
using System.Collections.Concurrent;
using System.Text;
@@ -97,12 +99,8 @@ public class MqttClient : UpLoadBase
{
if (!cancellationToken.IsCancellationRequested)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.VariableTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptVariableModel)).Build();
var isConnect = await TryMqttClientAsync(cancellationToken);
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage, cancellationToken);
await MqttUp($"{driverPropertys.VariableTopic}", item.GetSciptListValue(driverPropertys.BigTextScriptVariableModel), cancellationToken);
}
else
{
@@ -135,12 +133,9 @@ public class MqttClient : UpLoadBase
{
if (!cancellationToken.IsCancellationRequested)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.VariableTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptVariableModel)).Build();
var isConnect = await TryMqttClientAsync(cancellationToken);
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage, cancellationToken);
await MqttUp($"{driverPropertys.VariableTopic}", item.GetSciptListValue(driverPropertys.BigTextScriptVariableModel), cancellationToken);
}
else
{
@@ -186,12 +181,7 @@ public class MqttClient : UpLoadBase
{
if (!cancellationToken.IsCancellationRequested)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.DeviceTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptDeviceModel)).Build();
var isConnect = await TryMqttClientAsync(cancellationToken);
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage);
await MqttUp($"{driverPropertys.DeviceTopic}", item.GetSciptListValue(driverPropertys.BigTextScriptDeviceModel), cancellationToken);
}
else
{
@@ -223,12 +213,7 @@ public class MqttClient : UpLoadBase
{
if (!cancellationToken.IsCancellationRequested)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{driverPropertys.DeviceTopic}")
.WithPayload(item.GetSciptListValue(driverPropertys.BigTextScriptDeviceModel)).Build();
var isConnect = await TryMqttClientAsync(cancellationToken);
if (isConnect.IsSuccess)
await _mqttClient.PublishAsync(variableMessage);
await MqttUp($"{driverPropertys.DeviceTopic}", item.GetSciptListValue(driverPropertys.BigTextScriptDeviceModel), cancellationToken);
}
else
{
@@ -270,6 +255,49 @@ public class MqttClient : UpLoadBase
}
/// <summary>
/// 上传mqtt内容并进行离线缓存
/// </summary>
/// <param name="topic"></param>
/// <param name="payLoad"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
private async Task MqttUp(string topic, string payLoad, CancellationToken cancellationToken)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payLoad).Build();
var isConnect = await TryMqttClientAsync(cancellationToken);
if (isConnect.IsSuccess)
{
//连接成功时补发缓存数据
var cacheData = await GetCacheData();
foreach (var item in cacheData)
{
var cacheMessage = new MqttApplicationMessageBuilder()
.WithTopic(item.Topic)
.WithPayload(item.CacheStr).Build();
var cacheResult = await _mqttClient.PublishAsync(cacheMessage);
if (cacheResult.IsSuccess)
{
await DeleteCacheData(item.Id);
}
}
var result = await _mqttClient.PublishAsync(variableMessage);
if (!result.IsSuccess)
{
await AddCacheData(topic, payLoad);
}
}
else
{
await AddCacheData(topic, payLoad);
}
}
public override OperResult IsConnected()
{
if (_mqttClient?.IsConnected == true)

View File

@@ -12,7 +12,10 @@
using Microsoft.Extensions.Logging;
using System.ComponentModel.DataAnnotations;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading;
using TouchSocket.Core;
@@ -95,6 +98,9 @@ public abstract class UpLoadBase : DriverBase
_logger = logger;
IsLogOut = device.IsLogOut;
CurDevice = device;
Directory.CreateDirectory("Cache");
GetCacheDb().DbMaintenance.CreateDatabase();//创建数据库
GetCacheDb().CodeFirst.InitTables(typeof(CacheTable));
Init(device);
}
@@ -122,4 +128,88 @@ public abstract class UpLoadBase : DriverBase
}
return null;
}
/// <summary>
/// 获取数据库链接
/// </summary>
/// <returns></returns>
public SqlSugarClient GetCacheDb()
{
var configureExternalServices = new ConfigureExternalServices
{
EntityService = (type, column) => // 修改列可空-1、带?问号 2、String类型若没有Required
{
if ((type.PropertyType.IsGenericType && type.PropertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
|| (type.PropertyType == typeof(string) && type.GetCustomAttribute<RequiredAttribute>() == null))
column.IsNullable = true;
},
};
var sqlSugarClient = new SqlSugarClient(new ConnectionConfig()
{
ConnectionString = $"Data Source=Cache/{CurDevice.Id}.db;",//连接字符串
DbType = DbType.Sqlite,//数据库类型
IsAutoCloseConnection = true, //不设成true要手动close
ConfigureExternalServices = configureExternalServices,
}
);
return sqlSugarClient;
}
/// <summary>
/// 获取缓存表前十条
/// </summary>
/// <returns></returns>
public async Task<List<CacheTable>> GetCacheData()
{
var db = GetCacheDb();
var data = await db.Queryable<CacheTable>().Take(10).ToListAsync();
return data;
}
/// <summary>
/// 增加离线缓存限制表最大默认2000行
/// </summary>
/// <returns></returns>
public async Task<bool> AddCacheData(string topic, string data, int max = 2000)
{
var db = GetCacheDb();
var count = await db.Queryable<CacheTable>().CountAsync();
if (count > max)
{
var data1 = await db.Queryable<CacheTable>().OrderBy(a => a.Id).Take(count - max).ToListAsync();
await db.Deleteable(data1).ExecuteCommandAsync();
}
var result = await db.Insertable(new CacheTable() { Id = YitIdHelper.NextId(), Topic = topic, CacheStr = data }).ExecuteCommandAsync();
return result > 0;
}
/// <summary>
/// 清除离线缓存
/// </summary>
/// <returns></returns>
public async Task<bool> DeleteCacheData(params long[] data)
{
var db = GetCacheDb();
var result = await db.Deleteable<CacheTable>().In(data).ExecuteCommandAsync();
return result > 0;
}
}
/// <summary>
/// 缓存表
/// </summary>
public class CacheTable
{
/// <summary>
/// Id
/// </summary>
[SugarColumn(IsPrimaryKey = true)]
public long Id { get; set; }
/// <summary>
/// Topic
/// </summary>
public string Topic { get; set; }
/// <summary>
/// 缓存值
/// </summary>
[SugarColumn(ColumnDataType = StaticConfig.CodeFirst_BigString)]
public string CacheStr { get; set; }
}