Compare commits

...

7 Commits

Author SHA1 Message Date
2248356998 qq.com
5ec1ee7627 fix: 上次更新导致大小端配置映射错误 2025-07-02 21:51:00 +08:00
Diego
79789388fc fix: opcuaserver动态刷新变量节点会导致新建的动态节点无法刷新订阅 2025-07-02 19:11:11 +08:00
Diego
2c4194ee18 refactor: opcua某些不存在的节点ID不再影响整体订阅,只出现日志提示 2025-07-02 15:09:52 +08:00
Diego
1b2be585af fix: 报警分析错误设置循环 2025-07-02 14:26:25 +08:00
Diego
83736647e7 feat: S7PLC增加WString支持 2025-07-02 12:49:55 +08:00
Diego
b06405717d build: 10.9.9
fix: timerx 池 max值取消
feat: mqttrpc支持脚本
2025-07-02 10:03:50 +08:00
2248356998 qq.com
298a1f2ed4 更新docker文件 2025-07-02 07:32:23 +08:00
53 changed files with 889 additions and 396 deletions

View File

@@ -26,8 +26,8 @@ public class ObjectPool<T> : DisposeBase, IPool<T> where T : notnull
/// <summary>繁忙个数</summary>
public Int32 BusyCount => _BusyCount;
/// <summary>最大个数。默认1000表示无上限</summary>
public Int32 Max { get; set; } = 100;
/// <summary>最大个数。默认00表示无上限</summary>
public Int32 Max { get; set; } = 0;
/// <summary>最小个数。默认1</summary>
public Int32 Min { get; set; } = 1;

View File

@@ -4,10 +4,20 @@
/// <summary>表示可以更改其到期时间和时间段的计时器。</summary>
public interface ITimer : IDisposable
{
/// <summary>更改计时器的启动时间和方法调用之间的时间间隔,使用 TimeSpan 值度量时间间隔。</summary>
/// <summary>更改计时器的启动时间和方法调用之间的时间间隔。</summary>
/// <param name="dueTime">一个 TimeSpan表示在调用构造 ITimer 时指定的回调方法之前的延迟时间量。 指定 InfiniteTimeSpan 可防止重新启动计时器。 指定 Zero 可立即重新启动计时器。</param>
/// <param name="period">构造 Timer 时指定的回调方法调用之间的时间间隔。 指定 InfiniteTimeSpan 可以禁用定期终止。</param>
/// <returns></returns>
Boolean Change(TimeSpan dueTime, TimeSpan period);
public Boolean Change(TimeSpan dueTime, TimeSpan period);
}
#endif
#endif
/// <summary>表示可以更改其到期时间和时间段的计时器。</summary>
public interface ITimerx : IDisposable
{
/// <summary>更改计时器的启动时间和方法调用之间的时间间隔。</summary>
/// <param name="dueTime">一个 TimeSpan表示在调用构造 ITimer 时指定的回调方法之前的延迟时间量。 指定 InfiniteTimeSpan 可防止重新启动计时器。 指定 Zero 可立即重新启动计时器。</param>
/// <param name="period">构造 Timer 时指定的回调方法调用之间的时间间隔。 指定 InfiniteTimeSpan 可以禁用定期终止。</param>
/// <returns></returns>
public Boolean Change(int dueTime, int period);
}

View File

@@ -17,7 +17,7 @@ namespace ThingsGateway.NewLife.Threading;
///
/// TimerX必须维持对象否则Scheduler也没有维持对象时大家很容易一起被GC回收。
/// </remarks>
public class TimerX : ITimer, IDisposable
public class TimerX : ITimer, ITimerx, IDisposable
{
#region
/// <summary>编号</summary>
@@ -382,25 +382,32 @@ public class TimerX : ITimer, IDisposable
return period;
}
}
/// <summary>更改计时器的启动时间和方法调用之间的时间间隔,使用 TimeSpan 值度量时间间隔。</summary>
/// <summary>更改计时器的启动时间和方法调用之间的时间间隔。</summary>
/// <param name="dueTime">一个 TimeSpan表示在调用构造 ITimer 时指定的回调方法之前的延迟时间量。 指定 InfiniteTimeSpan 可防止重新启动计时器。 指定 Zero 可立即重新启动计时器。</param>
/// <param name="period">构造 Timer 时指定的回调方法调用之间的时间间隔。 指定 InfiniteTimeSpan 可以禁用定期终止。</param>
/// <returns></returns>
public Boolean Change(TimeSpan dueTime, TimeSpan period)
{
return Change(dueTime.Milliseconds, period.Milliseconds);
}
/// <summary>更改计时器的启动时间和方法调用之间的时间间隔。</summary>
/// <param name="dueTime">一个 TimeSpan表示在调用构造 ITimer 时指定的回调方法之前的延迟时间量。 指定 InfiniteTimeSpan 可防止重新启动计时器。 指定 Zero 可立即重新启动计时器。</param>
/// <param name="period">构造 Timer 时指定的回调方法调用之间的时间间隔。 指定 InfiniteTimeSpan 可以禁用定期终止。</param>
/// <returns></returns>
public Boolean Change(int dueTime, int period)
{
if (Absolutely) return false;
if (Crons?.Length > 0) return false;
if (period.TotalMilliseconds <= 0)
if (period <= 0)
{
Dispose();
return true;
}
Period = (Int32)period.TotalMilliseconds;
Period = period;
if (dueTime.TotalMilliseconds >= 0) SetNext((Int32)dueTime.TotalMilliseconds);
if (dueTime >= 0) SetNext(dueTime);
return true;
}

View File

@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<PluginVersion>10.9.8</PluginVersion>
<ProPluginVersion>10.9.8</ProPluginVersion>
<PluginVersion>10.9.14</PluginVersion>
<ProPluginVersion>10.9.14</ProPluginVersion>
<AuthenticationVersion>2.9.5</AuthenticationVersion>
<SourceGeneratorVersion>10.9.5</SourceGeneratorVersion>
<NET8Version>8.0.17</NET8Version>

View File

@@ -131,7 +131,7 @@ public abstract class DeviceBase : DisposableObject, IDevice
public virtual int RegisterByteLength { get; protected set; } = 1;
/// <inheritdoc/>
public virtual IThingsGatewayBitConverter ThingsGatewayBitConverter { get; protected set; } = new ThingsGatewayBitConverter();
public virtual IThingsGatewayBitConverter ThingsGatewayBitConverter { get; } = new ThingsGatewayBitConverter();
/// <inheritdoc/>
public bool OnLine => Channel.Online;

View File

@@ -27,7 +27,7 @@ public interface IThingsGatewayBitConverter
/// <summary>
/// 指定大小端。
/// </summary>
EndianType EndianType { get; }
EndianType EndianType { get; set; }
/// <summary>
/// 当前的字符串编码类型
@@ -325,6 +325,7 @@ public interface IThingsGatewayBitConverter
/// <param name="length">length</param>
/// <returns>decimal对象</returns>
decimal[] ToDecimal(byte[] buffer, int offset, int length);
IThingsGatewayBitConverter GetTransByAddress(string? registerAddress);
#endregion ToValue
}

View File

@@ -14,6 +14,7 @@ using System.Runtime.CompilerServices;
using System.Text;
using ThingsGateway.Foundation.Extension.Generic;
using ThingsGateway.Foundation.Extension.String;
namespace ThingsGateway.Foundation;
@@ -61,7 +62,14 @@ public partial class ThingsGatewayBitConverter : IThingsGatewayBitConverter
}
/// <inheritdoc/>
public virtual EndianType EndianType { get; }
public virtual EndianType EndianType
{
get => endianType; set
{
endianType = value;
TouchSocketBitConverter = new TouchSocketBitConverter(endianType);
}
}
/// <inheritdoc/>
public virtual bool IsStringReverseByteWord { get; set; }
@@ -69,7 +77,7 @@ public partial class ThingsGatewayBitConverter : IThingsGatewayBitConverter
/// <inheritdoc/>
public virtual bool IsVariableStringLength { get; set; }
internal protected TouchSocketBitConverter TouchSocketBitConverter => TouchSocketBitConverter.GetBitConverter(EndianType);
internal protected TouchSocketBitConverter TouchSocketBitConverter { get; set; }
static ThingsGatewayBitConverter()
{
@@ -86,7 +94,133 @@ public partial class ThingsGatewayBitConverter : IThingsGatewayBitConverter
/// 以小端
/// </summary>
public static readonly ThingsGatewayBitConverter LittleEndian;
private EndianType endianType;
public virtual void OtherPropertySet(IThingsGatewayBitConverter thingsGatewayBitConverter, string registerAddress)
{
}
/// <summary>
/// 从设备地址中解析附加信息
/// 这个方法获取<see cref="IThingsGatewayBitConverter"/>
/// 解析步骤将被缓存。
/// </summary>
/// <param name="registerAddress">设备地址</param>
/// <returns><see cref="IThingsGatewayBitConverter"/> 实例</returns>
public virtual IThingsGatewayBitConverter GetTransByAddress(string? registerAddress)
{
if (registerAddress.IsNullOrEmpty()) return this;
var type = this.GetType();
// 尝试从缓存中获取解析结果
//var cacheKey = $"{nameof(ThingsGatewayBitConverterExtension)}_{nameof(GetTransByAddress)}_{type.FullName}_{type.TypeHandle.Value}_{this.ToJsonString()}_{registerAddress}_{this.GetHashCode()}";
//if (MemoryCache.TryGetValue(cacheKey, out IThingsGatewayBitConverter cachedConverter))
//{
// if (cachedConverter.Equals(this))
// {
// return this;
// }
// else
// {
// return (IThingsGatewayBitConverter)cachedConverter.Map(type);
// }
//}
// 去除设备地址两端的空格
registerAddress = registerAddress.Trim();
// 根据分号拆分附加信息
var strs = registerAddress.SplitStringBySemicolon();
DataFormatEnum? dataFormat = null;
Encoding? encoding = null;
bool? wstring = null;
int? stringlength = null;
BcdFormatEnum? bcdFormat = null;
StringBuilder sb = new();
foreach (var str in strs)
{
// 解析 dataFormat
if (str.StartsWith("data=", StringComparison.OrdinalIgnoreCase))
{
var dataFormatName = str.Substring(5);
try { if (Enum.TryParse<DataFormatEnum>(dataFormatName, true, out var dataFormat1)) dataFormat = dataFormat1; } catch { }
}
else if (str.StartsWith("vsl=", StringComparison.OrdinalIgnoreCase))
{
var wstringName = str.Substring(4);
try { if (bool.TryParse(wstringName, out var wstring1)) wstring = wstring1; } catch { }
}
// 解析 encoding
else if (str.StartsWith("encoding=", StringComparison.OrdinalIgnoreCase))
{
var encodingName = str.Substring(9);
try { encoding = Encoding.GetEncoding(encodingName); } catch { }
}
// 解析 length
else if (str.StartsWith("len=", StringComparison.OrdinalIgnoreCase))
{
var lenStr = str.Substring(4);
stringlength = lenStr.IsNullOrEmpty() ? null : Convert.ToUInt16(lenStr);
}
// 解析 bcdFormat
else if (str.StartsWith("bcd=", StringComparison.OrdinalIgnoreCase))
{
var bcdName = str.Substring(4);
try { if (Enum.TryParse<BcdFormatEnum>(bcdName, true, out var bcdFormat1)) bcdFormat = bcdFormat1; } catch { }
}
// 处理其他情况,将未识别的部分拼接回去
else
{
if (sb.Length > 0)
sb.Append($";{str}");
else
sb.Append($"{str}");
}
}
// 更新设备地址为去除附加信息后的地址
registerAddress = sb.ToString();
var converter = (IThingsGatewayBitConverter)this!.Map(type);
// 如果没有解析出任何附加信息,则直接返回默认的数据转换器
if (bcdFormat == null && stringlength == null && encoding == null && dataFormat == null && wstring == null)
{
//MemoryCache.Set(cacheKey, this!, 3600);
return converter;
}
// 根据默认的数据转换器创建新的数据转换器实例
// 更新新的数据转换器实例的属性值
if (encoding != null)
{
converter.Encoding = encoding;
}
if (bcdFormat != null)
{
converter.BcdFormat = bcdFormat.Value;
}
if (wstring != null)
{
converter.IsVariableStringLength = wstring.Value;
}
if (stringlength != null)
{
converter.StringLength = stringlength.Value;
}
if (dataFormat != null)
{
converter.DataFormat = dataFormat.Value;
}
OtherPropertySet(converter, registerAddress);
// 将解析结果添加到缓存中缓存有效期为3600秒
//MemoryCache.Set(cacheKey, converter!, 3600);
return converter;
}
#region GetBytes
/// <inheritdoc/>

View File

@@ -10,10 +10,6 @@
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation.Extension.String;
namespace ThingsGateway.Foundation;
/// <summary>
@@ -23,126 +19,6 @@ public static class ThingsGatewayBitConverterExtension
{
//private static MemoryCache MemoryCache = new() { Capacity = 10000000 };
/// <summary>
/// 从设备地址中解析附加信息
/// 这个方法获取<see cref="IThingsGatewayBitConverter"/>
/// 解析步骤将被缓存。
/// </summary>
/// <param name="registerAddress">设备地址</param>
/// <param name="defaultBitConverter">默认的数据转换器</param>
/// <returns><see cref="IThingsGatewayBitConverter"/> 实例</returns>
public static IThingsGatewayBitConverter GetTransByAddress(this IThingsGatewayBitConverter defaultBitConverter, string? registerAddress)
{
if (registerAddress.IsNullOrEmpty()) return defaultBitConverter;
var type = defaultBitConverter.GetType();
// 尝试从缓存中获取解析结果
//var cacheKey = $"{nameof(ThingsGatewayBitConverterExtension)}_{nameof(GetTransByAddress)}_{type.FullName}_{type.TypeHandle.Value}_{defaultBitConverter.ToJsonString()}_{registerAddress}_{defaultBitConverter.GetHashCode()}";
//if (MemoryCache.TryGetValue(cacheKey, out IThingsGatewayBitConverter cachedConverter))
//{
// if (cachedConverter.Equals(defaultBitConverter))
// {
// return defaultBitConverter;
// }
// else
// {
// return (IThingsGatewayBitConverter)cachedConverter.Map(type);
// }
//}
// 去除设备地址两端的空格
registerAddress = registerAddress.Trim();
// 根据分号拆分附加信息
var strs = registerAddress.SplitStringBySemicolon();
DataFormatEnum? dataFormat = null;
Encoding? encoding = null;
bool? wstring = null;
int? stringlength = null;
BcdFormatEnum? bcdFormat = null;
StringBuilder sb = new();
foreach (var str in strs)
{
// 解析 dataFormat
if (str.StartsWith("data=", StringComparison.OrdinalIgnoreCase))
{
var dataFormatName = str.Substring(5);
try { if (Enum.TryParse<DataFormatEnum>(dataFormatName, true, out var dataFormat1)) dataFormat = dataFormat1; } catch { }
}
else if (str.StartsWith("vsl=", StringComparison.OrdinalIgnoreCase))
{
var wstringName = str.Substring(4);
try { if (bool.TryParse(wstringName, out var wstring1)) wstring = wstring1; } catch { }
}
// 解析 encoding
else if (str.StartsWith("encoding=", StringComparison.OrdinalIgnoreCase))
{
var encodingName = str.Substring(9);
try { encoding = Encoding.GetEncoding(encodingName); } catch { }
}
// 解析 length
else if (str.StartsWith("len=", StringComparison.OrdinalIgnoreCase))
{
var lenStr = str.Substring(4);
stringlength = lenStr.IsNullOrEmpty() ? null : Convert.ToUInt16(lenStr);
}
// 解析 bcdFormat
else if (str.StartsWith("bcd=", StringComparison.OrdinalIgnoreCase))
{
var bcdName = str.Substring(4);
try { if (Enum.TryParse<BcdFormatEnum>(bcdName, true, out var bcdFormat1)) bcdFormat = bcdFormat1; } catch { }
}
// 处理其他情况,将未识别的部分拼接回去
else
{
if (sb.Length > 0)
sb.Append($";{str}");
else
sb.Append($"{str}");
}
}
// 更新设备地址为去除附加信息后的地址
registerAddress = sb.ToString();
// 如果没有解析出任何附加信息,则直接返回默认的数据转换器
if (bcdFormat == null && stringlength == null && encoding == null && dataFormat == null && wstring == null)
{
//MemoryCache.Set(cacheKey, defaultBitConverter!, 3600);
return defaultBitConverter;
}
// 根据默认的数据转换器创建新的数据转换器实例
var converter = (IThingsGatewayBitConverter)defaultBitConverter!.Map(type);
// 更新新的数据转换器实例的属性值
if (encoding != null)
{
converter.Encoding = encoding;
}
if (bcdFormat != null)
{
converter.BcdFormat = bcdFormat.Value;
}
if (wstring != null)
{
converter.IsVariableStringLength = wstring.Value;
}
if (stringlength != null)
{
converter.StringLength = stringlength.Value;
}
if (dataFormat != null)
{
converter.DataFormat = dataFormat.Value;
}
// 将解析结果添加到缓存中缓存有效期为3600秒
//MemoryCache.Set(cacheKey, converter!, 3600);
return converter;
}
#region

View File

@@ -17,6 +17,7 @@ public class CronScheduledTask : DisposeBase, IScheduledTask
private ILog LogMessage;
private volatile int _isRunning = 0;
private volatile int _pendingTriggers = 0;
public Int32 Period => _timer?.Period ?? 0;
public CronScheduledTask(string interval, Func<object?, CancellationToken, Task> taskFunc, object? state, ILog log, CancellationToken token)
{
@@ -79,7 +80,7 @@ public class CronScheduledTask : DisposeBase, IScheduledTask
}
catch (Exception ex)
{
LogMessage.LogWarning(ex);
LogMessage?.LogWarning(ex);
}
finally
{
@@ -90,7 +91,7 @@ public class CronScheduledTask : DisposeBase, IScheduledTask
{
if (!Check())
{
DelayDo();
SetNext(_interval10MS);
}
}
}
@@ -121,7 +122,7 @@ public class CronScheduledTask : DisposeBase, IScheduledTask
}
catch (Exception ex)
{
LogMessage.LogWarning(ex);
LogMessage?.LogWarning(ex);
}
finally
{
@@ -132,18 +133,25 @@ public class CronScheduledTask : DisposeBase, IScheduledTask
{
if (!Check())
{
DelayDo();
SetNext(_interval10MS);
}
}
}
private void DelayDo()
public void SetNext(int interval)
{
// 延迟触发下一次
if (!Check())
_timer?.SetNext(_interval10MS);
_timer?.SetNext(interval);
}
public bool Change(int dueTime, int period)
{
// 延迟触发下一次
if (!Check())
return _timer?.Change(dueTime, period) == true;
return false;
}
public void Stop()
{
_timer?.SafeDispose();

View File

@@ -2,9 +2,11 @@
{
public interface IScheduledTask
{
bool Change(int dueTime, int period);
void SetNext(int interval);
void Start();
void Stop();
public Int32 Period { get; }
}

View File

@@ -16,6 +16,7 @@ public class ScheduledAsyncTask : DisposeBase, IScheduledTask, IScheduledIntInte
private ILog LogMessage;
private volatile int _isRunning = 0;
private volatile int _pendingTriggers = 0;
public Int32 Period => _timer?.Period ?? 0;
public ScheduledAsyncTask(int interval, Func<object?, CancellationToken, Task> taskFunc, object? state, ILog log, CancellationToken token)
{
@@ -63,7 +64,7 @@ public class ScheduledAsyncTask : DisposeBase, IScheduledTask, IScheduledIntInte
}
catch (Exception ex)
{
LogMessage.LogWarning(ex);
LogMessage?.LogWarning(ex);
}
finally
{
@@ -74,19 +75,26 @@ public class ScheduledAsyncTask : DisposeBase, IScheduledTask, IScheduledIntInte
{
if (!Check())
{
DelayDo();
SetNext(_interval10MS);
}
}
}
private void DelayDo()
public void SetNext(int interval)
{
// 延迟触发下一次
if (!Check())
_timer?.SetNext(_interval10MS);
_timer?.SetNext(interval);
}
public bool Change(int dueTime, int period)
{
// 延迟触发下一次
if (!Check())
return _timer?.Change(dueTime, period) == true;
return false;
}
public void Stop()
{
_timer?.SafeDispose();

View File

@@ -16,6 +16,7 @@ public class ScheduledSyncTask : DisposeBase, IScheduledTask, IScheduledIntInter
private ILog LogMessage;
private volatile int _isRunning = 0;
private volatile int _pendingTriggers = 0;
public Int32 Period => _timer?.Period ?? 0;
public ScheduledSyncTask(int interval, Action<object?, CancellationToken> taskFunc, object? state, ILog log, CancellationToken token)
{
@@ -67,7 +68,7 @@ public class ScheduledSyncTask : DisposeBase, IScheduledTask, IScheduledIntInter
}
catch (Exception ex)
{
LogMessage.LogWarning(ex);
LogMessage?.LogWarning(ex);
}
finally
{
@@ -78,18 +79,25 @@ public class ScheduledSyncTask : DisposeBase, IScheduledTask, IScheduledIntInter
{
if (!Check())
{
DelayDo();
SetNext(_interval10MS);
}
}
}
private void DelayDo()
public void SetNext(int interval)
{
if (!Check())
_timer?.SetNext(interval);
}
public bool Change(int dueTime, int period)
{
// 延迟触发下一次
if (!Check())
_timer?.SetNext(_interval10MS);
}
return _timer?.Change(dueTime, period) == true;
return false;
}
public void Stop()
{
_timer?.SafeDispose();

View File

@@ -150,23 +150,26 @@ public abstract class CollectBase : DriverBase, IRpcDriver
return variablesMethodResult;
}
}
private volatile bool _addVariableTasks;
protected void RefreshVariableTasks(CancellationToken cancellationToken)
{
if (VariableTasks.Count > 0)
if (_addVariableTasks)
{
foreach (var item in VariableTasks)
if (VariableTasks != null)
{
item.Stop();
TaskSchedulerLoop.Remove(item);
}
foreach (var item in VariableTasks)
{
item.Stop();
TaskSchedulerLoop?.Remove(item);
}
VariableTasks = AddVariableTask(cancellationToken);
VariableTasks = AddVariableTask(cancellationToken);
foreach (var item in VariableTasks)
{
TaskSchedulerLoop.Add(item);
item.Start();
foreach (var item in VariableTasks)
{
TaskSchedulerLoop?.Add(item);
item.Start();
}
}
}
}
@@ -198,7 +201,7 @@ public abstract class CollectBase : DriverBase, IRpcDriver
tasks.Add(testOnline);
VariableTasks = AddVariableTask(cancellationToken);
_addVariableTasks = true;
tasks.AddRange(VariableTasks);
return tasks;
@@ -242,6 +245,7 @@ public abstract class CollectBase : DriverBase, IRpcDriver
protected virtual void SetDeviceStatus(object? state, CancellationToken cancellationToken)
{
CurrentDevice.SetDeviceStatus(TimerX.Now);
if (IsConnected())
{
if (CurrentDevice.DeviceStatus == DeviceStatusEnum.OffLine)

View File

@@ -128,6 +128,7 @@ public class Variable : BaseDataEntity, IValidatableObject
[IgnoreExcel]
[Required]
[NotNull]
[MinValue(1)]
public virtual long DeviceId { get => deviceId; set => deviceId = value; }
/// <summary>

View File

@@ -197,7 +197,7 @@ public partial class VariableRuntime : Variable, IVariable, IDisposable
/// <summary>
/// 实时值
/// 实时值类型
/// </summary>
[AutoGenerateColumn(Visible = true, Order = 6)]
public string RuntimeType => Value?.GetType()?.ToString();

View File

@@ -180,7 +180,7 @@ internal sealed class AlarmHostedService : BackgroundService, IAlarmHostedServic
int delay = item.AlarmDelay; // 获取报警延迟时间
// 检查变量的数据类型
if (item.DataType.GetSystemType() == typeof(bool))
if (item.Value?.GetType() == typeof(bool))
{
// 如果数据类型为布尔型则调用GetBoolAlarmCode方法获取布尔型报警类型及相关信息
alarmEnum = GetBoolAlarmCode(item, out limit, out ex, out text);
@@ -394,76 +394,70 @@ internal sealed class AlarmHostedService : BackgroundService, IAlarmHostedServic
/// <summary>
/// 执行工作任务,对设备变量进行报警分析。
/// </summary>
/// <param name="state"></param>
/// <param name="cancellation">取消任务的 CancellationToken</param>
private async Task DoWork(CancellationToken cancellation)
private void DoWork(object? state, CancellationToken cancellation)
{
while (!cancellation.IsCancellationRequested)
try
{
if (!GlobalData.StartBusinessChannelEnable)
return;
try
//Stopwatch stopwatch = Stopwatch.StartNew();
// 遍历设备变量列表
if (!GlobalData.AlarmEnableIdVariables.IsEmpty)
{
var list = GlobalData.AlarmEnableIdVariables.Select(a => a.Value).ToArray();
list.ParallelForEach((item, state, index) =>
{
if (!GlobalData.StartBusinessChannelEnable)
return;
//Stopwatch stopwatch = Stopwatch.StartNew();
// 遍历设备变量列表
if (!GlobalData.AlarmEnableIdVariables.IsEmpty)
{
var list = GlobalData.AlarmEnableIdVariables.Select(a => a.Value).ToArray();
list.ParallelForEach((item, state, index) =>
{
{
// 如果取消请求已经被触发,则结束任务
if (cancellation.IsCancellationRequested)
return;
// 如果取消请求已经被触发,则结束任务
if (cancellation.IsCancellationRequested)
return;
// 如果该变量的报警功能未启用,则跳过该变量
if (!item.AlarmEnable)
return;
// 如果该变量的报警功能未启用,则跳过该变量
if (!item.AlarmEnable)
return;
// 如果该变量离线,则跳过该变量
if (!item.IsOnline)
return;
// 如果该变量离线,则跳过该变量
if (!item.IsOnline)
return;
// 对该变量进行报警分析
AlarmAnalysis(item);
// 对该变量进行报警分析
AlarmAnalysis(item);
}
});
}
else
{
await Task.Delay(5000, cancellation).ConfigureAwait(false);
}
});
}
else
{
//if (scheduledTask.Period != 5000)
// scheduledTask.Change(0, 5000); // 如果没有启用报警的变量则设置下次执行时间为5秒后
scheduledTask.SetNext(5000); // 如果没有启用报警的变量则设置下次执行时间为5秒后
}
//stopwatch.Stop();
//_logger.LogInformation("报警分析耗时:" + stopwatch.ElapsedMilliseconds + "ms");
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Alarm analysis fail");
}
finally
{
// 延迟一段时间,避免过于频繁地执行任务
await Task.Delay(50, cancellation).ConfigureAwait(false);
}
//stopwatch.Stop();
//_logger.LogInformation("报警分析耗时:" + stopwatch.ElapsedMilliseconds + "ms");
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Alarm analysis fail");
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
private ScheduledSyncTask scheduledTask;
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(AppResource.RealAlarmTaskStart);
await DoWork(stoppingToken).ConfigureAwait(false);
scheduledTask = new ScheduledSyncTask(10, DoWork, null, null, stoppingToken);
scheduledTask.Start();
return Task.CompletedTask;
}
}

View File

@@ -18,6 +18,7 @@ namespace ThingsGateway.Gateway.Application
{
Task<bool> BatchEditAsync(IEnumerable<Variable> models, Variable oldModel, Variable model, bool restart, CancellationToken cancellationToken);
Task<bool> DeleteVariableAsync(IEnumerable<long> ids, bool restart, CancellationToken cancellationToken);
Task<bool> ClearVariableAsync(bool restart, CancellationToken cancellationToken);
Task<Dictionary<string, object>> ExportVariableAsync(ExportFilter exportFilter);
Task ImportVariableAsync(Dictionary<string, ImportPreviewOutputBase> input, bool restart, CancellationToken cancellationToken);

View File

@@ -106,6 +106,7 @@ public class VariableRuntimeService : IVariableRuntimeService
var result = await GlobalData.VariableService.DeleteVariableAsync(variableIds).ConfigureAwait(false);
ConcurrentHashSet<IDriver> changedDriver = new();
RuntimeServiceHelper.AddBusinessChangedDriver(variableIds, changedDriver);
@@ -125,6 +126,38 @@ public class VariableRuntimeService : IVariableRuntimeService
}
public async Task<bool> ClearVariableAsync(bool restart, CancellationToken cancellationToken)
{
try
{
// await WaitLock.WaitAsync().ConfigureAwait(false);
var result = await GlobalData.VariableService.DeleteVariableAsync(null).ConfigureAwait(false);
ConcurrentHashSet<IDriver> changedDriver = new();
var variableIds = GlobalData.IdVariables.Select(a => a.Key).ToHashSet();
RuntimeServiceHelper.AddBusinessChangedDriver(variableIds, changedDriver);
RuntimeServiceHelper.VariableRuntimesDispose(variableIds);
if (restart)
{
await RuntimeServiceHelper.ChangedDriverAsync(changedDriver, _logger, cancellationToken).ConfigureAwait(false);
}
return true;
}
finally
{
//WaitLock.Release();
}
}
public Task<Dictionary<string, object>> ExportVariableAsync(ExportFilter exportFilter) => GlobalData.VariableService.ExportVariableAsync(exportFilter);
public async Task ImportVariableAsync(Dictionary<string, ImportPreviewOutputBase> input, bool restart, CancellationToken cancellationToken)

View File

@@ -335,8 +335,8 @@ internal sealed class VariableService : BaseService<Variable>, IVariableService
{
using var db = GetDB();
var dataScope = await GlobalData.SysUserService.GetCurrentUserDataScopeAsync().ConfigureAwait(false);
var ids = input.ToList();
var result = (await db.Deleteable<Variable>().Where(a => ids.Contains(a.Id))
var ids = input?.ToList();
var result = (await db.Deleteable<Variable>().WhereIF(input != null, a => ids.Contains(a.Id))
.WhereIF(dataScope != null && dataScope?.Count > 0, u => dataScope.Contains(u.CreateOrgId))//在指定机构列表查询
.WhereIF(dataScope?.Count == 0, u => u.CreateUserId == UserManager.UserId)
.ExecuteCommandAsync().ConfigureAwait(false)) > 0;

View File

@@ -424,7 +424,7 @@ finally
await Task.Run(async () =>
{
await GlobalData.VariableRuntimeService.DeleteVariableAsync(Items.Select(a => a.Id), AutoRestartThread, default);
await GlobalData.VariableRuntimeService.ClearVariableAsync(AutoRestartThread, default);
await InvokeAsync(async () =>
{
await ToastService.Default();

View File

@@ -24,7 +24,7 @@ public class Dlt645_2007Master : DtuServiceDeviceBase
RegisterByteLength = 2;
channel.MaxSign = ushort.MaxValue;
}
public override IThingsGatewayBitConverter ThingsGatewayBitConverter { get; protected set; } = new Dlt645_2007BitConverter(EndianType.Big) { };
public override IThingsGatewayBitConverter ThingsGatewayBitConverter { get; } = new Dlt645_2007BitConverter(EndianType.Big) { };
/// <inheritdoc/>
public string FEHead { get; set; } = "FEFEFEFE";

View File

@@ -30,7 +30,7 @@ public partial class ModbusMaster : DtuServiceDeviceBase, IModbusAddress
}
}
public override IThingsGatewayBitConverter ThingsGatewayBitConverter { get; protected set; } = new ThingsGatewayBitConverter(EndianType.Big) { };
public override IThingsGatewayBitConverter ThingsGatewayBitConverter { get; } = new ThingsGatewayBitConverter(EndianType.Big) { };
/// <summary>
/// Modbus类型在initChannelAsync之前设置

View File

@@ -51,7 +51,7 @@ public class ModbusSlave : DeviceBase, IModbusAddress
RegisterByteLength = 2;
channel.MaxSign = ushort.MaxValue;
}
public override IThingsGatewayBitConverter ThingsGatewayBitConverter { get; protected set; } = new ThingsGatewayBitConverter(EndianType.Big) { };
public override IThingsGatewayBitConverter ThingsGatewayBitConverter { get; } = new ThingsGatewayBitConverter(EndianType.Big) { };
public override bool SupportMultipleDevice()
{

View File

@@ -1099,7 +1099,7 @@ public class OpcUaMaster : IDisposable
}
private static List<VariableNode> GetVariableNodes(ReadValueIdCollection itemsToRead, DataValueCollection values, DiagnosticInfoCollection diagnosticInfos, ResponseHeader responseHeader, int count = 1)
private List<VariableNode> GetVariableNodes(ReadValueIdCollection itemsToRead, DataValueCollection values, DiagnosticInfoCollection diagnosticInfos, ResponseHeader responseHeader, int count = 1)
{
ClientBase.ValidateResponse(values, itemsToRead);
ClientBase.ValidateDiagnosticInfos(diagnosticInfos, itemsToRead);
@@ -1130,11 +1130,11 @@ public class OpcUaMaster : IDisposable
// NodeId Attribute
if (!DataValue.IsGood(value))
{
throw ServiceResultException.Create(value.StatusCode, 0 + 2 * i, diagnosticInfos, responseHeader.StringTable);
Log(3, ServiceResultException.Create(value.StatusCode, 0 + 2 * i, diagnosticInfos, responseHeader.StringTable), $"Get nodeid {itemsToRead[0 + 2 * i].NodeId} fail");
}
if (value == null)
{
throw ServiceResultException.Create(StatusCodes.BadUnexpectedError, "Node does not support the NodeId attribute.");
Log(3, ServiceResultException.Create(StatusCodes.BadUnexpectedError, "Node does not support the NodeId attribute."), $"Get nodeid {itemsToRead[0 + 2 * i].NodeId} fail");
}
variableNode.NodeId = (NodeId)value.GetValue(typeof(NodeId));
@@ -1376,7 +1376,7 @@ public class OpcUaMaster : IDisposable
{
if (m_reConnectHandler == null)
{
Log(3, null, "Reconnecting in {0}s", 1);
Log(3, null, "Reconnecting : {0}", e.Status.ToString());
m_ReconnectStarting?.Invoke(this, e);
m_reConnectHandler = new SessionReconnectHandler(true, 10000);

View File

@@ -3,7 +3,8 @@
"AddressStart": "AddressStart",
"BitCode": "BitBitCode",
"DataCode": "DataCode",
"DbBlock": "DbBlock"
"DbBlock": "DbBlock",
"WStringEnable": "WString"
},
"ThingsGateway.Foundation.SiemensS7.SiemensS7Master": {
"LocalTSAP": "LocalTSAP",

View File

@@ -3,7 +3,8 @@
"AddressStart": "起始地址",
"BitCode": "Bit地址",
"DataCode": "寄存器区",
"DbBlock": "DB块"
"DbBlock": "DB块",
"WStringEnable": "WString"
},
"ThingsGateway.Foundation.SiemensS7.SiemensS7Master": {
"LocalTSAP": "本地TSAP",

View File

@@ -24,18 +24,44 @@ public class S7BitConverter : ThingsGatewayBitConverter
public S7BitConverter(EndianType endianType) : base(endianType)
{
}
public bool SMART200 { get; set; } = false;
public bool WStringEnable { get; set; } = false;
public override int? StringLength { get; set; } = 100;
/// <inheritdoc/>
public override string ToString(byte[] buffer, int offset, int length)
{
if (!IsVariableStringLength)
if (WStringEnable)
{
return base.ToString(buffer, offset, length);
if (!SMART200)
return base.ToString(buffer, offset, this.ToUInt16(buffer, offset - 2) * 2);
else
return base.ToString(buffer, offset, buffer[offset - 1] * 2);
}
else if (IsVariableStringLength)
{
if (!SMART200)
return base.ToString(buffer, offset, buffer[offset - 1]);
else
return base.ToString(buffer, offset, buffer[offset - 1]);
}
else
{
return base.ToString(buffer, offset, buffer[offset - 1]);
return base.ToString(buffer, offset, length);
}
}
public override void OtherPropertySet(IThingsGatewayBitConverter thingsGatewayBitConverter, string registerAddress)
{
if (thingsGatewayBitConverter is not S7BitConverter s7BitConverter)
{
return;
}
var sAddress = SiemensS7Address.ParseFrom(registerAddress);
s7BitConverter.WStringEnable = sAddress.WStringEnable;
base.OtherPropertySet(thingsGatewayBitConverter, registerAddress);
}
}

View File

@@ -21,6 +21,8 @@ namespace ThingsGateway.Foundation.SiemensS7;
/// </summary>
public class SiemensS7Address : S7Request
{
public bool WStringEnable { get; set; }
public SiemensS7Address()
{
}
@@ -42,6 +44,7 @@ public class SiemensS7Address : S7Request
public override string ToString()
{
StringBuilder stringBuilder = Pool.StringBuilder.Get();
stringBuilder.Append($"W={WStringEnable};");
if (DataCode == S7Area.TM)
{
stringBuilder.Append($"T{AddressStart}");
@@ -273,6 +276,10 @@ public class SiemensS7Address : S7Request
throw new Exception(string.Format(AppResource.AddressError, address));
}
}
else if (strArr[index].StartsWith("W=", StringComparison.OrdinalIgnoreCase))
{
s7AddressData.WStringEnable = strArr[index].Substring(2).ToBoolean(false);
}
}
//if (isCache)

View File

@@ -8,6 +8,8 @@
// QQ群605534569
//------------------------------------------------------------------------------
using System.Text;
namespace ThingsGateway.Foundation.SiemensS7;
internal static class PackHelper
@@ -49,6 +51,13 @@ internal static class PackHelper
{
// 解析SiemensS7Address对象
var s7Address = SiemensS7Address.ParseFrom(it.RegisterAddress);
// 根据地址获取转换参数
if (it.ThingsGatewayBitConverter is S7BitConverter s7BitConverter)
{
s7BitConverter.WStringEnable = s7Address.WStringEnable;
}
int lastLen = it.DataType.GetByteLength();
// 处理特殊情况下的长度
@@ -76,10 +85,23 @@ internal static class PackHelper
{
lastLen = it.ThingsGatewayBitConverter.StringLength.Value;
}
if (s7Address.WStringEnable)
{
it.ThingsGatewayBitConverter.Encoding = Encoding.Unicode;
}
}
else
{
if (it.ThingsGatewayBitConverter.IsVariableStringLength)
if (s7Address.WStringEnable)
{
// 字符串在S7中前四个字节不属于实际内容
it.Index += 4;
lastLen = it.ThingsGatewayBitConverter.StringLength.Value + 4;
it.ThingsGatewayBitConverter.Encoding = Encoding.BigEndianUnicode;
}
else if (it.ThingsGatewayBitConverter.IsVariableStringLength)
{
// 字符串在S7中前两个字节不属于实际内容
it.Index += 2;

View File

@@ -106,7 +106,7 @@ internal sealed partial class SiemensHelper
}
}
internal static async ValueTask<OperResult> WriteAsync(SiemensS7Master plc, string address, string value, Encoding encoding, CancellationToken cancellationToken = default)
internal static async ValueTask<OperResult> WriteStringAsync(SiemensS7Master plc, string address, string value, Encoding encoding, CancellationToken cancellationToken = default)
{
value ??= string.Empty;
byte[] inBytes = encoding.GetBytes(value);
@@ -125,4 +125,71 @@ internal sealed partial class SiemensHelper
}
return await plc.WriteAsync(address, DataTransUtil.SpliceArray([(byte)value.Length], inBytes), DataTypeEnum.String, cancellationToken).ConfigureAwait(false);
}
internal static async ValueTask<OperResult<string>> ReadWStringAsync(SiemensS7Master plc, string address, CancellationToken cancellationToken)
{
//先读取一次获取长度,再读取实际值
if (plc.SiemensS7Type != SiemensTypeEnum.S200Smart)
{
var encoding = Encoding.BigEndianUnicode;
var result1 = await plc.ReadAsync(address, 4, cancellationToken).ConfigureAwait(false);
if (!result1.IsSuccess)
{
return new OperResult<string>(result1);
}
if (result1.Content[0] == 0 || result1.Content[0] == byte.MaxValue)
{
return new OperResult<string>(AppResource.NotString);
}
var result2 = await plc.ReadAsync(address, 4 + (plc.ThingsGatewayBitConverter.ToUInt16(result1.Content, 2) * 2), cancellationToken).ConfigureAwait(false);
if (!result2.IsSuccess)
{
return new OperResult<string>(result2);
}
else
{
return OperResult.CreateSuccessResult(encoding.GetString(result2.Content, 4, result2.Content.Length - 4));
}
}
else
{
var encoding = Encoding.Unicode;
var result1 = await plc.ReadAsync(address, 1, cancellationToken).ConfigureAwait(false);
if (!result1.IsSuccess)
return new OperResult<string>(result1);
var result2 = await plc.ReadAsync(address, 1 + (result1.Content[0] * 2), cancellationToken).ConfigureAwait(false);
if (!result2.IsSuccess)
{
return new OperResult<string>(result2);
}
else
{
return OperResult.CreateSuccessResult(encoding.GetString(result2.Content, 1, result2.Content.Length - 1));
}
}
}
internal static async ValueTask<OperResult> WriteWStringAsync(SiemensS7Master plc, string address, string value, CancellationToken cancellationToken = default)
{
value ??= string.Empty;
if (plc.SiemensS7Type != SiemensTypeEnum.S200Smart)
{
byte[] inBytes1 = Encoding.BigEndianUnicode.GetBytes(value);
var result = await plc.ReadAsync(address, 4, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccess) return result;
var num = plc.ThingsGatewayBitConverter.ToUInt16(result.Content, 0);
if (num == 0)
num = 254;
if (value.Length > num) return new OperResult<string>(AppResource.WriteDataLengthMore);
return await plc.WriteAsync(
address,
DataTransUtil.SpliceArray(plc.ThingsGatewayBitConverter.GetBytes(num), plc.ThingsGatewayBitConverter.GetBytes((ushort)value.Length),
inBytes1
), DataTypeEnum.String, cancellationToken).ConfigureAwait(false);
}
byte[] inBytes2 = Encoding.Unicode.GetBytes(value);
return await plc.WriteAsync(address, DataTransUtil.SpliceArray([(byte)value.Length], inBytes2), DataTypeEnum.String, cancellationToken).ConfigureAwait(false);
}
}

View File

@@ -32,8 +32,8 @@ public partial class SiemensS7Master : DeviceBase
{
}
public override IThingsGatewayBitConverter ThingsGatewayBitConverter { get; protected set; } = new S7BitConverter(EndianType.Big) { };
public override IThingsGatewayBitConverter ThingsGatewayBitConverter => s7BitConverter;
private S7BitConverter s7BitConverter = new S7BitConverter(EndianType.Big) { };
/// <summary>
/// PduLength
/// </summary>
@@ -54,7 +54,14 @@ public partial class SiemensS7Master : DeviceBase
/// <summary>
/// S7类型
/// </summary>
public SiemensTypeEnum SiemensS7Type { get; set; }
public SiemensTypeEnum SiemensS7Type
{
get => siemensS7Type; set
{
siemensS7Type = value;
s7BitConverter.SMART200 = value == SiemensTypeEnum.S200Smart;
}
}
/// <summary>
/// 槽号,需重新连接
@@ -371,6 +378,8 @@ public partial class SiemensS7Master : DeviceBase
#region
private WaitLock ChannelStartedWaitLock = new();
private SiemensTypeEnum siemensS7Type;
/// <inheritdoc/>
protected override async ValueTask<bool> ChannelStarted(IClientChannel channel, bool last)
{
@@ -553,7 +562,24 @@ public partial class SiemensS7Master : DeviceBase
public override async ValueTask<OperResult<string[]>> ReadStringAsync(string address, int length, IThingsGatewayBitConverter bitConverter = null, CancellationToken cancellationToken = default)
{
bitConverter ??= ThingsGatewayBitConverter.GetTransByAddress(address);
if (bitConverter.IsVariableStringLength)
if (((S7BitConverter)bitConverter)?.WStringEnable == true)
{
if (length > 1)
{
return new OperResult<string[]>(AppResource.StringLengthReadError);
}
var result = await SiemensHelper.ReadWStringAsync(this, address, cancellationToken).ConfigureAwait(false);
if (result.IsSuccess)
{
return OperResult.CreateSuccessResult(new string[] { result.Content });
}
else
{
return new OperResult<string[]>(result);
}
}
else if (bitConverter.IsVariableStringLength)
{
if (length > 1)
{
@@ -579,9 +605,14 @@ public partial class SiemensS7Master : DeviceBase
public override ValueTask<OperResult> WriteAsync(string address, string value, IThingsGatewayBitConverter bitConverter = null, CancellationToken cancellationToken = default)
{
bitConverter ??= ThingsGatewayBitConverter.GetTransByAddress(address);
if (((S7BitConverter)bitConverter)?.WStringEnable == true)
{
return SiemensHelper.WriteWStringAsync(this, address, value, cancellationToken);
}
if (bitConverter.IsVariableStringLength)
{
return SiemensHelper.WriteAsync(this, address, value, bitConverter.Encoding, cancellationToken);
return SiemensHelper.WriteStringAsync(this, address, value, bitConverter.Encoding, cancellationToken);
}
else
{

View File

@@ -16,7 +16,8 @@
"RpcWriteTopic": "RpcWriteTopic",
"TLS": "TLS",
"UserName": "UserName",
"WebSocketUrl": "WebSocketUrl"
"WebSocketUrl": "WebSocketUrl",
"BigTextScriptRpc": "BigTextScriptRpc"
},
"ThingsGateway.Plugin.Mqtt.MqttClientVariableProperty": {
"VariableRpcEnable": "VariableRpcEnable"
@@ -45,7 +46,9 @@
"MqttQualityOfServiceLevel": "QoS",
"Port": "Port",
"RpcWriteTopic": "RpcWriteTopic",
"RpcQuestTopic": "RpcQuestTopic",
"StartWithId": "StartWithId",
"WebSocketPort": "WebSocketPort"
"WebSocketPort": "WebSocketPort",
"BigTextScriptRpc": "BigTextScriptRpc"
}
}

View File

@@ -16,7 +16,8 @@
"RpcWriteTopic": "Rpc写入主题",
"TLS": "TLS",
"UserName": "用户名",
"WebSocketUrl": "WebSocketUrl"
"WebSocketUrl": "WebSocketUrl",
"BigTextScriptRpc": "RPC脚本"
},
"ThingsGateway.Plugin.Mqtt.MqttClientVariableProperty": {
"VariableRpcEnable": "启用RPC"
@@ -45,7 +46,9 @@
"MqttQualityOfServiceLevel": "QoS",
"Port": "端口",
"RpcWriteTopic": "Rpc写入主题",
"RpcQuestTopic": "Rpc请求数据主题",
"StartWithId": "允许连接的ID前缀",
"WebSocketPort": "Websocket端口"
"WebSocketPort": "Websocket端口",
"BigTextScriptRpc": "RPC脚本"
}
}

View File

@@ -0,0 +1,67 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://thingsgateway.cn/
// QQ群605534569
//------------------------------------------------------------------------------
using MQTTnet;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
namespace ThingsGateway.Plugin.Mqtt;
public abstract class DynamicMqttClientRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttClient">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典)返回rpc结果</param>
/// <param name="tryMqttClientAsync">尝试连接</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public virtual async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, MqttApplicationMessageReceivedEventArgs args, MqttClientProperty driverPropertys, MQTTnet.IMqttClient mqttClient, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, Func<CancellationToken, ValueTask<OperResult>> tryMqttClientAsync, CancellationToken cancellationToken)
{
if (driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;
var t = string.Format(null, "{0}/+", driverPropertys.RpcWriteTopic);
if (MqttTopicFilterComparer.Compare(args.ApplicationMessage.Topic, t) != MqttTopicFilterCompareResult.IsMatch)
return;
var rpcDatas = Encoding.UTF8.GetString(args.ApplicationMessage.Payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var mqttRpcResult = await getRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);
try
{
var isConnect = await tryMqttClientAsync(CancellationToken.None).ConfigureAwait(false);
if (isConnect.IsSuccess)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(driverPropertys.JsonFormattingIndented)).Build();
await mqttClient.PublishAsync(variableMessage, cancellationToken).ConfigureAwait(false);
}
}
catch
{
}
}
}

View File

@@ -33,7 +33,7 @@ public partial class MqttClient : BusinessBaseWithCacheIntervalScript<VariableBa
public override VariablePropertyBase VariablePropertys => _variablePropertys;
protected override BusinessPropertyWithCacheIntervalScript _businessPropertyWithCacheIntervalScript => _driverPropertys;
public override Type DriverPropertyUIType => typeof(MqttClientPropertyRazor);
public override Type DriverPropertyUIType => typeof(MqttPropertyRazor);
/// <summary>
/// 加载 PEM 证书和私钥

View File

@@ -15,6 +15,7 @@ using CSScripting;
using MQTTnet;
#if NET6_0
using MQTTnet.Client;
#endif
@@ -280,7 +281,7 @@ public partial class MqttClient : BusinessBaseWithCacheIntervalScript<VariableBa
}
}
private async ValueTask<Dictionary<string, Dictionary<string, IOperResult>>> GetResult(MqttApplicationMessageReceivedEventArgs args, Dictionary<string, Dictionary<string, JToken>> rpcDatas)
private async ValueTask<Dictionary<string, Dictionary<string, IOperResult>>> GetRpcResult(string clientId, Dictionary<string, Dictionary<string, JToken>> rpcDatas)
{
var mqttRpcResult = new Dictionary<string, Dictionary<string, IOperResult>>();
rpcDatas.ForEach(a => mqttRpcResult.Add(a.Key, new()));
@@ -325,7 +326,7 @@ public partial class MqttClient : BusinessBaseWithCacheIntervalScript<VariableBa
}
var result = await GlobalData.RpcService.InvokeDeviceMethodAsync(ToString() + "-" + args.ClientId,
var result = await GlobalData.RpcService.InvokeDeviceMethodAsync(ToString() + "-" + clientId,
writeData).ConfigureAwait(false);
foreach (var dictKv in result)
@@ -347,10 +348,13 @@ public partial class MqttClient : BusinessBaseWithCacheIntervalScript<VariableBa
private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
{
try
{
#if NET8_0_OR_GREATER
var payload = args.ApplicationMessage.Payload;
var payloadCount = payload.Length;
var payload = args.ApplicationMessage.Payload;
var payloadCount = payload.Length;
#else
var payload = args.ApplicationMessage.PayloadSegment;
@@ -359,82 +363,97 @@ public partial class MqttClient : BusinessBaseWithCacheIntervalScript<VariableBa
#endif
if (args.ApplicationMessage.Topic == _driverPropertys.RpcQuestTopic && payloadCount > 0)
{
await AllPublishAsync(CancellationToken.None).ConfigureAwait(false);
return;
}
if (!_driverPropertys.DeviceRpcEnable)
return;
Dictionary<string, Dictionary<string, JToken>> rpcDatas = new();
//适配 ThingsBoardRp
if (args.ApplicationMessage.Topic == ThingsBoardRpcTopic)
{
var thingsBoardRpcData = Encoding.UTF8.GetString(payload).FromJsonNetString<ThingsBoardRpcData>();
if (thingsBoardRpcData == null)
return;
rpcDatas.Add(thingsBoardRpcData.device, thingsBoardRpcData.data.@params.ToDictionary(a => a.Key, a => JToken.Parse(a.Value)));
if (rpcDatas == null)
return;
var mqttRpcResult = await GetResult(args, rpcDatas).ConfigureAwait(false);
try
if (args.ApplicationMessage.Topic == _driverPropertys.RpcQuestTopic && payloadCount > 0)
{
var isConnect = await TryMqttClientAsync(CancellationToken.None).ConfigureAwait(false);
if (isConnect.IsSuccess)
await AllPublishAsync(CancellationToken.None).ConfigureAwait(false);
return;
}
if (!_driverPropertys.DeviceRpcEnable)
return;
if (!_driverPropertys.BigTextScriptRpc.IsNullOrEmpty())
{
var rpcBase = CSharpScriptEngineExtension.Do<DynamicMqttClientRpcBase>(_driverPropertys.BigTextScriptRpc);
await rpcBase.RPCInvokeAsync(LogMessage, args, _driverPropertys, _mqttClient, GetRpcResult, TryMqttClientAsync, CancellationToken.None).ConfigureAwait(false);
}
else
{
Dictionary<string, Dictionary<string, JToken>> rpcDatas = new();
//适配 ThingsBoardRp
if (args.ApplicationMessage.Topic == ThingsBoardRpcTopic)
{
ThingsBoardRpcResponseData thingsBoardRpcResponseData = new();
thingsBoardRpcResponseData.device = thingsBoardRpcData.device;
thingsBoardRpcResponseData.id = thingsBoardRpcData.data.id;
thingsBoardRpcResponseData.data.success = mqttRpcResult[thingsBoardRpcResponseData.device].All(b => b.Value.IsSuccess);
thingsBoardRpcResponseData.data.message = mqttRpcResult[thingsBoardRpcResponseData.device].Select(a => a.Value.ErrorMessage).ToSystemTextJsonString(_driverPropertys.JsonFormattingIndented);
var thingsBoardRpcData = Encoding.UTF8.GetString(payload).FromJsonNetString<ThingsBoardRpcData>();
if (thingsBoardRpcData == null)
return;
rpcDatas.Add(thingsBoardRpcData.device, thingsBoardRpcData.data.@params.ToDictionary(a => a.Key, a => JToken.Parse(a.Value)));
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}")
.WithPayload(thingsBoardRpcResponseData.ToSystemTextJsonString(_driverPropertys.JsonFormattingIndented)).Build();
await _mqttClient.PublishAsync(variableMessage).ConfigureAwait(false);
if (rpcDatas == null)
return;
var mqttRpcResult = await GetRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);
try
{
var isConnect = await TryMqttClientAsync(CancellationToken.None).ConfigureAwait(false);
if (isConnect.IsSuccess)
{
ThingsBoardRpcResponseData thingsBoardRpcResponseData = new();
thingsBoardRpcResponseData.device = thingsBoardRpcData.device;
thingsBoardRpcResponseData.id = thingsBoardRpcData.data.id;
thingsBoardRpcResponseData.data.success = mqttRpcResult[thingsBoardRpcResponseData.device].All(b => b.Value.IsSuccess);
thingsBoardRpcResponseData.data.message = mqttRpcResult[thingsBoardRpcResponseData.device].Select(a => a.Value.ErrorMessage).ToSystemTextJsonString(_driverPropertys.JsonFormattingIndented);
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}")
.WithPayload(thingsBoardRpcResponseData.ToSystemTextJsonString(_driverPropertys.JsonFormattingIndented)).Build();
await _mqttClient.PublishAsync(variableMessage).ConfigureAwait(false);
}
}
catch
{
}
}
}
catch
{
}
}
else
{
var t = string.Format(null, RpcTopic, _driverPropertys.RpcWriteTopic);
if (MqttTopicFilterComparer.Compare(args.ApplicationMessage.Topic, t) != MqttTopicFilterCompareResult.IsMatch)
return;
rpcDatas = Encoding.UTF8.GetString(payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var mqttRpcResult = await GetResult(args, rpcDatas).ConfigureAwait(false);
try
{
var isConnect = await TryMqttClientAsync(CancellationToken.None).ConfigureAwait(false);
if (isConnect.IsSuccess)
else
{
var t = string.Format(null, RpcTopic, _driverPropertys.RpcWriteTopic);
if (MqttTopicFilterComparer.Compare(args.ApplicationMessage.Topic, t) != MqttTopicFilterCompareResult.IsMatch)
return;
rpcDatas = Encoding.UTF8.GetString(payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(_driverPropertys.JsonFormattingIndented)).Build();
await _mqttClient.PublishAsync(variableMessage).ConfigureAwait(false);
var mqttRpcResult = await GetRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);
try
{
var isConnect = await TryMqttClientAsync(CancellationToken.None).ConfigureAwait(false);
if (isConnect.IsSuccess)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(_driverPropertys.JsonFormattingIndented)).Build();
await _mqttClient.PublishAsync(variableMessage).ConfigureAwait(false);
}
}
catch
{
}
}
}
catch
{
}
}
catch (Exception ex)
{
LogMessage?.LogWarning(ex, $"MqttClient_ApplicationMessageReceivedAsync error");
}
}
private async Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs args)

View File

@@ -120,4 +120,10 @@ public class MqttClientProperty : BusinessPropertyWithCacheIntervalScript
[DynamicProperty(Remark = "这个主题接收到任何数据都会把全部的信息发送到变量/设备/报警主题中")]
public string RpcQuestTopic { get; set; }
/// <summary>
/// RPC脚本
/// </summary>
[DynamicProperty]
[AutoGenerateColumn(Visible = true, IsVisibleWhenEdit = false, IsVisibleWhenAdd = false)]
public string? BigTextScriptRpc { get; set; }
}

View File

@@ -14,7 +14,7 @@
@ref=Model.ValidateForm
Id=@($"DeviceEditValidateForm{Id}{Model.Value.GetType().TypeHandle.Value}")>
<EditorFormObject class="p-2" Items=PluginPropertyEditorItems IsDisplay="!CanWrite" AutoGenerateAllItem="false" RowType=RowType.Inline ItemsPerRow=@(CanWrite?2:3) ShowLabelTooltip=true LabelWidth=@(CanWrite?240:120) Model="Model.Value" ShowLabel="true" @key=@($"DeviceEditEditorFormObject{Id}{Model.Value.GetType().TypeHandle.Value}")>
<EditorFormObject class="p-2" Items=PluginPropertyEditorItems IsDisplay="!CanWrite" AutoGenerateAllItem="false" RowType=RowType.Inline ItemsPerRow=@(CanWrite ? 2 : 3) ShowLabelTooltip=true LabelWidth=@(CanWrite ? 240 : 120) Model="Model.Value" ShowLabel="true" @key=@($"DeviceEditEditorFormObject{Id}{Model.Value.GetType().TypeHandle.Value}")>
<FieldItems>
@if (Model.Value is MqttClientProperty mqttClientProperty)
@@ -49,14 +49,14 @@
@if (Model.Value is BusinessPropertyWithCacheIntervalScript businessProperty)
{
<EditorItem FieldExpression=@(()=>context) Field=@(context)>
<EditorItem FieldExpression=@(() => context) Field=@(context)>
<EditTemplate Context="value">
<div class="col-12 col-md-12 min-height-500">
<BootstrapLabel Value=@Localizer["BigTextScriptDeviceModel"] ShowLabelTooltip="true" />
<CodeEditor ShowLineNo @bind-Value=@businessProperty.BigTextScriptDeviceModel Language="csharp" Theme="vs-dark" IsReadonly=@(!CanWrite) />
<div class="ms-2 d-flex justify-content-center align-items-center">
<Button IsDisabled=@(!CanWrite) OnClick=@(()=>PropertyComponent.CheckScript(businessProperty,nameof(businessProperty.BigTextScriptDeviceModel), Localizer["check"], this, DialogService))>
<Button IsDisabled=@(!CanWrite) OnClick=@(() => PropertyComponent.CheckScript(businessProperty, nameof(businessProperty.BigTextScriptDeviceModel), Localizer["check"], this, DialogService))>
@Localizer["Check"]
</Button>
</div>
@@ -66,7 +66,7 @@
<CodeEditor IsReadonly=@(!CanWrite) ShowLineNo @bind-Value=@businessProperty.BigTextScriptVariableModel Language="csharp" Theme="vs-dark" />
<div class="ms-2 d-flex justify-content-center align-items-center">
<Button IsDisabled=@(!CanWrite) OnClick=@(()=>PropertyComponent.CheckScript(businessProperty,nameof(businessProperty.BigTextScriptVariableModel), Localizer["check"], this, DialogService))>
<Button IsDisabled=@(!CanWrite) OnClick=@(() => PropertyComponent.CheckScript(businessProperty, nameof(businessProperty.BigTextScriptVariableModel), Localizer["check"], this, DialogService))>
@Localizer["Check"]
</Button>
</div>
@@ -76,7 +76,7 @@
<CodeEditor IsReadonly=@(!CanWrite) ShowLineNo @bind-Value=@businessProperty.BigTextScriptAlarmModel Language="csharp" Theme="vs-dark" />
<div class="ms-2 d-flex justify-content-center align-items-center">
<Button IsDisabled=@(!CanWrite) OnClick=@(()=>PropertyComponent.CheckScript(businessProperty,nameof(businessProperty.BigTextScriptAlarmModel), Localizer["check"], this, DialogService))>
<Button IsDisabled=@(!CanWrite) OnClick=@(() => PropertyComponent.CheckScript(businessProperty, nameof(businessProperty.BigTextScriptAlarmModel), Localizer["check"], this, DialogService))>
@Localizer["Check"]
</Button>
</div>
@@ -85,6 +85,39 @@
</EditTemplate>
</EditorItem>
}
@if (Model.Value is MqttClientProperty clientProperty)
{
<EditorItem FieldExpression=@(() => context) Field=@(context)>
<EditTemplate Context="value">
<div class="col-12 col-md-12 min-height-500">
<BootstrapLabel Value=@Localizer["BigTextScriptRpc"] ShowLabelTooltip="true" />
<CodeEditor ShowLineNo @bind-Value=@clientProperty.BigTextScriptRpc Language="csharp" Theme="vs-dark" IsReadonly=@(!CanWrite) />
@* <div class="ms-2 d-flex justify-content-center align-items-center">
<Button IsDisabled=@(!CanWrite) OnClick=@(() => PropertyComponent.CheckScript(clientProperty, nameof(clientProperty.BigTextScriptRpc), Localizer["check"], this, DialogService))>
@Localizer["Check"]
</Button>
</div> *@
</div>
</EditTemplate>
</EditorItem>
}
else if (Model.Value is MqttServerProperty serverProperty)
{
<EditorItem FieldExpression=@(() => context) Field=@(context)>
<EditTemplate Context="value">
<div class="col-12 col-md-12 min-height-500">
<BootstrapLabel Value=@Localizer["BigTextScriptRpc"] ShowLabelTooltip="true" />
<CodeEditor ShowLineNo @bind-Value=@serverProperty.BigTextScriptRpc Language="csharp" Theme="vs-dark" IsReadonly=@(!CanWrite) />
</div>
</EditTemplate>
</EditorItem>
}
</FieldItems>
</EditorFormObject>

View File

@@ -18,7 +18,7 @@ using ThingsGateway.Razor;
namespace ThingsGateway.Plugin.Mqtt
{
public partial class MqttClientPropertyRazor : IPropertyUIBase
public partial class MqttPropertyRazor : IPropertyUIBase
{
[Parameter, EditorRequired]
public string Id { get; set; }
@@ -135,5 +135,7 @@ namespace ThingsGateway.Plugin.Mqtt
[Inject]
private DialogService DialogService { get; set; }
}
}

View File

@@ -0,0 +1,60 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://thingsgateway.cn/
// QQ群605534569
//------------------------------------------------------------------------------
using MQTTnet;
using MQTTnet.Server;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
namespace ThingsGateway.Plugin.Mqtt;
public abstract class DynamicMqttServerRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttServer">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典)返回rpc结果</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public virtual async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, InterceptingPublishEventArgs args, MqttServerProperty driverPropertys, MQTTnet.Server.MqttServer mqttServer, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, CancellationToken cancellationToken)
{
if (driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;
var t = string.Format(null, "{0}/+", driverPropertys.RpcWriteTopic);
if (MqttTopicFilterComparer.Compare(args.ApplicationMessage.Topic, t) != MqttTopicFilterCompareResult.IsMatch)
return;
var rpcDatas = Encoding.UTF8.GetString(args.ApplicationMessage.Payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var mqttRpcResult = await getRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);
try
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(driverPropertys.JsonFormattingIndented)).Build();
await mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(variableMessage), cancellationToken).ConfigureAwait(false);
}
catch
{
}
}
}

View File

@@ -30,6 +30,7 @@ public partial class MqttServer : BusinessBaseWithCacheIntervalScript<VariableBa
public override VariablePropertyBase VariablePropertys => _variablePropertys;
protected override BusinessPropertyWithCacheIntervalScript _businessPropertyWithCacheIntervalScript => _driverPropertys;
public override Type DriverPropertyUIType => typeof(MqttPropertyRazor);
protected override async Task InitChannelAsync(IChannel? channel, CancellationToken cancellationToken)
{

View File

@@ -126,8 +126,6 @@ public partial class MqttServer : BusinessBaseWithCacheIntervalScript<VariableBa
{
if (_driverPropertys.GroupUpdate && variable.BusinessGroupUpdateTrigger && !variable.BusinessGroup.IsNullOrEmpty() && VariableRuntimeGroups.TryGetValue(variable.BusinessGroup, out var variableRuntimeGroup))
{
//获取组内全部变量
AddQueueVarModel(new CacheDBItem<List<VariableBasicData>>(variableRuntimeGroup.AdaptListVariableBasicData()));
@@ -182,7 +180,7 @@ public partial class MqttServer : BusinessBaseWithCacheIntervalScript<VariableBa
#endregion private
private async ValueTask<Dictionary<string, Dictionary<string, IOperResult>>> GetResult(InterceptingPublishEventArgs args, Dictionary<string, Dictionary<string, JToken>> rpcDatas)
private async ValueTask<Dictionary<string, Dictionary<string, IOperResult>>> GetRpcResult(string clientId, Dictionary<string, Dictionary<string, JToken>> rpcDatas)
{
var mqttRpcResult = new Dictionary<string, Dictionary<string, IOperResult>>();
rpcDatas.ForEach(a => mqttRpcResult.Add(a.Key, new()));
@@ -226,9 +224,9 @@ public partial class MqttServer : BusinessBaseWithCacheIntervalScript<VariableBa
}
}
var result = await GlobalData.RpcService.InvokeDeviceMethodAsync(ToString() + "-" + args.ClientId,
writeData).ConfigureAwait(false);
var result = await GlobalData.RpcService.InvokeDeviceMethodAsync(ToString() + "-" + clientId,
writeData).ConfigureAwait(false);
foreach (var dictKv in result)
{
@@ -309,37 +307,72 @@ public partial class MqttServer : BusinessBaseWithCacheIntervalScript<VariableBa
private async Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs args)
{
try
{
#if NET8_0_OR_GREATER
var payload = args.ApplicationMessage.Payload;
var payload = args.ApplicationMessage.Payload;
var payloadCount = payload.Length;
#else
var payload = args.ApplicationMessage.PayloadSegment;
var payloadCount = payload.Count;
#endif
if (!_driverPropertys.DeviceRpcEnable || string.IsNullOrEmpty(args.ClientId))
return;
if (_driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;
if (args.ApplicationMessage.Topic == _driverPropertys.RpcQuestTopic && payloadCount > 0)
{
var data = GetRetainedMessages();
foreach (var item in data)
{
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(item)).ConfigureAwait(false);
}
return;
}
if (!_driverPropertys.DeviceRpcEnable || string.IsNullOrEmpty(args.ClientId))
return;
if (!_driverPropertys.BigTextScriptRpc.IsNullOrEmpty())
{
var rpcBase = CSharpScriptEngineExtension.Do<DynamicMqttServerRpcBase>(_driverPropertys.BigTextScriptRpc);
await rpcBase.RPCInvokeAsync(LogMessage, args, _driverPropertys, _mqttServer, GetRpcResult, CancellationToken.None).ConfigureAwait(false);
}
else
{
if (_driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;
var t = string.Format(null, RpcTopic, _driverPropertys.RpcWriteTopic);
if (MqttTopicFilterComparer.Compare(args.ApplicationMessage.Topic, t) != MqttTopicFilterCompareResult.IsMatch)
return;
var rpcDatas = Encoding.UTF8.GetString(payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var mqttRpcResult = await GetRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);
try
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(_driverPropertys.JsonFormattingIndented)).Build();
await _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(variableMessage)).ConfigureAwait(false);
}
catch
{
}
}
var t = string.Format(null, RpcTopic, _driverPropertys.RpcWriteTopic);
if (MqttTopicFilterComparer.Compare(args.ApplicationMessage.Topic, t) != MqttTopicFilterCompareResult.IsMatch)
return;
var rpcDatas = Encoding.UTF8.GetString(payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var mqttRpcResult = await GetResult(args, rpcDatas).ConfigureAwait(false);
try
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(_driverPropertys.JsonFormattingIndented)).Build();
await _mqttServer.InjectApplicationMessage(
new InjectedMqttApplicationMessage(variableMessage)).ConfigureAwait(false);
}
catch
catch (Exception ex)
{
LogMessage?.LogWarning(ex, $"MqttServer_InterceptingPublishAsync error");
}
}
@@ -377,17 +410,6 @@ public partial class MqttServer : BusinessBaseWithCacheIntervalScript<VariableBa
return;
}
LogMessage?.LogInformation($"{ToString()}-{arg.ClientId}-Client Connected");
// _ = Task.Run(async () =>
// {
// //延时发送
// await Task.Delay(1000).ConfigureAwait(false);
// List<MqttApplicationMessage> data = GetRetainedMessages();
// foreach (var item in data)
// {
// await _mqttServer.InjectApplicationMessage(
//new InjectedMqttApplicationMessage(item)).ConfigureAwait(false);
// }
// });
}
/// <summary>

View File

@@ -8,6 +8,8 @@
// QQ群605534569
//------------------------------------------------------------------------------
using BootstrapBlazor.Components;
using MQTTnet.Protocol;
namespace ThingsGateway.Plugin.Mqtt;
@@ -63,6 +65,16 @@ public class MqttServerProperty : BusinessPropertyWithCacheIntervalScript
[DynamicProperty(Remark = "实际的写入主题为固定通配 {RpcWrite/+} 其中RpcWrite为该属性填入内容+通配符是请求GUID值返回结果主题会在主题后添加Response , 也就是{RpcWrite/+/Response}")]
public string RpcWriteTopic { get; set; }
/// <summary>
/// 数据请求Topic
/// </summary>
[DynamicProperty(Remark = "这个主题接收到任何数据都会把全部的信息发送到变量/设备/报警主题中")]
public string RpcQuestTopic { get; set; }
/// <summary>
/// RPC脚本
/// </summary>
[DynamicProperty]
[AutoGenerateColumn(Visible = true, IsVisibleWhenEdit = false, IsVisibleWhenAdd = false)]
public string? BigTextScriptRpc { get; set; }
}

View File

@@ -165,10 +165,11 @@ public class OpcUaMaster : CollectBase
//如果是订阅模式,连接时添加订阅组
if (_plc.OpcUaProperty?.ActiveSubscribe == true && CurrentDevice.VariableSourceReads.Count > 0 && _plc.Session.SubscriptionCount < CurrentDevice.VariableSourceReads.Count)
{
try
{
foreach (var variableSourceRead in CurrentDevice.VariableSourceReads)
foreach (var variableSourceRead in CurrentDevice.VariableSourceReads)
{
try
{
if (_plc.Session.Subscriptions.FirstOrDefault(a => a.DisplayName == variableSourceRead.RegisterAddress) == null)
{
@@ -180,19 +181,22 @@ public class OpcUaMaster : CollectBase
await Task.Delay(100, cancellationToken).ConfigureAwait(false); // allow for subscription to be finished on server?
checkLog = true;
}
catch (Exception ex)
{
if (!checkLog)
LogMessage?.LogWarning(ex, "AddSubscriptions error");
checkLog = false;
}
finally
{
}
LogMessage?.LogInformation("AddSubscriptions done");
checkLog = true;
}
catch (Exception ex)
{
if (!checkLog)
LogMessage?.LogWarning(ex, "AddSubscriptions error");
checkLog = false;
}
finally
{
}
}
}
}

View File

@@ -70,6 +70,8 @@ public class ThingsGatewayNodeManager : CustomNodeManager2
{
if (rootFolder == null) return;
NodeIdTags?.Clear();
RemoveRootNotifier(rootFolder);
rootFolder?.SafeDispose();
rootFolder = null;
rootFolder = CreateFolder(null, "ThingsGateway", "ThingsGateway");
@@ -100,7 +102,6 @@ public class ThingsGatewayNodeManager : CustomNodeManager2
}
}
AddPredefinedNode(SystemContext, rootFolder);
rootFolder.ClearChangeMasks(SystemContext, true);
}
@@ -526,7 +527,7 @@ public class ThingsGatewayNodeManager : CustomNodeManager2
a.ToDictionary(a => a.Item1.Name, a => a.Item2)
);
var result = GlobalData.RpcService.InvokeDeviceMethodAsync("OpcUaServer - " + context?.Session?.Identity?.DisplayName, writeDatas
).GetAwaiter().GetResult(); ;
).GetAwaiter().GetResult();
for (int ii = 0; ii < nodesToWrite.Count; ii++)
{

View File

@@ -44,19 +44,6 @@ public partial class ThingsGatewayServer : StandardServer
}
//public void AfterVariablesChanged()
//{
// if(masterNodeManager!=null)
// {
// masterNodeManager?.Dispose();
// masterNodeManager = CreateMasterNodeManager((ServerInternalData)NodeManager.Server, _opcUaServer.m_configuration);
// ((ServerInternalData)NodeManager.Server).SetNodeManager(masterNodeManager);
// // put the node manager into a state that allows it to be used by other objects.
// masterNodeManager.Startup();
// }
//}
/// <inheritdoc/>
protected override MasterNodeManager CreateMasterNodeManager(IServerInternal server, ApplicationConfiguration configuration)
{

View File

@@ -78,14 +78,14 @@ public partial class OpcUaServer : BusinessBase
CollectVariableRuntimes.Clear();
IdVariableRuntimes.ForEach(a =>
{
VariableValueChange(a.Value, a.Value.AdaptVariableBasicData());
});
m_server?.NodeManager?.RefreshVariable();
//IdVariableRuntimes.ForEach(a =>
//{
// VariableValueChange(a.Value, a.Value.AdaptVariableBasicData());
//});
//动态更新UA库节点暂时取消
//m_server?.NodeManager?.RefreshVariable();
m_server?.Stop();
}
@@ -112,6 +112,7 @@ public partial class OpcUaServer : BusinessBase
GlobalData.VariableValueChangeEvent += VariableValueChange;
Localizer = App.CreateLocalizerByType(typeof(OpcUaServer))!;
await base.InitChannelAsync(channel, cancellationToken).ConfigureAwait(false);

View File

@@ -100,7 +100,7 @@ public partial class OpcUaMaster : IDisposable
LogMessage?.AddLogger(logger);
_plc.LogEvent = (a, b, c, d) => LogMessage?.Log((LogLevel)a, b, c, d);
_plc.DataChangedHandler += (a) => LogMessage?.Trace(a.ToSystemTextJsonString());
_plc.DataChangedHandler += (a) => LogMessage?.Trace($"id:{a.monitoredItem?.StartNodeId};stateCode:{a.dataValue?.StatusCode};value:{a.jToken?.ToString()}");
base.OnInitialized();
}

View File

@@ -15,6 +15,7 @@
<EditorItem @bind-Field=context.BitCode />
<EditorItem @bind-Field=context.DataCode />
<EditorItem @bind-Field=context.DbBlock />
<EditorItem @bind-Field=context.WStringEnable />
</FieldItems>
</EditorFormObject>

View File

@@ -13,7 +13,12 @@ EXPOSE 5000
# 添加时区环境变量,亚洲,上海
ENV TimeZone=Asia/Shanghai
# 转发头
ENV ASPNETCORE_FORWARDEDHEADERS_ENABLED=true
# 使用软连接,并且将时区配置覆盖/etc/timezone
RUN ln -snf /usr/share/zoneinfo/$TimeZone /etc/localtime && echo $TimeZone > /etc/timezone
ENTRYPOINT ["dotnet", "ThingsGateway.Server.dll","--urls","http://*:5000"]

View File

@@ -13,6 +13,8 @@ EXPOSE 5000
# 添加时区环境变量,亚洲,上海
ENV TimeZone=Asia/Shanghai
# 转发头
ENV ASPNETCORE_FORWARDEDHEADERS_ENABLED=true
# 使用软连接,并且将时区配置覆盖/etc/timezone
RUN ln -snf /usr/share/zoneinfo/$TimeZone /etc/localtime && echo $TimeZone > /etc/timezone

View File

@@ -0,0 +1,22 @@
#docker run -d --name tg --restart always -p 127.0.0.1:5000:5000 -e ASPNETCORE_ENVIRONMENT=Demo -v /thingsgateway/Keys:/app/Keys -v /thingsgateway/DB:/app/DB --memory="512m" registry.cn-shenzhen.aliyuncs.com/thingsgateway/thingsgateway:latest
version: "latest" # Docker Compose 配置版本
services: # 定义所有要跑的容器(服务)
thingsgateway: #服务名称
image: registry.cn-shenzhen.aliyuncs.com/thingsgateway/thingsgateway:latest #镜像名:tag
container_name: tg #容器名字
ports:
- "127.0.0.1:5000:5000" #"主机端口:容器端口"
environment:
- ASPNETCORE_ENVIRONMENT=Demo #环境变量名=值
volumes: #挂载本机目录
- /thingsgateway/Keys:/app/Keys
- /thingsgateway/DB:/app/DB
deploy:
resources:
limits:
memory: 512M
restart: always # 容器异常退出自动重启

View File

@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>10.9.8</Version>
<Version>10.9.14</Version>
</PropertyGroup>
<ItemGroup>