feat:mqttBroker part

This commit is contained in:
Kimdiego2098
2023-09-02 13:55:21 +08:00
parent 5299c5c4be
commit 579b1a59f9
15 changed files with 254 additions and 251 deletions

View File

@@ -9,10 +9,10 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Furion.Extras.Authentication.JwtBearer" Version="4.8.8.41" />
<PackageReference Include="Furion.Extras.ObjectMapper.Mapster" Version="4.8.8.41" />
<PackageReference Include="Furion.Pure" Version="4.8.8.41" />
<PackageReference Include="SqlSugarCore" Version="5.1.4.102" />
<PackageReference Include="Furion.Extras.Authentication.JwtBearer" Version="4.8.8.42" />
<PackageReference Include="Furion.Extras.ObjectMapper.Mapster" Version="4.8.8.42" />
<PackageReference Include="Furion.Pure" Version="4.8.8.42" />
<PackageReference Include="SqlSugarCore" Version="5.1.4.104" />
<PackageReference Include="UAParser" Version="3.1.47" />
<PackageReference Include="Yitter.IdGenerator" Version="1.0.14" />
<PackageReference Include="MiniExcel" Version="1.31.2" />

View File

@@ -0,0 +1,39 @@
#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
#endregion
namespace ThingsGateway.Application;
/// <summary>
/// MqttBrokerConfig
/// </summary>
public class MqttBrokerConfig
{
/// <summary>
/// 是否启用管理网关
/// </summary>
public bool IsMqttBroker { get; set; }
/// <summary>
/// MqttBrokerIP
/// </summary>
public string MqttBrokerIP { get; set; }
/// <summary>
/// MqttBrokerPort
/// </summary>
public int MqttBrokerPort { get; set; }
}

View File

@@ -0,0 +1,100 @@
#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
#endregion
using Furion;
using Furion.Logging.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using System.Net;
using ThingsGateway.Foundation;
namespace ThingsGateway.Application;
/// <summary>
/// MqttBroker
/// </summary>
public class MqttBrokerWorker : BackgroundService
{
private readonly ILogger<MqttBrokerWorker> _logger;
/// <inheritdoc/>
public MqttBrokerWorker(ILogger<MqttBrokerWorker> logger, IServiceProvider serviceProvider)
{
_logger = logger;
}
#region worker服务
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken token)
{
await base.StartAsync(token);
}
/// <inheritdoc/>
public override async Task StopAsync(CancellationToken token)
{
await base.StopAsync(token);
}
private MQTTnet.Server.MqttServer _mqttServer;
/// <inheritdoc/>
protected void Init()
{
var mqttBrokerConfig = App.GetConfig<MqttBrokerConfig>("MqttBrokerConfig");
var mqttFactory = new MqttFactory(new MqttNetLogger(_logger));
var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder()
.WithDefaultEndpointBoundIPAddress(string.IsNullOrEmpty(mqttBrokerConfig.MqttBrokerIP) ? null : IPAddress.Parse(mqttBrokerConfig.MqttBrokerIP))
.WithDefaultEndpointPort(mqttBrokerConfig.MqttBrokerPort)
.WithDefaultEndpoint()
.Build();
_mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(300000, stoppingToken);
}
catch (TaskCanceledException)
{
}
catch (ObjectDisposedException)
{
}
catch (Exception ex)
{
_logger.LogError(ex, ToString());
}
}
}
#endregion
}

View File

@@ -0,0 +1,50 @@
#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
#endregion
using Microsoft.Extensions.Logging;
using MQTTnet.Diagnostics;
namespace ThingsGateway.Application;
internal class MqttNetLogger : IMqttNetLogger
{
readonly ILogger LogMessage;
public MqttNetLogger(ILogger logger)
{
LogMessage = logger;
}
public bool IsEnabled => true;
public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception)
{
switch (logLevel)
{
case MqttNetLogLevel.Verbose:
LogMessage?.Log(LogLevel.Trace, source, message != null ? (parameters != null ? message != null ? (parameters != null ? string.Format(message, parameters) : message) : string.Empty : message) : string.Empty, exception);
break;
case MqttNetLogLevel.Info:
LogMessage?.Log(LogLevel.Information, source, message != null ? (parameters != null ? string.Format(message, parameters) : message) : string.Empty, exception);
break;
case MqttNetLogLevel.Warning:
LogMessage?.Log(LogLevel.Warning, source, message != null ? (parameters != null ? string.Format(message, parameters) : message) : string.Empty, exception);
break;
case MqttNetLogLevel.Error:
LogMessage?.Log(LogLevel.Warning, source, message != null ? (parameters != null ? string.Format(message, parameters) : message) : string.Empty, exception);
break;
}
}
}

View File

@@ -38,6 +38,7 @@
<PackageReference Include="CS-Script" Version="4.8.1" />
<!--CS-Script与Furion冲突直接安装覆盖版本-->
<PackageReference Include="Microsoft.CodeAnalysis.CSharp.Scripting" Version="4.7.0" />
<PackageReference Include="MQTTnet" Version="4.2.1.781" />
</ItemGroup>
<ItemGroup>

View File

@@ -1837,6 +1837,46 @@
<member name="P:ThingsGateway.Application.VariableData.DataTypeEnum">
<inheritdoc cref="P:ThingsGateway.Application.MemoryVariable.DataTypeEnum"/>
</member>
<member name="T:ThingsGateway.Application.MqttBrokerConfig">
<summary>
MqttBrokerConfig
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttBrokerConfig.IsMqttBroker">
<summary>
是否启用管理网关
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttBrokerConfig.MqttBrokerIP">
<summary>
MqttBrokerIP
</summary>
</member>
<member name="P:ThingsGateway.Application.MqttBrokerConfig.MqttBrokerPort">
<summary>
MqttBrokerPort
</summary>
</member>
<member name="T:ThingsGateway.Application.MqttBrokerWorker">
<summary>
MqttBroker
</summary>
</member>
<member name="M:ThingsGateway.Application.MqttBrokerWorker.#ctor(Microsoft.Extensions.Logging.ILogger{ThingsGateway.Application.MqttBrokerWorker},System.IServiceProvider)">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Application.MqttBrokerWorker.StartAsync(System.Threading.CancellationToken)">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Application.MqttBrokerWorker.StopAsync(System.Threading.CancellationToken)">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Application.MqttBrokerWorker.Init">
<inheritdoc/>
</member>
<member name="M:ThingsGateway.Application.MqttBrokerWorker.ExecuteAsync(System.Threading.CancellationToken)">
<inheritdoc/>
</member>
<member name="T:ThingsGateway.Application.CollectBase">
<summary>
<para></para>

View File

@@ -1153,7 +1153,7 @@ public class TcpClientBaseEx : BaseSocket, ITcpClient
var delaySenderOption = this.Config.GetValue(TouchSocketConfigExtension.DelaySenderProperty);
if (delaySenderOption != null)
{
this.m_delaySender = new DelaySender(socket, delaySenderOption, this.OnDelaySenderError);
this.m_delaySender = new DelaySender(delaySenderOption, this.MainSocket.AbsoluteSend);
}
}

View File

@@ -1,201 +0,0 @@
#region copyright
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
#endregion
//------------------------------------------------------------------------------
// 此代码版权除特别声明或在XREF结尾的命名空间的代码归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议若本仓库没有设置则按MIT开源协议授权
// CSDN博客https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频https://space.bilibili.com/94253567
// Gitee源代码仓库https://gitee.com/RRQM_Home
// Github源代码仓库https://github.com/RRQM
// API首页http://rrqm_home.gitee.io/touchsocket/
// 交流QQ群234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System.IO.Ports;
namespace ThingsGateway.Foundation.Serial;
/// <summary>
/// 延迟发送器
/// </summary>
public sealed class SerialDelaySender : DisposableObject
{
private readonly ReaderWriterLockSlim m_lockSlim;
private readonly Action<Exception> m_onError;
private readonly IntelligentDataQueue<QueueDataBytes> m_queueDatas;
private readonly SerialPort m_serial;
private volatile bool m_sending;
/// <summary>
/// 延迟发送器
/// </summary>
/// <param name="serialPort"></param>
/// <param name="onError"></param>
/// <param name="delaySenderOption"></param>
public SerialDelaySender(SerialPort serialPort, DelaySenderOption delaySenderOption, Action<Exception> onError)
{
this.DelayLength = delaySenderOption.DelayLength;
this.m_serial = serialPort;
this.m_onError = onError;
this.m_queueDatas = new IntelligentDataQueue<QueueDataBytes>(delaySenderOption.QueueLength);
this.m_lockSlim = new ReaderWriterLockSlim();
}
/// <summary>
/// 延迟包最大尺寸。
/// </summary>
public int DelayLength { get; private set; }
/// <summary>
/// 是否处于发送状态
/// </summary>
public bool Sending
{
get
{
using (new ReadLock(this.m_lockSlim))
{
return this.m_sending;
}
}
private set
{
using (new WriteLock(this.m_lockSlim))
{
this.m_sending = value;
}
}
}
/// <summary>
/// 发送
/// </summary>
public void Send(QueueDataBytes dataBytes)
{
this.m_queueDatas.Enqueue(dataBytes);
if (this.SwitchToRun())
{
Task.Factory.StartNew(this.BeginSend);
}
}
/// <summary>
/// 释放
/// </summary>
/// <param name="disposing"></param>
protected override void Dispose(bool disposing)
{
this.m_queueDatas.Clear();
base.Dispose(disposing);
}
private void BeginSend()
{
try
{
var buffer = BytePool.Default.Rent(this.DelayLength);
while (!this.DisposedValue)
{
try
{
if (this.TryGet(buffer, out var asyncByte))
{
this.m_serial.AbsoluteSend(asyncByte.Buffer, asyncByte.Offset, asyncByte.Length);
}
else
{
break;
}
}
catch (Exception ex)
{
this.m_onError?.Invoke(ex);
break;
}
}
BytePool.Default.Return(buffer);
this.Sending = false;
}
catch
{
}
}
private bool SwitchToRun()
{
using (new WriteLock(this.m_lockSlim))
{
if (this.m_sending)
{
return false;
}
else
{
this.m_sending = true;
return true;
}
}
}
private bool TryGet(byte[] buffer, out QueueDataBytes asyncByteDe)
{
var len = 0;
var surLen = buffer.Length;
while (true)
{
if (this.m_queueDatas.TryPeek(out var asyncB))
{
if (surLen > asyncB.Length)
{
if (this.m_queueDatas.TryDequeue(out var asyncByte))
{
Array.Copy(asyncByte.Buffer, asyncByte.Offset, buffer, len, asyncByte.Length);
len += asyncByte.Length;
surLen -= asyncByte.Length;
}
}
else if (asyncB.Length > buffer.Length)
{
if (len > 0)
{
break;
}
else
{
asyncByteDe = asyncB;
return true;
}
}
else
{
break;
}
}
else
{
if (len > 0)
{
break;
}
else
{
asyncByteDe = default;
return false;
}
}
}
asyncByteDe = new QueueDataBytes(buffer, 0, len);
return true;
}
}

View File

@@ -75,7 +75,7 @@ public class SerialSessionBase : BaseSerial, ISerialSession
#region
private SerialDelaySender m_delaySender;
private DelaySender m_delaySender;
private long m_bufferRate = 1;
private volatile bool m_online;
ValueCounter m_receiveCounter;
@@ -713,7 +713,7 @@ public class SerialSessionBase : BaseSerial, ISerialSession
var delaySenderOption = this.Config.GetValue(TouchSocketConfigExtension.DelaySenderProperty);
if (delaySenderOption != null)
{
this.m_delaySender = new SerialDelaySender(MainSerialPort, delaySenderOption, this.OnDelaySenderError);
this.m_delaySender = new DelaySender(delaySenderOption, this.MainSerialPort.AbsoluteSend);
}
}
/// <summary>

View File

@@ -13,7 +13,7 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="TouchSocket" Version="2.0.0-beta.156" />
<PackageReference Include="TouchSocket" Version="2.0.0-beta.163" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'!='net45'">
<PackageReference Include="System.IO.Ports" Version="7.0.0" />

View File

@@ -1948,40 +1948,6 @@
<member name="P:ThingsGateway.Foundation.Serial.BaseSerial.Logger">
<inheritdoc/>
</member>
<member name="T:ThingsGateway.Foundation.Serial.SerialDelaySender">
<summary>
延迟发送器
</summary>
</member>
<member name="M:ThingsGateway.Foundation.Serial.SerialDelaySender.#ctor(System.IO.Ports.SerialPort,TouchSocket.Sockets.DelaySenderOption,System.Action{System.Exception})">
<summary>
延迟发送器
</summary>
<param name="serialPort"></param>
<param name="onError"></param>
<param name="delaySenderOption"></param>
</member>
<member name="P:ThingsGateway.Foundation.Serial.SerialDelaySender.DelayLength">
<summary>
延迟包最大尺寸。
</summary>
</member>
<member name="P:ThingsGateway.Foundation.Serial.SerialDelaySender.Sending">
<summary>
是否处于发送状态
</summary>
</member>
<member name="M:ThingsGateway.Foundation.Serial.SerialDelaySender.Send(TouchSocket.Core.QueueDataBytes)">
<summary>
发送
</summary>
</member>
<member name="M:ThingsGateway.Foundation.Serial.SerialDelaySender.Dispose(System.Boolean)">
<summary>
释放
</summary>
<param name="disposing"></param>
</member>
<member name="T:ThingsGateway.Foundation.Serial.SerialProperty">
<summary>
串口属性

View File

@@ -67,6 +67,8 @@ public class ModbusServer : UpLoadBase
var list = Values.ToListWithDequeue();
foreach (var item in list)
{
if (token.IsCancellationRequested)
break;
var type = GetPropertyValue(item.Item2, nameof(ModbusServerVariableProperty.ModbusType));
if (Enum.TryParse<DataTypeEnum>(type, out DataTypeEnum result))
{
@@ -110,7 +112,7 @@ public class ModbusServer : UpLoadBase
_plc?.SafeDispose();
_ModbusTags?.Clear();
_ModbusTags = null;
Values.Clear();
Values?.Clear();
Values = null;
base.Dispose(disposing);
}

View File

@@ -47,7 +47,6 @@ public class MqttServer : UpLoadBase
{
private readonly MqttServerProperty driverPropertys = new();
private readonly ConcurrentDictionary<string, string> IdWithName = new();
private readonly MqttClientVariableProperty variablePropertys = new();
private ConcurrentQueue<DeviceData> _collectDeviceRunTimes = new();
private ConcurrentQueue<VariableData> _collectVariableRunTimes = new();
@@ -365,8 +364,7 @@ public class MqttServer : UpLoadBase
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
IdWithName.AddOrUpdate(arg.ClientId, (a) => arg.UserName, (a, b) => arg.UserName);
LogMessage?.LogInformation(ToString() + "-" + IdWithName[arg.ClientId] + "-客户端已连接成功");
LogMessage?.LogInformation(ToString() + "-" + arg.ClientId + "-客户端已连接成功");
}
private void VariableValueChange(DeviceVariableRunTime collectVariableRunTime)
{

View File

@@ -11,10 +11,6 @@
</Target>
<ItemGroup>
<PackageReference Include="MQTTnet" Version="4.2.1.781" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\ThingsGateway.Application\ThingsGateway.Application.csproj" >

View File

@@ -8,6 +8,12 @@
</PropertyGroup>
<ItemGroup>
<None Remove="WindowsServiceCreate.bat" />
<None Remove="WindowsServiceDelete.bat" />
</ItemGroup>
<ItemGroup>
@@ -24,6 +30,12 @@
<Content Include="Dockerfile">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<Content Include="WindowsServiceCreate.bat">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
<Content Include="WindowsServiceDelete.bat">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
</ItemGroup>