Compare commits

...

17 Commits

Author SHA1 Message Date
Kimdiego2098
81fab2be08 更新依赖包 2024-03-01 13:36:20 +08:00
Kimdiego2098
165b742782 更新版本 2024-03-01 13:33:49 +08:00
Kimdiego2098
76fef9c807 优化业务插件缓存逻辑 2024-03-01 13:33:43 +08:00
Kimdiego2098
e69ea0b9dc 更新版本 2024-02-28 17:41:02 +08:00
Kimdiego2098
98d3183f2b 修复sqldb历史表动态分表名称查询失效 2024-02-28 17:40:51 +08:00
Kimdiego2098
a29390a951 更新依赖包 2024-02-28 17:13:42 +08:00
Kimdiego2098
6291ce8617 modbusSlave运行时检查恢复启动 2024-02-28 17:08:31 +08:00
Kimdiego2098
c76b1b50a0 更新版本 2024-02-28 16:12:55 +08:00
Kimdiego2098
cc45e2aec0 更新网关双冗余 2024-02-28 16:12:38 +08:00
Kimdiego2098
17efebb8e8 更新版本 2024-02-28 16:05:50 +08:00
Kimdiego2098
5c94c733ee 整理代码 2024-02-28 16:05:18 +08:00
Kimdiego2098
156b89dd9c 更新双网关冗余 2024-02-28 16:04:04 +08:00
Kimdiego2098
34ba9f67e7 更新双网关冗余,sqldb支持历史表名称更改,更改为按周分表 2024-02-28 15:53:46 +08:00
Kimdiego2098
5ddaa6b872 双网关冗余(未完成) 2024-02-27 17:50:23 +08:00
Kimdiego2098
9043fa7f56 整理 2024-02-27 16:58:01 +08:00
Kimdiego2098
4c8e487dc9 双网关冗余(未完成) 2024-02-27 13:59:12 +08:00
Kimdiego2098
d3b87179aa 双网关冗余(未完成) 2024-02-26 19:49:19 +08:00
46 changed files with 1516 additions and 663 deletions

View File

@@ -5,7 +5,7 @@
<LangVersion>11.0</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<Version>5.0.1.13</Version>
<Version>5.0.2.0</Version>
<Authors>Diego</Authors>
<Company>Diego</Company>
<Product>Diego</Product>

View File

@@ -15,7 +15,7 @@ namespace ThingsGateway.Admin.Application;
/// </summary>
public class UserTokenCacheUtil
{
public static LiteDBCache<UserTokenCache> UserTokenCache = LiteDBCacheUtil.GetDB<UserTokenCache>(nameof(UserTokenCache), $"{typeof(UserTokenCache).FullName}", false);
public static LiteDBCache<UserTokenCache> UserTokenCache = LiteDBCacheUtil.GetDB<UserTokenCache>(nameof(UserTokenCache), $"{typeof(UserTokenCache).FullName}", true, false);
public static List<VerificatInfo>? HashGetOne(long id)
{

View File

@@ -2,7 +2,7 @@
<ItemGroup>
<PackageReference Include="Mapster" Version="7.4.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.1.5" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.2.0" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="8.0.2" />
<PackageReference Include="MySqlConnector" Version="2.3.5" />
<PackageReference Include="Npgsql" Version="8.0.2" />

View File

@@ -61,73 +61,104 @@ public class LiteDBCache<T> : IDisposable where T : IPrimaryIdEntity
public void Dispose()
{
try { _dbProvider.Dispose(); } catch { }
lock (this)
{
try { _dbProvider.Dispose(); } catch { }
}
}
public IEnumerable<T>? GetPage(int skipCount, int pageSize)
public List<T>? GetPage(int skipCount, int pageSize)
{
var results = Collection.Find(Query.All(), skipCount, pageSize);
return results;
lock (this)
{
var results = Collection.Find(Query.All(), skipCount, pageSize).ToList();
return results;
}
}
public T? GetOne(long id)
{
var results = Collection.FindById(id);
return results;
lock (this)
{
var results = Collection.FindById(id);
return results;
}
}
public IEnumerable<T>? Get(long[] ids)
{
var results = Collection.Find(a => ids.Contains(a.Id));
return results;
lock (this)
{
var results = Collection.Find(a => ids.Contains(a.Id));
return results;
}
}
public IEnumerable<T>? GetAll()
{
var results = Collection.FindAll();
return results;
lock (this)
{
var results = Collection.FindAll();
return results;
}
}
public int AddRange(IEnumerable<T> data, int batchSize = 5000)
{
var results = Collection.InsertBulk(data, batchSize);
return results;
lock (this)
{
var results = Collection.InsertBulk(data, batchSize);
return results;
}
}
public void Add(T data)
{
Collection.Insert(data);
lock (this)
{
Collection.Insert(data);
}
}
public int DeleteMany(IEnumerable<T> data)
{
var results = Collection.DeleteMany(a => data.Select(item => item.Id).Contains(a.Id));
return results;
lock (this)
{
var results = Collection.DeleteMany(a => data.Select(item => item.Id).Contains(a.Id));
return results;
}
}
public int DeleteMany(Expression<Func<T, bool>> predicate)
{
var results = Collection.DeleteMany(predicate);
return results;
lock (this)
{
var results = Collection.DeleteMany(predicate);
return results;
}
}
public int DeleteMany(BsonExpression bsonExpression)
{
var results = Collection.DeleteMany(bsonExpression);
return results;
lock (this)
{
var results = Collection.DeleteMany(bsonExpression);
return results;
}
}
/// <summary>
/// init database
/// </summary>
public void InitDb()
public void InitDb(bool isRebuild = false)
{
lock (_dbProvider)
lock (this)
{
try
{
_dbProvider.Checkpoint();
//_dbProvider.Rebuild();
if (isRebuild)
_dbProvider.Rebuild();
lock (Collection)
{
//建立索引
@@ -141,15 +172,18 @@ public class LiteDBCache<T> : IDisposable where T : IPrimaryIdEntity
private LiteDatabase GetConnection(LiteDBOptions options)
{
ConnectionString builder = new ConnectionString()
lock (this)
{
Filename = options.DataSource,
//InitialSize = options.InitialSize,
Connection = options.ConnectionType,
Password = options.Password
};
var _conn = new LiteDatabase(builder);
ConnectionString builder = new ConnectionString()
{
Filename = options.DataSource,
//InitialSize = options.InitialSize,
Connection = options.ConnectionType,
Password = options.Password
};
var _conn = new LiteDatabase(builder);
return _conn;
return _conn;
}
}
}

View File

@@ -9,6 +9,7 @@
//------------------------------------------------------------------------------
using Furion;
using Furion.Logging.Extensions;
using NewLife;
@@ -32,7 +33,7 @@ public static class LiteDBCacheUtil
/// 如果当前文件的大小超限,则返回新的链接
/// 如果当前文件的数量超限,则删除部分旧文件
/// </summary>
public static LiteDBCache<T>? GetDB<T>(string id, string typeName, bool isDeleteRule = true) where T : IPrimaryIdEntity
public static LiteDBCache<T>? GetDB<T>(string id, string typeName, bool isInsert, bool isDeleteRule = true) where T : IPrimaryIdEntity
{
lock (_dictObject)
{
@@ -49,6 +50,7 @@ public static class LiteDBCacheUtil
var driveUsage = (100 - (drive.TotalFreeSpace * 100.00 / drive.TotalSize));
if (driveUsage > LiteDBCacheUtil.config.MaxDriveUsage)
{
$"磁盘使用率超限,将删除缓存文件".LogInformation();
//删除全部文件夹中旧文件
string[] dirs = Directory.GetDirectories(GetFileBasePath());
//遍历全部文件夹删除90%的文件
@@ -89,6 +91,7 @@ public static class LiteDBCacheUtil
string[] files = Directory.GetFiles(dir, searchPattern);
if (files.Length > LiteDBCacheUtil.config.MaxFileCount)
{
$"{dir}缓存文件数量超限,将删除文件".LogInformation();
//数量超限就删除旧文件
//按文件更改时间降序排序
var sortedFiles = files.OrderBy(file => File.GetLastWriteTime(file)).ToArray();
@@ -124,8 +127,9 @@ public static class LiteDBCacheUtil
}
var mb1 = Math.Round((double)length1 / (double)1024 / (double)1024, 2);
if (mb1 > LiteDBCacheUtil.config.MaxFileLength)
if (isInsert && mb1 > LiteDBCacheUtil.config.MaxFileLength)
{
$"{fullName}缓存文件大小超限,将产生新文件".LogInformation();
//大小超限就返回新的文件
var newFullName = dir.CombinePath($"{fileStart}_{maxNum + 1}{ex}");
{
@@ -162,23 +166,54 @@ public static class LiteDBCacheUtil
}
}
}
{
if (_dict.TryGetValue(fullName, out object cache1))
{
//返回原连接
try
{
return (LiteDBCache<T>)cache1;
var connect = (LiteDBCache<T>)cache1;
if (maxNum > 1 && !isInsert)
{
if (connect.GetPage(1, 1).Count == 0)
{
//无内容时,删除文件
DisposeAndDeleteFile(fullName, cache1);
return GetDB<T>(id, typeName, isInsert, isDeleteRule);
}
}
else
{
if (maxNum == 1)
{
if (isDeleteRule)
{
long? length1 = null;
if (!File.Exists(fullName))
{
length1 = 0;
}
else
{
length1 = new FileInfo(fullName).Length;
}
var mb1 = Math.Round((double)length1 / (double)1024 / (double)1024, 2);
if (mb1 > LiteDBCacheUtil.config.MaxFileLength)
{
connect.InitDb(true);
}
}
}
}
return connect;
}
catch (Exception)
{
//可能类型变换导致错误,此时返回null,并释放连接
//可能类型变换导致错误,此时释放连接
DisposeAndDeleteFile(fullName, cache1);
var cache = new LiteDBCache<T>(id, typeName, fullName);
_dict.TryAdd(fullName, cache);
return cache;
return GetDB<T>(id, typeName, isInsert, isDeleteRule);
}
}
{
@@ -194,6 +229,7 @@ public static class LiteDBCacheUtil
{
if (File.Exists(file))
{
$"删除{file}缓存文件".LogInformation();
File.SetAttributes(file, FileAttributes.Normal);
File.Delete(file);
}
@@ -238,24 +274,40 @@ public static class LiteDBCacheUtil
{
var dir = GetFilePath(id);
var fileStart = GetFileStartName(typeName);
string[] files = Directory.GetFiles(dir, $"{fileStart}_*.ldb");
int maxNumber = 1;
Regex regex = new Regex(@"_(\d+)\.ldb$");
foreach (var file in files)
{
Match match = regex.Match(file);
if (match.Success && int.TryParse(match.Groups[1].Value, out int number))
{
if (number > maxNumber)
{
maxNumber = number;
}
}
}
return maxNumber;
//搜索全部符合条件的文件
if (!File.Exists(dir.CombinePath($"{fileStart}_{ex}")))
{
return null;
}
var index = 1;
while (true)
{
var newFileName = dir.CombinePath($"{fileStart}_{index}{ex}");
if (System.IO.File.Exists(newFileName))
{
index++;
}
else
{
return (index == 1 ? null : index - 1);
}
}
//if (!File.Exists(dir.CombinePath($"{fileStart}_1{ex}")))
//{
// return 1;
//}
//var index = 2;
//while (true)
//{
// var newFileName = dir.CombinePath($"{fileStart}_{index}{ex}");
// if (System.IO.File.Exists(newFileName))
// {
// index++;
// }
// else
// {
// return (index - 1);
// }
//}
}
}

View File

@@ -1,8 +1,8 @@
{
//缓存设置
"LiteDBConfig": {
"MaxFileLength": "400",
"MaxFileCount": "20",
"MaxDriveUsage": "90"
"LiteDBConfig": { //单个缓存文件夹
"MaxFileLength": "64", //最大文件大小
"MaxFileCount": "10", //最大文件数量
"MaxDriveUsage": "90" //最大磁盘使用率
}
}

View File

@@ -1,8 +1,8 @@
{
//缓存设置
"LiteDBConfig": {
"MaxFileLength": "400",
"MaxFileCount": "20",
"MaxDriveUsage": "90"
"LiteDBConfig": { //单个缓存文件夹
"MaxFileLength": "64", //最大文件大小
"MaxFileCount": "10", //最大文件数量
"MaxDriveUsage": "90" //最大磁盘使用率
}
}

View File

@@ -2,9 +2,9 @@
<Import Project="$(SolutionDir)PackNuget.props" />
<ItemGroup>
<PackageReference Include="Furion.Pure" Version="4.9.1.32" />
<PackageReference Include="Furion.Extras.Authentication.JwtBearer" Version="4.9.1.32" />
<PackageReference Include="Furion.Extras.ObjectMapper.Mapster" Version="4.9.1.32" />
<PackageReference Include="Furion.Pure" Version="4.9.1.35" />
<PackageReference Include="Furion.Extras.Authentication.JwtBearer" Version="4.9.1.35" />
<PackageReference Include="Furion.Extras.ObjectMapper.Mapster" Version="4.9.1.35" />
<PackageReference Include="NewLife.Redis" Version="5.6.2024.203" />
<PackageReference Include="Yitter.IdGenerator" Version="1.0.14" />
<PackageReference Include="LiteDB" Version="5.0.19" />

View File

@@ -308,5 +308,75 @@
</summary>
<param name="pluginName">插件名称一般建议使用nameof()解决。</param>
</member>
<member name="T:TouchSocket.Rpc.GeneratorRpcProxyAttribute">
<summary>
标识该接口将使用源生成自动生成调用的代理类
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcProxyAttribute.Prefix">
<summary>
调用键的前缀,包括服务的命名空间,类名,不区分大小写。格式:命名空间.类名
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcProxyAttribute.GenericConstraintTypes">
<summary>
生成泛型方法的约束
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcProxyAttribute.MethodInvoke">
<summary>
是否仅以函数名调用当为True是调用时仅需要传入方法名即可。
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcProxyAttribute.Namespace">
<summary>
生成代码的命名空间
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcProxyAttribute.ClassName">
<summary>
生成的类名不要包含“I”生成接口时会自动添加。
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcProxyAttribute.GeneratorFlag">
<summary>
生成代码
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcProxyAttribute.InheritedInterface">
<summary>
继承接口
</summary>
</member>
<member name="T:TouchSocket.Rpc.GeneratorRpcServerAttribute">
<summary>
标识将通过源生成器生成Rpc服务的调用委托。
</summary>
</member>
<member name="T:TouchSocket.Rpc.GeneratorRpcServerRegisterAttribute">
<summary>
标识将通过源生成器生成Rpc服务的注册代码。
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcServerRegisterAttribute.MethodName">
<summary>
方法名称。默认是“RegisterAllFrom+AssemblyName”
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcServerRegisterAttribute.ClassName">
<summary>
扩展类类名默认是“RegisterRpcServerFrom+AssemblyName+Extension”
</summary>
</member>
<member name="P:TouchSocket.Rpc.GeneratorRpcServerRegisterAttribute.Accessibility">
<summary>
访问修饰。
<para>
如果为<see cref="F:TouchSocket.Rpc.Accessibility.Both"/>将生成注册公共Rpc服务与非公共服务两个方法。其中非公共方法会在<see cref="P:TouchSocket.Rpc.GeneratorRpcServerRegisterAttribute.MethodName"/>之前以Internal开头。
如果为<see cref="F:TouchSocket.Rpc.Accessibility.Internal"/>将只生成注册非公共Rpc服务。
如果为<see cref="F:TouchSocket.Rpc.Accessibility.Public"/>将只生成注册公共Rpc服务。
</para>
</summary>
</member>
</members>
</doc>

View File

@@ -0,0 +1,17 @@
{
"Management": {
"RemoteUri": "127.0.0.1:7778", //主(备)站IP
"Port": 7777, //监听端口
"VerifyToken": "ThingsGateway", //登录token双方一致
"HeartbeatInterval": 3000, //心跳间隔
"MaxErrorCount": 3, //最大错误次数
"Redundancy": {
"Enable": false, //启用冗余
"IsPrimary": false, //是否主站
"IsStartBusinessDevice": true //是否启用备用站点的业务设备
//主从站的采集配置必须一致
//默认主站优先,当主站恢复后,从站切换回备用模式。
//主从站都完成对采集的初始化,但从站的数据都来自主站的数据同步
}
}
}

View File

@@ -0,0 +1,17 @@
{
"Management": {
"RemoteUri": "127.0.0.1:7778", //主(备)站IP
"Port": 7777, //监听端口
"VerifyToken": "ThingsGateway", //登录token双方一致
"HeartbeatInterval": 3000, //心跳间隔
"MaxErrorCount": 3, //最大错误次数
"Redundancy": {
"Enable": false, //启用冗余
"IsPrimary": false, //是否主站
"IsStartBusinessDevice": true //是否启用备用站点的业务设备
//主从站的采集配置必须一致
//默认主站优先,当主站恢复后,从站切换回备用模式。
//主从站都完成对采集的初始化,但从站的数据都来自主站的数据同步
}
}
}

View File

@@ -65,4 +65,20 @@ public class DeviceBasicData : DeviceData
/// <inheritdoc cref="Device.Description"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public string? Description { get; set; }
}
public class DeviceDataWithValue : IPrimaryIdEntity
{
/// <inheritdoc cref="PrimaryIdEntity.Id"/>
public long Id { get; set; }
/// <inheritdoc cref="DeviceRunTime.ActiveTime"/>
public DateTime ActiveTime { get; set; }
/// <inheritdoc cref="DeviceRunTime.DeviceStatus"/>
public DeviceStatusEnum DeviceStatus { get; set; }
/// <inheritdoc cref="DeviceRunTime.LastErrorMessage"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public string LastErrorMessage { get; set; }
}

View File

@@ -45,7 +45,7 @@ public class DeviceRunTime : Device
/// 设备活跃时间
/// </summary>
[Description("活跃时间")]
public DateTime? ActiveTime { get; private set; }
public DateTime? ActiveTime { get; internal set; }
/// <summary>
/// 设备状态
@@ -60,7 +60,7 @@ public class DeviceRunTime : Device
else
return DeviceStatusEnum.Pause;
}
protected set
internal set
{
if (_deviceStatus != value)
{
@@ -123,7 +123,7 @@ public class DeviceRunTime : Device
{
return _lastErrorMessage;
}
protected set
internal set
{
_lastErrorMessage = DateTimeUtil.TimerXNow.ToDefaultDateTimeFormat() + " - " + value;
}

View File

@@ -81,4 +81,23 @@ public class VariableBasicData : VariableData
/// <inheritdoc cref="Variable.DataType"/>
public DataTypeEnum DataType { get; set; }
}
internal class VariableDataWithValue : IPrimaryIdEntity
{
/// <inheritdoc cref="PrimaryIdEntity.Id"/>
public long Id { get; set; }
/// <inheritdoc cref="VariableRunTime.Value"/>
public object RawValue { get; set; }
/// <inheritdoc cref="VariableRunTime.CollectTime"/>
public DateTime CollectTime { get; set; }
/// <inheritdoc cref="VariableRunTime.IsOnline"/>
public bool IsOnline { get; set; }
/// <inheritdoc cref="VariableRunTime.LastErrorMessage"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
public string? LastErrorMessage { get; set; }
}

View File

@@ -127,21 +127,21 @@ public class VariableRunTime : Variable, IVariable
/// </summary>
[Description("上次值")]
[DataTable(Order = 3, IsShow = true, Sortable = false, CellClass = " table-text-truncate ")]
public object? LastSetValue { get; private set; }
public object? LastSetValue { get; internal set; }
/// <summary>
/// 原始值
/// </summary>
[Description("原始值")]
[DataTable(Order = 3, IsShow = true, Sortable = false, CellClass = " table-text-truncate ")]
public object? RawValue { get; private set; }
public object? RawValue { get; internal set; }
/// <summary>
/// 实时值
/// </summary>
[Description("实时值")]
[DataTable(Order = 3, IsShow = true, Sortable = false, CellClass = " table-text-truncate ")]
public object? Value { get => _value; private set => _value = value; }
public object? Value { get => _value; internal set => _value = value; }
/// <summary>
/// 设置变量值与时间/质量戳
@@ -202,6 +202,12 @@ public class VariableRunTime : Variable, IVariable
}
}
internal void SetErrorMessage(string value)
{
if (VariableSource != null)
VariableSource.LastErrorMessage = value;
}
private IRpcService? _rpcService { get; set; }
/// <inheritdoc/>

View File

@@ -48,11 +48,6 @@ public abstract class BusinessBase : DriverBase
public override async Task AfterStopAsync()
{
//去除全局设备变量
lock (GlobalData.BusinessDevices)
{
GlobalData.BusinessDevices.RemoveWhere(it => it.Id == DeviceId);
}
await base.AfterStopAsync();
}
}

View File

@@ -33,7 +33,10 @@ public abstract class BusinessBaseWithCacheT<T> : BusinessBase
/// <summary>
/// 获取缓存对象,注意每次获取的对象可能不一样,如顺序操作,需固定引用
/// </summary>
protected virtual LiteDBCache<LiteDBDefalutCacheItem<T>> LiteDBCacheT => LiteDBCacheUtil.GetDB<LiteDBDefalutCacheItem<T>>(CurrentDevice.Id.ToString(), $"{CurrentDevice.PluginName}{typeof(T).FullName}_{nameof(T)}");
protected virtual LiteDBCache<LiteDBDefalutCacheItem<T>> LiteDBCacheT(bool isInsert)
{
return LiteDBCacheUtil.GetDB<LiteDBDefalutCacheItem<T>>(CurrentDevice.Id.ToString(), $"{CurrentDevice.PluginName}{typeof(T).FullName}_{nameof(T)}", isInsert);
}
protected override IProtocol? Protocol => null;
@@ -101,7 +104,7 @@ public abstract class BusinessBaseWithCacheT<T> : BusinessBase
{
#region //成功上传时,补上传缓存数据
if (success)
if (IsConnected())
{
List<long> successIds = new();
@@ -111,7 +114,7 @@ public abstract class BusinessBaseWithCacheT<T> : BusinessBase
while (!cancellationToken.IsCancellationRequested)
{
//循环获取
var varList = LiteDBCacheT.GetPage(successCount, _businessPropertyWithCache.SplitSize).ToList(); //按最大列表数量分页
var varList = LiteDBCacheT(false).GetPage(successCount, _businessPropertyWithCache.SplitSize); //按最大列表数量分页
if (varList?.Count != 0)
{
try
@@ -151,7 +154,7 @@ public abstract class BusinessBaseWithCacheT<T> : BusinessBase
LogMessage?.LogWarning(ex);
}
if (successIds.Count > 0)
LiteDBCacheT.DeleteMany(a => successIds.Contains(a.Id));
LiteDBCacheT(false).DeleteMany(a => successIds.Contains(a.Id));
}
#endregion //成功上传时,补上传缓存数据
@@ -165,7 +168,7 @@ public abstract class BusinessBaseWithCacheT<T> : BusinessBase
protected virtual void AddCache(List<LiteDBDefalutCacheItem<T>> data)
{
if (data?.Count > 0)
LiteDBCacheT.AddRange(data);
LiteDBCacheT(true).AddRange(data);
}
/// <summary>

View File

@@ -24,7 +24,10 @@ public abstract class BusinessBaseWithCacheTT<T, T2> : BusinessBaseWithCacheT<T>
/// <summary>
/// 获取缓存对象,注意每次获取的对象可能不一样,如顺序操作,需固定引用
/// </summary>
protected virtual LiteDBCache<LiteDBDefalutCacheItem<T2>> LiteDBCacheT2 => LiteDBCacheUtil.GetDB<LiteDBDefalutCacheItem<T2>>(CurrentDevice.Id.ToString(), $"{CurrentDevice.PluginName}{typeof(T2).FullName}_{nameof(T2)}");
protected virtual LiteDBCache<LiteDBDefalutCacheItem<T2>> LiteDBCacheT2(bool isInsert)
{
return LiteDBCacheUtil.GetDB<LiteDBDefalutCacheItem<T2>>(CurrentDevice.Id.ToString(), $"{CurrentDevice.PluginName}{typeof(T2).FullName}_{nameof(T2)}", isInsert);
}
protected ConcurrentQueue<LiteDBDefalutCacheItem<T2>> _memoryT2Queue = new();
@@ -35,7 +38,7 @@ public abstract class BusinessBaseWithCacheTT<T, T2> : BusinessBaseWithCacheT<T>
protected virtual void AddCache(List<LiteDBDefalutCacheItem<T2>> data)
{
if (data?.Count > 0)
LiteDBCacheT2.AddRange(data);
LiteDBCacheT2(true).AddRange(data);
}
/// <summary>
@@ -130,7 +133,7 @@ public abstract class BusinessBaseWithCacheTT<T, T2> : BusinessBaseWithCacheT<T>
{
#region //成功上传时,补上传缓存数据
if (success)
if (IsConnected())
{
List<long> successIds = new();
@@ -140,7 +143,7 @@ public abstract class BusinessBaseWithCacheTT<T, T2> : BusinessBaseWithCacheT<T>
while (!cancellationToken.IsCancellationRequested)
{
//循环获取
var varList = LiteDBCacheT2.GetPage(successCount, _businessPropertyWithCache.SplitSize).ToList(); //按最大列表数量分页
var varList = LiteDBCacheT2(false).GetPage(successCount, _businessPropertyWithCache.SplitSize); //按最大列表数量分页
if (varList?.Count != 0)
{
try
@@ -180,7 +183,7 @@ public abstract class BusinessBaseWithCacheTT<T, T2> : BusinessBaseWithCacheT<T>
success = false;
}
if (successIds.Count > 0)
LiteDBCacheT2.DeleteMany(a => successIds.Contains(a.Id));
LiteDBCacheT2(false).DeleteMany(a => successIds.Contains(a.Id));
}
#endregion //成功上传时,补上传缓存数据

View File

@@ -24,7 +24,10 @@ public abstract class BusinessBaseWithCacheTTT<T, T2, T3> : BusinessBaseWithCach
/// <summary>
/// 获取缓存对象,注意每次获取的对象可能不一样,如顺序操作,需固定引用
/// </summary>
protected virtual LiteDBCache<LiteDBDefalutCacheItem<T3>> LiteDBCacheT3 => LiteDBCacheUtil.GetDB<LiteDBDefalutCacheItem<T3>>(CurrentDevice.Id.ToString(), $"{CurrentDevice.PluginName}{typeof(T3).FullName}_{nameof(T3)}");
protected virtual LiteDBCache<LiteDBDefalutCacheItem<T3>> LiteDBCacheT3(bool isInsert)
{
return LiteDBCacheUtil.GetDB<LiteDBDefalutCacheItem<T3>>(CurrentDevice.Id.ToString(), $"{CurrentDevice.PluginName}{typeof(T3).FullName}_{nameof(T3)}", isInsert);
}
protected ConcurrentQueue<LiteDBDefalutCacheItem<T3>> _memoryT3Queue = new();
@@ -35,7 +38,7 @@ public abstract class BusinessBaseWithCacheTTT<T, T2, T3> : BusinessBaseWithCach
protected virtual void AddCache(List<LiteDBDefalutCacheItem<T3>> data)
{
if (data?.Count > 0)
LiteDBCacheT3.AddRange(data);
LiteDBCacheT3(true).AddRange(data);
}
/// <summary>
@@ -132,7 +135,7 @@ public abstract class BusinessBaseWithCacheTTT<T, T2, T3> : BusinessBaseWithCach
{
#region //成功上传时,补上传缓存数据
if (success)
if (IsConnected())
{
List<long> successIds = new();
@@ -142,7 +145,7 @@ public abstract class BusinessBaseWithCacheTTT<T, T2, T3> : BusinessBaseWithCach
while (!cancellationToken.IsCancellationRequested)
{
//循环获取
var varList = LiteDBCacheT3.GetPage(successCount, _businessPropertyWithCache.SplitSize).ToList(); //按最大列表数量分页
var varList = LiteDBCacheT3(false).GetPage(successCount, _businessPropertyWithCache.SplitSize); //按最大列表数量分页
if (varList?.Count != 0)
{
try
@@ -182,7 +185,7 @@ public abstract class BusinessBaseWithCacheTTT<T, T2, T3> : BusinessBaseWithCach
LogMessage?.LogWarning(ex);
}
if (successIds.Count > 0)
LiteDBCacheT3.DeleteMany(a => successIds.Contains(a.Id));
LiteDBCacheT3(false).DeleteMany(a => successIds.Contains(a.Id));
}
#endregion //成功上传时,补上传缓存数据

View File

@@ -63,10 +63,11 @@ public class ChannelThread
public string LogPath { get; }
private Channel ChannelTable;
private GlobalData GlobalData;
public ChannelThread(Channel channel, Func<TouchSocketConfig, IChannel> getChannel)
{
Logger = App.GetService<ILoggerFactory>().CreateLogger($"通道:{channel.Name}");
GlobalData = App.GetService<GlobalData>();
ChannelTable = channel;
ChannelId = channel.Id;
//底层配置
@@ -130,7 +131,9 @@ public class ChannelThread
/// </summary>
private ConcurrentDictionary<long, CancellationTokenSource> CancellationTokenSources { get; set; } = new();
public void AddDriver(DriverBase driverBase)
public bool IsCollect { get; private set; }
internal void AddDriver(DriverBase driverBase)
{
DriverBases.Add(driverBase);
driverBase.ChannelThread = this;
@@ -151,6 +154,7 @@ public class ChannelThread
}
var token = CancellationTokenSources.GetOrAdd(0, new CancellationTokenSource());
CancellationTokenSources.TryAdd(driverBase.DeviceId, CancellationTokenSource.CreateLinkedTokenSource(token.Token));
IsCollect = driverBase.IsCollect;
}
/// <summary>
@@ -158,7 +162,7 @@ public class ChannelThread
/// </summary>
/// <param name="deviceId"></param>
/// <returns></returns>
public async Task RemoveDriverAsync(long deviceId)
internal async Task RemoveDriverAsync(long deviceId)
{
var driverBase = DriverBases.FirstOrDefault(a => a.DeviceId == deviceId);
if (driverBase != null)
@@ -177,24 +181,41 @@ public class ChannelThread
{
await Task.Delay(500);
}
//去除全局设备
if (IsCollect)
{
lock (GlobalData.CollectDevices)
{
GlobalData.CollectDevices.RemoveWhere(it => it.Id == driverBase.DeviceId);
}
}
else
{
lock (GlobalData.BusinessDevices)
{
GlobalData.BusinessDevices.RemoveWhere(it => it.Id == driverBase.DeviceId);
}
}
DriverBases.Remove(driverBase);
CancellationTokenSources.Remove(deviceId);
token?.Dispose();
}
}
public DriverBase GetDriver(long deviceId)
internal DriverBase GetDriver(long deviceId)
{
var driverBase = DriverBases.FirstOrDefault(a => a.DeviceId == deviceId);
return driverBase;
}
public IEnumerable<DriverBase> GetDriverEnumerable()
internal IEnumerable<DriverBase> GetDriverEnumerable()
{
return DriverBases;
}
public bool Has(long deviceId)
internal bool Has(long deviceId)
{
return DriverBases.Any(a => a.DeviceId == deviceId);
}
@@ -222,7 +243,7 @@ public class ChannelThread
/// <summary>
/// 停止插件前,执行取消传播
/// </summary>
public virtual void BeforeStopThread()
internal virtual void BeforeStopThread()
{
CancellationTokenSources.ParallelForEach(cancellationToken =>
{
@@ -245,7 +266,7 @@ public class ChannelThread
/// <summary>
/// 开始
/// </summary>
public virtual async Task StartThreadAsync()
internal virtual async Task StartThreadAsync()
{
try
{
@@ -278,7 +299,7 @@ public class ChannelThread
/// <summary>
/// 停止
/// </summary>
public virtual async Task StopThreadAsync()
internal virtual async Task StopThreadAsync(bool isRemoveDevice)
{
if (DriverTask == null)
{
@@ -298,6 +319,24 @@ public class ChannelThread
}
});
try { await DriverTask.WaitAsync(TimeSpan.FromMinutes(3)); } catch (OperationCanceledException) { }
if (isRemoveDevice)
{
//去除全局设备
if (IsCollect)
{
lock (GlobalData.CollectDevices)
{
GlobalData.CollectDevices.RemoveWhere(it => DriverBases.Any(a => a.DeviceId == it.Id));
}
}
else
{
lock (GlobalData.BusinessDevices)
{
GlobalData.BusinessDevices.RemoveWhere(it => DriverBases.Any(a => a.DeviceId == it.Id));
}
}
}
CancellationTokenSources.Clear();
DriverTask?.SafeDispose();
DriverTask = null;

View File

@@ -42,11 +42,6 @@ public abstract class CollectBase : DriverBase
public override async Task AfterStopAsync()
{
//去除全局设备变量
lock (GlobalData.CollectDevices)
{
GlobalData.CollectDevices.RemoveWhere(it => it.Id == DeviceId);
}
await base.AfterStopAsync();
}

View File

@@ -46,6 +46,8 @@ public class Startup : AppStartup
services.AddHostedService<CollectDeviceWorker>();
services.AddHostedService<BusinessDeviceWorker>();
services.AddHostedService<AlarmWorker>();
services.AddConfigurableOptions<ManagementOptions>();
services.AddHostedService<ManagementWoker>();
TypeExtension.DefaultDisplayNameFuncs.Add(a => a.GetCustomAttribute<DynamicPropertyAttribute>()?.Description);
}

View File

@@ -25,11 +25,18 @@
<PackageReference Include="CS-Script" Version="4.8.14" />
<!--CS-Script与Furion冲突直接安装覆盖版本-->
<PackageReference Include="Microsoft.CodeAnalysis.CSharp.Scripting" Version="4.8.0" />
<PackageReference Include="TouchSocket.Dmtp" Version="2.0.0-beta.279" />
</ItemGroup>
<ItemGroup>
<None Update="Config\Management.Production.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="Config\Management.Development.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="Config\Gateway.Development.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>

View File

@@ -83,6 +83,7 @@ public class AlarmWorker : BackgroundService
{
try
{
await restartLock.WaitAsync();
foreach (var item in GlobalData.CollectDevices)
{
item.VariableRunTimes?.ForEach(v => { v.VariableCollectChange += DeviceVariableChange; });
@@ -446,7 +447,7 @@ public class AlarmWorker : BackgroundService
private EasyLock _easyLock = new(false);
private async Task CollectDeviceWorker_Starting()
private async Task CollectDeviceWorker_Started()
{
await StartAsync();
}
@@ -459,7 +460,6 @@ public class AlarmWorker : BackgroundService
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("报警服务启动");
_appLifetime.ApplicationStarted.Register(() => { _easyLock.Release(); _easyLock = null; });
await base.StartAsync(cancellationToken);
}
@@ -467,7 +467,6 @@ public class AlarmWorker : BackgroundService
/// <inheritdoc/>
public override Task StopAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("报警服务停止");
return base.StopAsync(cancellationToken);
}
@@ -475,8 +474,9 @@ public class AlarmWorker : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
WorkerUtil.GetWoker<CollectDeviceWorker>().Starting += CollectDeviceWorker_Starting;
WorkerUtil.GetWoker<CollectDeviceWorker>().Stoping += CollectDeviceWorker_Stoping;
var collectDeviceWorker = WorkerUtil.GetWoker<CollectDeviceWorker>();
collectDeviceWorker.Started += CollectDeviceWorker_Started;
collectDeviceWorker.Stoping += CollectDeviceWorker_Stoping;
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
while (!stoppingToken.IsCancellationRequested)
{

View File

@@ -8,8 +8,6 @@
// QQ群605534569
//------------------------------------------------------------------------------
using Furion.Logging.Extensions;
using Mapster;
using Microsoft.Extensions.Hosting;
@@ -26,135 +24,27 @@ public class BusinessDeviceWorker : DeviceWorker
{
_logger = _serviceScope.ServiceProvider.GetService<ILoggerFactory>().CreateLogger("业务设备服务");
}
#region public
public async Task RestartAsync()
private async Task CollectDeviceWorker_Starting()
{
await StopAsync();
if (started)
{
await StopAsync(true);
}
await CreatThreadsAsync();
}
private async Task CollectDeviceWorker_Started()
{
await Task.Delay(1000);
await StartAsync();
}
/// <summary>
/// 开始
/// </summary>
public async Task StartAsync()
private async Task CollectDeviceWorker_Stoping()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
await StartAllChannelThreadsAsync();
await ProtectedStarted();
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
await StopAsync(true);
}
/// <summary>
/// 开始
/// </summary>
public async Task CreatThreadsAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (ChannelThreads.Count == 0)
{
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
/// <summary>
/// 停止
/// </summary>
public async Task StopAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
await BeforeRemoveAllChannelThreadAsync();
await ProtectedStoping();
await RemoveAllChannelThreadAsync();
await ProtectedStoped();
//清空内存列表
GlobalData.BusinessDevices.Clear();
}
catch (Exception ex)
{
_logger.LogError(ex, "停止错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
#endregion public
#region public
/// <summary>
/// 创建业务设备线程
/// </summary>
/// <returns></returns>
protected async Task CreatAllChannelThreadsAsync()
{
if (!_stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("正在获取业务设备组态信息");
var deviceRunTimes = await _serviceScope.ServiceProvider.GetService<IDeviceService>().GetBusinessDeviceRuntimeAsync();
_logger.LogInformation("获取业务设备组态信息完成");
var idSet = deviceRunTimes.ToDictionary(a => a.Id);
var result = deviceRunTimes.Where(a => !idSet.ContainsKey(a.RedundantDeviceId) && !a.IsRedundant).ToList();
result.ForEach(collectDeviceRunTime =>
{
if (!_stoppingToken.IsCancellationRequested)
{
try
{
DriverBase driverBase = collectDeviceRunTime.CreatDriver(PluginService);
GetChannelThread(driverBase);
}
catch (Exception ex)
{
_logger.LogError(ex, $"{collectDeviceRunTime.Name}初始化错误!");
}
}
});
}
}
#endregion public
#region
/// <summary>
@@ -187,22 +77,6 @@ public class BusinessDeviceWorker : DeviceWorker
await base.StartAsync(cancellationToken);
}
private async Task CollectDeviceWorker_Starting()
{
await CreatThreadsAsync();
}
private async Task CollectDeviceWorker_Started()
{
await Task.Delay(1000);
await StartAsync();
}
private async Task CollectDeviceWorker_Stoping()
{
await StopAsync();
}
/// <inheritdoc/>
public override async Task StopAsync(CancellationToken cancellationToken)
{
@@ -215,18 +89,55 @@ public class BusinessDeviceWorker : DeviceWorker
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
WorkerUtil.GetWoker<CollectDeviceWorker>().Starting += CollectDeviceWorker_Starting;
WorkerUtil.GetWoker<CollectDeviceWorker>().Started += CollectDeviceWorker_Started;
WorkerUtil.GetWoker<CollectDeviceWorker>().Stoping += CollectDeviceWorker_Stoping;
ManagementWoker = WorkerUtil.GetWoker<ManagementWoker>();
var collectDeviceWorker = WorkerUtil.GetWoker<CollectDeviceWorker>();
collectDeviceWorker.Starting += CollectDeviceWorker_Starting;
collectDeviceWorker.Started += CollectDeviceWorker_Started;
collectDeviceWorker.Stoping += CollectDeviceWorker_Stoping;
PluginService = _serviceScope.ServiceProvider.GetService<IPluginService>();
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
await WhileExecuteAsync(stoppingToken);
}
#endregion worker服务
#region
protected override async Task<IEnumerable<DeviceRunTime>> GetDeviceRunTimeAsync(long deviceId)
{
return await _serviceScope.ServiceProvider.GetService<IDeviceService>().GetBusinessDeviceRuntimeAsync(deviceId);
}
#endregion worker服务
/// <summary>
/// 读取数据库,创建全部设备
/// </summary>
/// <returns></returns>
protected override async Task CreatAllChannelThreadsAsync()
{
if (!_stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("正在获取业务设备组态信息");
var deviceRunTimes = await _serviceScope.ServiceProvider.GetService<IDeviceService>().GetBusinessDeviceRuntimeAsync();
_logger.LogInformation("获取业务设备组态信息完成");
var idSet = deviceRunTimes.ToDictionary(a => a.Id);
var result = deviceRunTimes.Where(a => !idSet.ContainsKey(a.RedundantDeviceId) && !a.IsRedundant).ToList();
result.ForEach(collectDeviceRunTime =>
{
if (!_stoppingToken.IsCancellationRequested)
{
try
{
DriverBase driverBase = collectDeviceRunTime.CreatDriver(PluginService);
GetChannelThread(driverBase);
}
catch (Exception ex)
{
_logger.LogError(ex, $"{collectDeviceRunTime.Name}初始化错误!");
}
}
});
}
}
#endregion
}

View File

@@ -25,56 +25,45 @@ public class CollectDeviceWorker : DeviceWorker
_logger = _serviceScope.ServiceProvider.GetService<ILoggerFactory>().CreateLogger("采集设备服务");
}
#region public
#region worker服务
/// <summary>
/// 重启采集服务
/// </summary>
public async Task RestartAsync()
/// <inheritdoc/>
public override async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
//停止采集服务
await BeforeRemoveAllChannelThreadAsync();
await ProtectedStoping();
//完全停止全部采集线程
await RemoveAllChannelThreadAsync();
await ProtectedStoped();
//清空内存列表
GlobalData.CollectDevices.Clear();
//创建全部采集线程
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
//开始全部采集线程
await StartAllChannelThreadsAsync();
await ProtectedStarted();
}
catch (Exception ex)
{
_logger.LogError(ex, "重启错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
using var stoppingToken = new CancellationTokenSource();
_stoppingToken = stoppingToken.Token;
stoppingToken.Cancel();
await StopThreadAsync(true);
await base.StopAsync(cancellationToken);
}
#endregion public
#endregion worker服务
#region Private
#region
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
ManagementWoker = WorkerUtil.GetWoker<ManagementWoker>();
PluginService = _serviceScope.ServiceProvider.GetService<IPluginService>();
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
//重启采集线程,会启动其他后台服务
await ManagementWoker.StartLock.WaitAsync();
//await RestartAsync();
await WhileExecuteAsync(stoppingToken);
}
protected override async Task<IEnumerable<DeviceRunTime>> GetDeviceRunTimeAsync(long deviceId)
{
return await _serviceScope.ServiceProvider.GetService<DeviceService>().GetCollectDeviceRuntimeAsync(deviceId);
}
/// <summary>
/// 创建设备采集线程
/// 读取数据库,创建全部设备
/// </summary>
/// <returns></returns>
protected virtual async Task CreatAllChannelThreadsAsync()
protected override async Task CreatAllChannelThreadsAsync()
{
if (!_stoppingToken.IsCancellationRequested)
{
@@ -101,41 +90,5 @@ public class CollectDeviceWorker : DeviceWorker
}
}
protected override async Task<IEnumerable<DeviceRunTime>> GetDeviceRunTimeAsync(long deviceId)
{
return await _serviceScope.ServiceProvider.GetService<DeviceService>().GetCollectDeviceRuntimeAsync(deviceId);
}
#endregion Private
#region worker服务
/// <inheritdoc/>
public override async Task StopAsync(CancellationToken cancellationToken)
{
using var stoppingToken = new CancellationTokenSource();
_stoppingToken = stoppingToken.Token;
stoppingToken.Cancel();
await BeforeRemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoping();
//停止全部采集线程
await RemoveAllChannelThreadAsync();
//停止其他后台服务
await ProtectedStoped();
await base.StopAsync(cancellationToken);
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
PluginService = _serviceScope.ServiceProvider.GetService<IPluginService>();
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
//重启采集线程,会启动其他后台服务
await RestartAsync();
await WhileExecuteAsync(stoppingToken);
}
#endregion worker服务
#endregion
}

View File

@@ -41,6 +41,7 @@ public abstract class DeviceWorker : BackgroundService
protected IPluginService PluginService;
protected GlobalData GlobalData;
protected ManagementWoker ManagementWoker;
protected IServiceScope _serviceScope;
private readonly IHostApplicationLifetime _appLifetime;
@@ -61,8 +62,6 @@ public abstract class DeviceWorker : BackgroundService
/// </summary>
protected ConcurrentList<ChannelThread> ChannelThreads { get; set; } = new();
#region public
/// <summary>
/// 控制设备线程启停
/// </summary>
@@ -77,9 +76,7 @@ public abstract class DeviceWorker : BackgroundService
DriverBases.FirstOrDefault(it => it.DeviceId == deviceId)?.PasueThread(isStart);
}
#endregion public
#region Private
#region protected
/// <summary>
/// 根据设备生成/获取通道线程管理器
@@ -94,6 +91,11 @@ public abstract class DeviceWorker : BackgroundService
var channelThread = ChannelThreads.FirstOrDefault(t => t.ChannelId == channelId);
if (channelThread != null)
{
if (channelThread.IsCollect != driverBase.IsCollect)
{
_logger.LogWarning($"设备{driverBase.DeviceName}与通道{channelId}的其他设备类型不相同,不能选择同一个通道");
return null;
}
channelThread.AddDriver(driverBase);
return channelThread;
}
@@ -129,7 +131,7 @@ public abstract class DeviceWorker : BackgroundService
/// <summary>
/// 删除通道线程,并且释放资源
/// </summary>
protected async Task RemoveAllChannelThreadAsync()
protected async Task RemoveAllChannelThreadAsync(bool isRemoveDevice)
{
await BeforeRemoveAllChannelThreadAsync();
@@ -137,15 +139,15 @@ public abstract class DeviceWorker : BackgroundService
{
try
{
await channelThread.StopThreadAsync();
await channelThread.StopThreadAsync(isRemoveDevice);
}
catch (Exception ex)
{
_logger?.LogError(ex, channelThread.ToString());
}
}, Environment.ProcessorCount / 2);
ChannelThreads.Clear();
if (isRemoveDevice)
ChannelThreads.Clear();
}
protected async Task BeforeRemoveAllChannelThreadAsync()
@@ -180,12 +182,55 @@ public abstract class DeviceWorker : BackgroundService
{
if (!_stoppingToken.IsCancellationRequested)
{
await StartChannelThreadAsync(item);
}
}
}
}
private async Task StartChannelThreadAsync(ChannelThread item)
{
if (item.IsCollect)
{
if (ManagementWoker.IsStart)
{
//采集设备启动
await item.StartThreadAsync();
}
}
else
{
if (ManagementWoker.IsStart)
{
//业务设备启动
await item.StartThreadAsync();
}
else
{
if (ManagementWoker.Options.Redundancy.IsStartBusinessDevice)
{
//业务设备启动
await item.StartThreadAsync();
}
}
}
}
#endregion protected
#region
private void SetRedundantDevice(DeviceRunTime? dev, Device? newDev)
{
dev.DevicePropertys = newDev.DevicePropertys;
dev.Description = newDev.Description;
dev.ChannelId = newDev.ChannelId;
dev.Enable = newDev.Enable;
dev.IntervalTime = newDev.IntervalTime;
dev.Name = newDev.Name;
dev.PluginName = newDev.PluginName;
}
/// <summary>
/// 更新设备线程
/// </summary>
@@ -203,7 +248,7 @@ public abstract class DeviceWorker : BackgroundService
var dev = isChanged ? (await GetDeviceRunTimeAsync(deviceId)).FirstOrDefault() : channelThread.GetDriver(deviceId).CurrentDevice;
//这里先停止采集,操作会使线程取消,需要重新恢复线程
//先停止采集,操作会使线程取消,需要重新恢复线程
await channelThread.RemoveDriverAsync(deviceId);
if (isChanged)
await ProtectedStoped();
@@ -218,7 +263,7 @@ public abstract class DeviceWorker : BackgroundService
await ProtectedStarting();
try
{
await newChannelThread.StartThreadAsync();
await StartChannelThreadAsync(newChannelThread);
if (isChanged)
await ProtectedStarted();
}
@@ -312,7 +357,7 @@ public abstract class DeviceWorker : BackgroundService
var newChannelThread = GetChannelThread(newDriverBase);
if (newChannelThread != null && newChannelThread.DriverTask == null)
{
await newChannelThread.StartThreadAsync();
await StartChannelThreadAsync(newChannelThread);
}
}
}
@@ -322,18 +367,7 @@ public abstract class DeviceWorker : BackgroundService
}
}
private void SetRedundantDevice(DeviceRunTime? dev, Device? newDev)
{
dev.DevicePropertys = newDev.DevicePropertys;
dev.Description = newDev.Description;
dev.ChannelId = newDev.ChannelId;
dev.Enable = newDev.Enable;
dev.IntervalTime = newDev.IntervalTime;
dev.Name = newDev.Name;
dev.PluginName = newDev.PluginName;
}
#endregion Private
#endregion
#region
@@ -361,10 +395,6 @@ public abstract class DeviceWorker : BackgroundService
return driverPlugin?.DriverUIType;
}
#endregion
#region
/// <summary>
/// 获取设备方法
/// </summary>
@@ -439,10 +469,22 @@ public abstract class DeviceWorker : BackgroundService
public event RestartEventHandler Starting;
private volatile bool otherstarted = false;
protected async Task ProtectedStarted()
{
if (Started != null)
await Started.Invoke();
try
{
if (!otherstarted)
if (ManagementWoker.IsStart || ManagementWoker.IsStartBusinessDevice)
if (Started != null)
await Started.Invoke();
}
finally
{
otherstarted = true;
otherstoped = false;
}
}
protected async Task ProtectedStarting()
@@ -453,8 +495,17 @@ public abstract class DeviceWorker : BackgroundService
protected async Task ProtectedStoped()
{
if (Stoped != null)
await Stoped.Invoke();
try
{
if (!otherstoped)
if (Stoped != null)
await Stoped.Invoke();
}
finally
{
otherstoped = true;
otherstarted = false;
}
}
protected async Task ProtectedStoping()
@@ -465,15 +516,6 @@ public abstract class DeviceWorker : BackgroundService
protected abstract Task<IEnumerable<DeviceRunTime>> GetDeviceRunTimeAsync(long deviceId);
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
PluginService = _serviceScope.ServiceProvider.GetService<IPluginService>();
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
await WhileExecuteAsync(stoppingToken);
}
protected virtual async Task WhileExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
@@ -544,6 +586,147 @@ public abstract class DeviceWorker : BackgroundService
}
#endregion worker服务
#region
private EasyLock publicRestartLock = new();
public async Task RestartAsync()
{
try
{
await publicRestartLock.WaitAsync();
await StopAsync(true);
await StartAsync();
}
finally
{
publicRestartLock.Release();
}
}
protected volatile bool started;
/// <summary>
/// 启动全部设备,如果没有找到设备会创建
/// </summary>
public async Task StartAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (stoped)
{
ChannelThreads.Clear();
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
await StartAllChannelThreadsAsync();
await ProtectedStarted();
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
started = true;
stoped = false;
otherstoped = false;
singleRestartLock.Release();
restartLock.Release();
}
}
/// <summary>
/// 初始化,如果没有找到设备会创建
/// </summary>
public async Task CreatThreadsAsync()
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
if (stoped)
{
ChannelThreads.Clear();
await CreatAllChannelThreadsAsync();
await ProtectedStarting();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "启动发生错误");
}
finally
{
started = true;
stoped = false;
otherstoped = false;
singleRestartLock.Release();
restartLock.Release();
}
}
/// <summary>
/// 停止
/// </summary>
public async Task StopAsync(bool isRemoveDevice)
{
try
{
await restartLock.WaitAsync();
await singleRestartLock.WaitAsync();
await StopThreadAsync(isRemoveDevice);
}
catch (Exception ex)
{
_logger.LogError(ex, "停止错误");
}
finally
{
singleRestartLock.Release();
restartLock.Release();
}
}
protected async Task StopThreadAsync(bool isRemoveDevice = true)
{
if (started)
{
//取消全部采集线程
await BeforeRemoveAllChannelThreadAsync();
if (isRemoveDevice)
//取消其他后台服务
await ProtectedStoping();
//停止全部采集线程
await RemoveAllChannelThreadAsync(isRemoveDevice);
if (isRemoveDevice)
//停止其他后台服务
await ProtectedStoped();
//清空内存列表
}
started = false;
otherstarted = false;
stoped = true;
}
private volatile bool otherstoped = true;
protected volatile bool stoped = true;
#endregion
#region
/// <summary>
/// 读取数据库,创建全部设备
/// </summary>
/// <returns></returns>
protected abstract Task CreatAllChannelThreadsAsync();
#endregion
}
public delegate Task RestartEventHandler();

View File

@@ -164,7 +164,7 @@ public class HardwareInfoWorker : BackgroundService
return data.ToList();
}
private LiteDBCache<HisHardwareInfo> cache = LiteDBCacheUtil.GetDB<HisHardwareInfo>(nameof(APPInfo), $"{typeof(HisHardwareInfo).FullName}", false);
private LiteDBCache<HisHardwareInfo> cache = LiteDBCacheUtil.GetDB<HisHardwareInfo>(nameof(APPInfo), $"{typeof(HisHardwareInfo).FullName}", true, false);
}
/// <inheritdoc/>

View File

@@ -0,0 +1,347 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
using Furion.ConfigurableOptions;
using Furion.Logging.Extensions;
using Mapster;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Runtime.InteropServices;
using TouchSocket.Core;
using TouchSocket.Dmtp;
using TouchSocket.Dmtp.FileTransfer;
using TouchSocket.Dmtp.Rpc;
using TouchSocket.Rpc;
using TouchSocket.Sockets;
namespace ThingsGateway.Gateway.Application;
public class ManagementOptions : IConfigurableOptions
{
public string RemoteUri { get; set; }
public string ServerStandbyUri { get; set; }
public int Port { get; set; }
public string VerifyToken { get; set; }
public int HeartbeatInterval { get; set; }
public int MaxErrorCount { get; set; }
public Redundancy Redundancy { get; set; }
}
public class Redundancy
{
public bool Enable { get; set; }
public bool IsPrimary { get; set; }
public bool IsStartBusinessDevice { get; set; }
}
public class ManagementWoker : BackgroundService
{
protected IServiceScope _serviceScope;
private readonly IHostApplicationLifetime _appLifetime;
private readonly ILogger _logger;
private CollectDeviceWorker CollectDeviceWorker;
/// <inheritdoc cref="ManagementWoker"/>
public ManagementWoker(IServiceScopeFactory serviceScopeFactory, IHostApplicationLifetime appLifetime)
{
_serviceScope = serviceScopeFactory.CreateScope();
_logger = _serviceScope.ServiceProvider.GetService<ILoggerFactory>().CreateLogger("网关管理服务");
_appLifetime = appLifetime;
}
#region worker服务
private EasyLock _easyLock = new();
internal bool IsStart
{
get
{
return isStart;
}
set
{
if (isStart != value)
{
isStart = value;
//TODO:触发启动事件
if (IsStart)
{
//启动采集
_ = CollectDeviceWorker.StartAsync();
}
else
{
_ = CollectDeviceWorker.StopAsync(!IsStartBusinessDevice);
}
}
}
}
private volatile bool isStart = false;
internal volatile bool IsStartBusinessDevice = true;
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("网关管理服务启动");
await _easyLock.WaitAsync();
_appLifetime.ApplicationStarted.Register(() => { _easyLock.Release(); _easyLock = null; });
await base.StartAsync(cancellationToken);
}
/// <inheritdoc/>
public override Task StopAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("网关管理服务停止");
return base.StopAsync(cancellationToken);
}
internal ManagementOptions Options;
internal GlobalData GlobalData;
/// <summary>
/// 启动锁
/// </summary>
internal EasyLock StartLock = new(true);
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
CollectDeviceWorker = WorkerUtil.GetWoker<CollectDeviceWorker>();
GlobalData = _serviceScope.ServiceProvider.GetService<GlobalData>();
Options = App.GetOptions<ManagementOptions>();
IsStartBusinessDevice = Options.Redundancy.Enable ? Options.Redundancy.IsStartBusinessDevice : true;
//初始化直接启动
IsStart = true;
StartLock.Release();
if (Options.Redundancy.Enable)
{
var udpDmtp = GetUdpDmtp(Options);
await udpDmtp.StartAsync();//启动
if (Options.Redundancy.IsPrimary)
{
//初始化时,主站直接启动
IsStart = true;
StartLock.Release();
}
while (!stoppingToken.IsCancellationRequested)
{
try
{
bool online = false;
var waitInvoke = new DmtpInvokeOption(millisecondsTimeout: 5000)
{
FeedbackType = FeedbackType.WaitInvoke,
Token = stoppingToken,
Timeout = 3000,
SerializationType = SerializationType.Json
};
try
{
GatewayState? gatewayState = null;
online = await udpDmtp.PingAsync(3000);
if (online)
{
var readErrorCount = 0;
while (readErrorCount < Options.MaxErrorCount)
{
try
{
gatewayState = await udpDmtp.GetDmtpRpcActor().InvokeTAsync<GatewayState>(nameof(ReverseCallbackServer.GetGatewayStateAsync), waitInvoke, IsStart);
break;
}
catch
{
readErrorCount++;
}
}
}
if (gatewayState != null)
{
if (gatewayState.IsPrimary == Options.Redundancy.IsPrimary)
{
if (!IsStart)
{
_logger.LogInformation("主备站设置重复!");
IsStart = true;
}
await Task.Delay(1000);
continue;
}
}
if (gatewayState == null)
{
//无法获取状态,启动本机
if (!IsStart)
{
_logger.LogInformation("无法连接冗余站点,本机将切换到正常状态");
IsStart = true;
}
}
else if (gatewayState.IsPrimary)
{
//主站已经启动
if (gatewayState.IsStart)
{
if (IsStart)
{
_logger.LogInformation("主站已恢复,本机(从站)将切换到备用状态");
IsStart = false;
}
}
else
{
//等待主站切换到正常后,再停止从站
}
}
else
{
//从站已经启动
if (gatewayState.IsStart)
{
//等待从站切换到备用后,再启动主站
}
else
{
if (!IsStart)
{
_logger.LogInformation("本机(主站)将切换到正常状态");
IsStart = true;
}
}
}
}
finally
{
StartLock.Release();
}
if (Options.Redundancy.IsPrimary)
{
try
{
if (online)
await udpDmtp.GetDmtpRpcActor().InvokeAsync(nameof(ReverseCallbackServer.UpdateGatewayDataAsync), waitInvoke, GlobalData.CollectDevices.Adapt<List<DeviceDataWithValue>>(), GlobalData.AllVariables.Adapt<List<VariableDataWithValue>>());
}
catch (Exception ex)
{
_logger.LogWarning(ex, "同步数据到从站时,发生错误");
}
}
await Task.Delay(1000, stoppingToken);
}
catch (TaskCanceledException)
{
}
catch (ObjectDisposedException)
{
}
catch (Exception ex)
{
_logger.LogWarning(ex, "循环线程出错");
}
}
}
else
{
//直接启动
IsStart = true;
//无冗余,直接启动采集服务
_logger.LogInformation("不启用网关冗余站点");
StartLock.Release();
}
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(60000, stoppingToken);
}
catch (TaskCanceledException)
{
}
catch (ObjectDisposedException)
{
}
catch (Exception ex)
{
_logger.LogWarning(ex, "循环线程出错");
}
}
}
#endregion worker服务
#region
private void LogOut(TouchSocket.Core.LogLevel logLevel, object source, string message, Exception exception)
{
_logger?.Log_Out(logLevel, source, message, exception);
}
private UdpDmtp GetUdpDmtp(ManagementOptions options)
{
var udpDmtp = new UdpDmtp();
var config = new TouchSocketConfig()
.SetRemoteIPHost(options.RemoteUri)
.SetBindIPHost(options.Port)
.SetDmtpOption(
new DmtpOption() { VerifyToken = options.VerifyToken })
.ConfigureContainer(a =>
{
a.RegisterSingleton<GlobalData>(GlobalData);
a.AddEasyLogger(LogOut);
a.AddRpcStore(store =>
{
store.RegisterServer<ReverseCallbackServer>();
});
})
.ConfigurePlugins(a =>
{
a.UseDmtpFileTransfer();//必须添加文件传输插件
//a.Add<FilePlugin>();
a.UseDmtpHeartbeat()//使用Dmtp心跳
.SetTick(TimeSpan.FromMilliseconds(options.HeartbeatInterval))
.SetMaxFailCount(options.MaxErrorCount);
a.UseDmtpRpc();
});
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
config.UseUdpConnReset();
}
udpDmtp.Setup(config);
return udpDmtp;
}
#endregion
}
internal class GatewayState
{
/// <summary>
/// 是否启动
/// </summary>
public bool IsStart { get; set; }
/// <summary>
/// 是否主站
/// </summary>
public bool IsPrimary { get; set; }
}

View File

@@ -0,0 +1,78 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
using TouchSocket.Core;
using TouchSocket.Dmtp.Rpc;
using TouchSocket.Rpc;
namespace ThingsGateway.Gateway.Application;
internal class ReverseCallbackServer : RpcServer
{
private ILog _logger;
private ManagementWoker managementWoker;
private GlobalData globalData;
public ReverseCallbackServer(ILog log, GlobalData globalData)
{
_logger = log;
managementWoker = WorkerUtil.GetWoker<ManagementWoker>();
this.globalData = globalData;
}
private EasyLock easyLock = new();
[DmtpRpc(true)]//使用方法名作为调用键
public async Task<GatewayState> GetGatewayStateAsync(bool isStart)
{
try
{
await easyLock.WaitAsync();
//冗余双方站点可能存在同时执行冗余切换的情况
{
GatewayState result = new();
result.IsStart = managementWoker.IsStart;
result.IsPrimary = managementWoker.Options.Redundancy.IsPrimary;
return result;
}
}
finally
{
easyLock.Release();
}
}
[DmtpRpc(true)]//使用方法名作为调用键
public async Task UpdateGatewayDataAsync(List<DeviceDataWithValue> deviceDatas, List<VariableDataWithValue> variableDatas)
{
//TODO:获取主站数据
await Task.CompletedTask;
foreach (var deviceData in deviceDatas)
{
var dev = globalData.CollectDevices.FirstOrDefault(a => a.Id == deviceData.Id);
if (dev != null)
{
dev.ActiveTime = deviceData.ActiveTime;
dev.DeviceStatus = deviceData.DeviceStatus;
dev.LastErrorMessage = deviceData.LastErrorMessage;
}
}
foreach (var variableData in variableDatas)
{
var variableRunTime = globalData.AllVariables.FirstOrDefault(a => a.Id == variableData.Id);
if (variableRunTime != null)
{
variableRunTime.SetValue(variableData.RawValue, variableData.CollectTime, variableData.IsOnline);
variableRunTime.SetErrorMessage(variableData.LastErrorMessage);
}
}
}
}

View File

@@ -1,78 +0,0 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
using Furion.Logging.Extensions;
using Mapster;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace ThingsGateway.Gateway.Application;
/// <summary>
/// TODO:网关远程管理服务使用client模式适用于端口要求严格的网络环境
/// </summary>
public class ManagementSlaveWoker : BackgroundService
{
protected IServiceScope _serviceScope;
private readonly IHostApplicationLifetime _appLifetime;
private readonly ILogger _logger;
/// <inheritdoc cref="AlarmWorker"/>
public ManagementSlaveWoker(IServiceScopeFactory serviceScopeFactory, IHostApplicationLifetime appLifetime)
{
_serviceScope = serviceScopeFactory.CreateScope();
_logger = _serviceScope.ServiceProvider.GetService<ILoggerFactory>().CreateLogger("网关远程管理Slave服务");
_appLifetime = appLifetime;
}
#region worker服务
private EasyLock _easyLock = new();
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("网关远程管理Slave服务启动");
await _easyLock.WaitAsync();
_appLifetime.ApplicationStarted.Register(() => { _easyLock.Release(); _easyLock = null; });
await base.StartAsync(cancellationToken);
}
/// <inheritdoc/>
public override Task StopAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("网关远程管理Slave服务停止");
return base.StopAsync(cancellationToken);
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(60000, stoppingToken);
}
catch (TaskCanceledException)
{
}
catch (ObjectDisposedException)
{
}
}
}
#endregion worker服务
}

View File

@@ -1,78 +0,0 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
using Furion.Logging.Extensions;
using Mapster;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace ThingsGateway.Gateway.Application;
/// <summary>
/// TODO:网关远程管理服务使用client模式适用于端口要求严格的网络环境
/// </summary>
public class ManagementMasterWoker : BackgroundService
{
protected IServiceScope _serviceScope;
private readonly IHostApplicationLifetime _appLifetime;
private readonly ILogger _logger;
/// <inheritdoc cref="AlarmWorker"/>
public ManagementMasterWoker(IServiceScopeFactory serviceScopeFactory, IHostApplicationLifetime appLifetime)
{
_serviceScope = serviceScopeFactory.CreateScope();
_logger = _serviceScope.ServiceProvider.GetService<ILoggerFactory>().CreateLogger("网关远程管理Master服务");
_appLifetime = appLifetime;
}
#region worker服务
private EasyLock _easyLock = new();
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("网关远程管理Master服务启动");
await _easyLock.WaitAsync();
_appLifetime.ApplicationStarted.Register(() => { _easyLock.Release(); _easyLock = null; });
await base.StartAsync(cancellationToken);
}
/// <inheritdoc/>
public override Task StopAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("网关远程管理Master服务停止");
return base.StopAsync(cancellationToken);
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(60000, stoppingToken);
}
catch (TaskCanceledException)
{
}
catch (ObjectDisposedException)
{
}
}
}
#endregion worker服务
}

View File

@@ -1,78 +0,0 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
using Furion.Logging.Extensions;
using Mapster;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace ThingsGateway.Gateway.Application;
/// <summary>
/// 网关更新服务
/// </summary>
public class UpdatesWorker : BackgroundService
{
protected IServiceScope _serviceScope;
private readonly IHostApplicationLifetime _appLifetime;
private readonly ILogger _logger;
/// <inheritdoc cref="AlarmWorker"/>
public UpdatesWorker(IServiceScopeFactory serviceScopeFactory, IHostApplicationLifetime appLifetime)
{
_serviceScope = serviceScopeFactory.CreateScope();
_logger = _serviceScope.ServiceProvider.GetService<ILoggerFactory>().CreateLogger("网关更新服务");
_appLifetime = appLifetime;
}
#region worker服务
private EasyLock _easyLock = new();
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("更新服务启动");
await _easyLock.WaitAsync();
_appLifetime.ApplicationStarted.Register(() => { _easyLock.Release(); _easyLock = null; });
await base.StartAsync(cancellationToken);
}
/// <inheritdoc/>
public override Task StopAsync(CancellationToken cancellationToken)
{
_logger?.LogInformation("更新服务停止");
return base.StopAsync(cancellationToken);
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _easyLock?.WaitAsync();
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(60000, stoppingToken);
}
catch (TaskCanceledException)
{
}
catch (ObjectDisposedException)
{
}
}
}
#endregion worker服务
}

View File

@@ -6,9 +6,9 @@
<DocumentationFile></DocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="ThingsGateway.Foundation.Modbus" Version="5.0.1.13" />
<PackageReference Include="ThingsGateway.Foundation.Dlt645" Version="5.0.1.13" />
<PackageReference Include="ThingsGateway.Foundation.SiemensS7" Version="5.0.1.13" />
<PackageReference Include="ThingsGateway.Foundation.Modbus" Version="5.0.1.18" />
<PackageReference Include="ThingsGateway.Foundation.Dlt645" Version="5.0.1.18" />
<PackageReference Include="ThingsGateway.Foundation.SiemensS7" Version="5.0.1.18" />
</ItemGroup>
<!--<ItemGroup>
<ProjectReference Include="..\..\ThingsGateway.Foundation.Dlt645\src\ThingsGateway.Foundation.Dlt645.csproj" />

View File

@@ -684,7 +684,7 @@ namespace ThingsGateway.Foundation
var newByteBlock = new ByteBlock(1024 * 64);
e.UserToken = newByteBlock;
e.SetBuffer(newByteBlock.Buffer, 0, newByteBlock.Buffer.Length);
e.SetBuffer(newByteBlock.Buffer, 0, newByteBlock.Capacity);
try
{

View File

@@ -19,19 +19,25 @@ namespace ThingsGateway.Foundation;
/// </summary>
public abstract class VariableObject
{
private IProtocol protocol;
private int maxPack;
private List<VariableSourceClass>? deviceVariableSourceReads;
private Dictionary<string, VariableRuntimeProperty>? dict;
/// <summary>
/// VariableObject
/// </summary>
public VariableObject(IProtocol protocol, int maxPack)
{
this.protocol = protocol;
this.Protocol = protocol;
this.maxPack = maxPack;
}
private List<VariableSourceClass>? deviceVariableSourceReads;
/// <summary>
/// 协议对象
/// </summary>
public IProtocol Protocol { get; set; }
/// <summary>
/// <see cref="VariableRuntimeAttribute"/>特性连读,反射赋值到继承类中的属性
@@ -44,10 +50,10 @@ public abstract class VariableObject
//连读
foreach (var item in deviceVariableSourceReads)
{
var result = await protocol.ReadAsync(item.RegisterAddress, item.Length);
var result = await Protocol.ReadAsync(item.RegisterAddress, item.Length);
if (result.IsSuccess)
{
item.VariableRunTimes.PraseStructContent(protocol, result.Content, item, exWhenAny: true);
item.VariableRunTimes.PraseStructContent(Protocol, result.Content, item, exWhenAny: true);
}
else
{
@@ -71,30 +77,6 @@ public abstract class VariableObject
}
}
private Dictionary<string, VariableRuntimeProperty>? dict;
private void GetVariableSources()
{
if (deviceVariableSourceReads == null)
{
dict = VariableObjectHelper.GetPairs(GetType());
List<VariableClass> variableClasss = new();
foreach (var pair in dict)
{
var dataType = pair.Value.Attribute.DataType == DataTypeEnum.Object ? Type.GetTypeCode(pair.Value.Property.PropertyType.IsArray ? pair.Value.Property.PropertyType.GetElementType() : pair.Value.Property.PropertyType).GetDataType() : pair.Value.Attribute.DataType;
VariableClass variableClass = new VariableClass()
{
DataType = dataType,
RegisterAddress = pair.Value.Attribute.RegisterAddress,
IntervalTime = 1000,
};
pair.Value.VariableClass = variableClass;
variableClasss.Add(variableClass);
}
deviceVariableSourceReads = protocol.LoadSourceRead<VariableSourceClass>(variableClasss, maxPack, 1000);
}
}
/// <summary>
/// <see cref="VariableRuntimeAttribute"/>特性连读,反射赋值到继承类中的属性
/// </summary>
@@ -106,10 +88,10 @@ public abstract class VariableObject
//连读
foreach (var item in deviceVariableSourceReads)
{
var result = protocol.Read(item.RegisterAddress, item.Length);
var result = Protocol.Read(item.RegisterAddress, item.Length);
if (result.IsSuccess)
{
item.VariableRunTimes.PraseStructContent(protocol, result.Content, item, exWhenAny: true);
item.VariableRunTimes.PraseStructContent(Protocol, result.Content, item, exWhenAny: true);
}
else
{
@@ -153,7 +135,7 @@ public abstract class VariableObject
{
return new($"该属性未被识别,可能没有使用{typeof(VariableRuntimeAttribute)}特性标识");
}
var result = protocol.Write(variableRuntimeProperty.VariableClass.RegisterAddress, JToken.FromObject(value), variableRuntimeProperty.VariableClass.DataType, cancellationToken);
var result = Protocol.Write(variableRuntimeProperty.VariableClass.RegisterAddress, JToken.FromObject(value), variableRuntimeProperty.VariableClass.DataType, cancellationToken);
return result;
}
catch (Exception ex)
@@ -182,7 +164,7 @@ public abstract class VariableObject
{
return new($"该属性未被识别,可能没有使用{typeof(VariableRuntimeAttribute)}特性标识");
}
var result = await protocol.WriteAsync(variableRuntimeProperty.VariableClass.RegisterAddress, JToken.FromObject(value), variableRuntimeProperty.VariableClass.DataType, cancellationToken);
var result = await Protocol.WriteAsync(variableRuntimeProperty.VariableClass.RegisterAddress, JToken.FromObject(value), variableRuntimeProperty.VariableClass.DataType, cancellationToken);
return result;
}
catch (Exception ex)
@@ -190,4 +172,26 @@ public abstract class VariableObject
return new(ex);
}
}
private void GetVariableSources()
{
if (deviceVariableSourceReads == null)
{
dict = VariableObjectHelper.GetPairs(GetType());
List<VariableClass> variableClasss = new();
foreach (var pair in dict)
{
var dataType = pair.Value.Attribute.DataType == DataTypeEnum.Object ? Type.GetTypeCode(pair.Value.Property.PropertyType.IsArray ? pair.Value.Property.PropertyType.GetElementType() : pair.Value.Property.PropertyType).GetDataType() : pair.Value.Attribute.DataType;
VariableClass variableClass = new VariableClass()
{
DataType = dataType,
RegisterAddress = pair.Value.Attribute.RegisterAddress,
IntervalTime = 1000,
};
pair.Value.VariableClass = variableClass;
variableClasss.Add(variableClass);
}
deviceVariableSourceReads = Protocol.LoadSourceRead<VariableSourceClass>(variableClasss, maxPack, 1000);
}
}
}

View File

@@ -13,7 +13,7 @@
<ItemGroup>
<PackageReference Include="NewLife.Core" Version="10.7.2024.202" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="TouchSocket" Version="2.0.0-beta.277" />
<PackageReference Include="TouchSocket" Version="2.0.0-beta.279" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'!='net45'">

View File

@@ -19,6 +19,8 @@ using ThingsGateway.Foundation.Extension.String;
using ThingsGateway.Foundation.Modbus;
using ThingsGateway.Gateway.Application;
using TouchSocket.Sockets;
namespace ThingsGateway.Plugin.Modbus;
/// <summary>
@@ -96,6 +98,8 @@ public class ModbusSlave : BusinessBase
_plc.WriteData += OnWriteData;
}
private volatile bool success = true;
protected override async Task ProtectedExecuteAsync(CancellationToken cancellationToken)
{
//获取设备连接状态
@@ -107,6 +111,18 @@ public class ModbusSlave : BusinessBase
else
{
CurrentDevice.SetDeviceStatus(DateTimeUtil.Now, 999);
try
{
Protocol.Channel.Close();
await Protocol.Channel.ConnectAsync(3000, cancellationToken);
success = true;
}
catch (Exception ex)
{
if (success)
LogMessage.LogWarning(ex, "无法启动服务");
success = false;
}
}
var list = _modbusVariableDict.ToListWithDequeue();
foreach (var item in list)

View File

@@ -112,6 +112,28 @@ public partial class OpcUaServer : BusinessBase
{
try
{
if (IsConnected())
{
//更新设备活动时间
CurrentDevice.SetDeviceStatus(DateTimeUtil.Now, 0);
}
else
{
CurrentDevice.SetDeviceStatus(DateTimeUtil.Now, 999);
try
{
await m_application.CheckApplicationInstanceCertificate(false, 0, 1200);
await m_application.Start(m_server);
success = true;
}
catch (Exception ex)
{
if (success)
LogMessage.LogWarning(ex, "无法启动服务");
success = false;
}
}
////变化推送
var varList = CollectVariableRunTimes.ToListWithDequeue();
if (varList?.Count != 0)

View File

@@ -14,8 +14,8 @@ using ThingsGateway.Core;
namespace ThingsGateway.Plugin.SqlDB;
[SplitTable(SplitType.Month)]//按分表 (自带分表支持 年、季、月、周、日)
[SugarTable("historyValue_{year}{month}{day}", TableDescription = "设备采集历史表")]//3个变量必须要有
[SplitTable(SplitType.Week)]//按分表 (自带分表支持 年、季、月、周、日)
[SugarTable("{name}_{year}{month}{day}", TableDescription = "设备采集历史表")]//3个变量必须要有
[SugarIndex("index_Name", nameof(SQLHistoryValue.Name), OrderByType.Desc)]
[SugarIndex("index_DeviceName", nameof(SQLHistoryValue.DeviceName), OrderByType.Desc)]
[SugarIndex("index_CollectTime", nameof(SQLHistoryValue.CollectTime), OrderByType.Desc)]

View File

@@ -0,0 +1,55 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
using Mapster;
using SqlSugar;
using System.ComponentModel.DataAnnotations;
using System.Reflection;
using ThingsGateway.Admin.Core;
using ThingsGateway.Plugin.SqlDB;
namespace ThingsGateway.Gateway.Application;
/// <summary>
/// 上传数据库插件静态方法
/// </summary>
public static class SqlDBBusinessDatabaseUtil
{
/// <summary>
/// 获取数据库链接
/// </summary>
/// <returns></returns>
public static SqlSugarClient GetDb(SqlDBProducerProperty sqlDBProducerProperty)
{
var configureExternalServices = new ConfigureExternalServices
{
SplitTableService = new SqlDBDateSplitTableService(sqlDBProducerProperty),
EntityService = (type, column) => // 修改列可空-1、带?问号 2、String类型若没有Required
{
if ((type.PropertyType.IsGenericType && type.PropertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
|| (type.PropertyType == typeof(string) && type.GetCustomAttribute<RequiredAttribute>() == null))
column.IsNullable = true;
},
};
var sqlSugarClient = new SqlSugarClient(new ConnectionConfig()
{
ConnectionString = sqlDBProducerProperty.BigTextConnectStr,//连接字符串
DbType = sqlDBProducerProperty.DbType,//数据库类型
IsAutoCloseConnection = true, //不设成true要手动close
ConfigureExternalServices = configureExternalServices,
}
);
DbContext.AopSetting(sqlSugarClient);//aop配置
return sqlSugarClient;
}
}

View File

@@ -0,0 +1,233 @@
//------------------------------------------------------------------------------
// 此代码版权声明为全文件覆盖,如有原作者特别声明,会在下方手动补充
// 此代码版权除特别声明外的代码归作者本人Diego所有
// 源代码使用协议遵循本仓库的开源协议及附加协议
// Gitee源代码仓库https://gitee.com/diego2098/ThingsGateway
// Github源代码仓库https://github.com/kimdiego2098/ThingsGateway
// 使用文档https://diego2098.gitee.io/thingsgateway-docs/
// QQ群605534569
//------------------------------------------------------------------------------
using SqlSugar;
using System.Reflection;
using System.Text.RegularExpressions;
namespace ThingsGateway.Plugin.SqlDB;
public class SqlDBDateSplitTableService : DateSplitTableService
{
private SqlDBProducerProperty sqlDBProducerProperty;
public SqlDBDateSplitTableService(SqlDBProducerProperty sqlDBProducerProperty)
{
this.sqlDBProducerProperty = sqlDBProducerProperty;
}
#region Core
public override List<SplitTableInfo> GetAllTables(ISqlSugarClient db, EntityInfo EntityInfo, List<DbTableInfo> tableInfos)
{
CheckTableName(EntityInfo.DbTableName);
string regex = "^" + EntityInfo.DbTableName.Replace("{year}", "([0-9]{2,4})").Replace("{day}", "([0-9]{1,2})").Replace("{month}", "([0-9]{1,2})").Replace("{name}", sqlDBProducerProperty.HisDBTableName);
List<string> list = (from it in tableInfos
where Regex.IsMatch(it.Name, regex, RegexOptions.IgnoreCase)
select it.Name).Reverse().ToList();
List<SplitTableInfo> list2 = new List<SplitTableInfo>();
foreach (string item in list)
{
SplitTableInfo splitTableInfo = new SplitTableInfo();
splitTableInfo.TableName = item;
Match match = Regex.Match(item, regex, RegexOptions.IgnoreCase);
string value = match.Groups[1].Value;
string value2 = match.Groups[2].Value;
string value3 = match.Groups[3].Value;
splitTableInfo.Date = GetDate(value, value2, value3, EntityInfo.DbTableName);
list2.Add(splitTableInfo);
}
return list2.OrderByDescending((SplitTableInfo it) => it.Date).ToList();
}
public override string GetTableName(ISqlSugarClient db, EntityInfo EntityInfo)
{
var splitTableAttribute = EntityInfo.Type.GetCustomAttribute<SplitTableAttribute>();
if (splitTableAttribute != null)
{
var type = (splitTableAttribute as SplitTableAttribute).SplitType;
return GetTableName(db, EntityInfo, type);
}
else
{
return GetTableName(db, EntityInfo, SplitType.Day);
}
}
public override string GetTableName(ISqlSugarClient db, EntityInfo EntityInfo, SplitType splitType)
{
var date = db.GetDate();
return GetTableNameByDate(EntityInfo, splitType, date);
}
public override string GetTableName(ISqlSugarClient db, EntityInfo entityInfo, SplitType splitType, object fieldValue)
{
var value = Convert.ToDateTime(fieldValue);
return GetTableNameByDate(entityInfo, splitType, value);
}
#endregion Core
#region Private Models
internal class SplitTableSort
{
public string Name { get; set; }
public int Sort { get; set; }
}
#endregion Private Models
#region Common Helper
private DateTime GetDate(string group1, string group2, string group3, string dbTableName)
{
var yearIndex = dbTableName.IndexOf("{year}");
var dayIndex = dbTableName.IndexOf("{day}");
var monthIndex = dbTableName.IndexOf("{month}");
List<SplitTableSort> tables = new List<SplitTableSort>();
tables.Add(new SplitTableSort() { Name = "{year}", Sort = yearIndex });
tables.Add(new SplitTableSort() { Name = "{day}", Sort = dayIndex });
tables.Add(new SplitTableSort() { Name = "{month}", Sort = monthIndex });
tables = tables.OrderBy(it => it.Sort).ToList();
var year = "";
var month = "";
var day = "";
if (tables[0].Name == "{year}")
{
year = group1;
}
if (tables[1].Name == "{year}")
{
year = group2;
}
if (tables[2].Name == "{year}")
{
year = group3;
}
if (tables[0].Name == "{month}")
{
month = group1;
}
if (tables[1].Name == "{month}")
{
month = group2;
}
if (tables[2].Name == "{month}")
{
month = group3;
}
if (tables[0].Name == "{day}")
{
day = group1;
}
if (tables[1].Name == "{day}")
{
day = group2;
}
if (tables[2].Name == "{day}")
{
day = group3;
}
return Convert.ToDateTime($"{year}-{month}-{day}");
}
private string GetTableNameByDate(EntityInfo EntityInfo, SplitType splitType, DateTime date)
{
date = ConvertDateBySplitType(date, splitType);
return EntityInfo.DbTableName.Replace("{year}", date.Year + "").Replace("{day}", PadLeft2(date.Day + "")).Replace("{month}", PadLeft2(date.Month + "")).Replace("{name}", sqlDBProducerProperty.HisDBTableName);
}
private string PadLeft2(string str)
{
if (str.Length < 2)
{
return str.PadLeft(2, '0');
}
else
{
return str;
}
}
private static void CheckTableName(string dbTableName)
{
Check.Exception(!dbTableName.Contains("{year}"), "table name need {{year}}", "分表表名需要占位符 {{year}}");
Check.Exception(!dbTableName.Contains("{month}"), "table name need {{month}}", "分表表名需要占位符 {{month}} ");
Check.Exception(!dbTableName.Contains("{day}"), "table name need {{day}}", "分表表名需要占位符{{day}}");
Check.Exception(Regex.Matches(dbTableName, @"\{year\}").Count > 1, " There can only be one {{year}}", " 只能有一个 {{year}}");
Check.Exception(Regex.Matches(dbTableName, @"\{month\}").Count > 1, "There can only be one {{month}}", "只能有一个 {{month}} ");
Check.Exception(Regex.Matches(dbTableName, @"\{day\}").Count > 1, "There can only be one {{day}}", "只能有一个{{day}}");
Check.Exception(Regex.IsMatch(dbTableName, @"\d\{|\}\d"), " '{{' or '}}' can't be numbers nearby", "占位符相令一位不能是数字,比如 : 1{{day}}2 错误 , 正确: 1_{{day}}_2");
}
#endregion Common Helper
#region Date Helper
private DateTime ConvertDateBySplitType(DateTime time, SplitType type)
{
switch (type)
{
case SplitType.Day:
return Convert.ToDateTime(time.ToString("yyyy-MM-dd"));
case SplitType.Week:
return GetMondayDate(time);
case SplitType.Month:
return Convert.ToDateTime(time.ToString("yyyy-MM-01"));
case SplitType.Season:
if (time.Month <= 3)
{
return Convert.ToDateTime(time.ToString("yyyy-01-01"));
}
else if (time.Month <= 6)
{
return Convert.ToDateTime(time.ToString("yyyy-04-01"));
}
else if (time.Month <= 9)
{
return Convert.ToDateTime(time.ToString("yyyy-07-01"));
}
else
{
return Convert.ToDateTime(time.ToString("yyyy-10-01"));
}
case SplitType.Year:
return Convert.ToDateTime(time.ToString("yyyy-01-01"));
case SplitType.Month_6:
if (time.Month <= 6)
{
return Convert.ToDateTime(time.ToString("yyyy-01-01"));
}
else
{
return Convert.ToDateTime(time.ToString("yyyy-07-01"));
}
default:
throw new Exception($"SplitType parameter error ");
}
}
private DateTime GetMondayDate(DateTime someDate)
{
int i = someDate.DayOfWeek - DayOfWeek.Monday;
if (i == -1) i = 6;
TimeSpan ts = new TimeSpan(i, 0, 0, 0);
return someDate.Subtract(ts);
}
#endregion Date Helper
}

View File

@@ -30,7 +30,7 @@ public partial class SqlDBPage : IDriverUIBase
private async Task<SqlSugarPagedList<SQLHistoryValue>> QueryCallAsync(SqlDBPageInput input)
{
using var db = BusinessDatabaseUtil.GetDb(SqlDBProducer._driverPropertys.DbType, SqlDBProducer._driverPropertys.BigTextConnectStr);
using var db = SqlDBBusinessDatabaseUtil.GetDb(SqlDBProducer._driverPropertys);
var query = db.Queryable<SQLHistoryValue>().SplitTable()
.WhereIF(input.StartTime != null, a => a.CreateTime >= input.StartTime)
.WhereIF(input.EndTime != null, a => a.CreateTime <= input.EndTime)
@@ -49,7 +49,7 @@ public partial class SqlDBPage : IDriverUIBase
private async Task<SqlSugarPagedList<SQLRealValue>> RealQueryCallAsync(SqlDBPageInput input)
{
using var db = BusinessDatabaseUtil.GetDb(SqlDBProducer._driverPropertys.DbType, SqlDBProducer._driverPropertys.BigTextConnectStr);
using var db = SqlDBBusinessDatabaseUtil.GetDb(SqlDBProducer._driverPropertys);
var query = db.Queryable<SQLRealValue>().AS(SqlDBProducer._driverPropertys.ReadDBTableName)
.WhereIF(!string.IsNullOrEmpty(input.VariableName), it => it.Name.Contains(input.VariableName))
;

View File

@@ -59,7 +59,7 @@ YitIdHelper.NextId())
protected override async Task ProtectedBeforStartAsync(CancellationToken cancellationToken)
{
var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
var db = SqlDBBusinessDatabaseUtil.GetDb(_driverPropertys);
db.CodeFirst.InitTables(typeof(SQLHistoryValue));
db.MappingTables.Add(nameof(SQLRealValue), _driverPropertys.ReadDBTableName);
db.CodeFirst.InitTables(typeof(SQLRealValue));
@@ -77,7 +77,7 @@ YitIdHelper.NextId())
var varList = CurrentDevice.VariableRunTimes.ToList().Adapt<List<SQLRealValue>>();
var result = await UpdateAsync(varList, cancellationToken);
if (success != result.IsSuccess)
if (result != null && success != result.IsSuccess)
{
if (!result.IsSuccess)
LogMessage.LogWarning(result.ToString());

View File

@@ -20,6 +20,7 @@ public class SqlDBProducerProperty : BusinessPropertyWithCacheInterval
[DynamicProperty("是否实时表", "true=>实时表更新")] public bool IsReadDB { get; set; } = false;
[DynamicProperty("是否历史表", "true=>历史存储(按月分表)")] public bool IsHisDB { get; set; } = true;
[DynamicProperty("实时表名称", "")] public string ReadDBTableName { get; set; } = "ReadDBTableName";
[DynamicProperty("历史表名称", "")] public string HisDBTableName { get; set; } = "HisDBTableName";
[DynamicProperty("数据库类型", "MySql/SqlServer")] public DbType DbType { get; set; } = DbType.SqlServer;
[DynamicProperty("链接字符串", "")] public string BigTextConnectStr { get; set; } = "server=.;uid=sa;pwd=111111;database=test;";

View File

@@ -59,7 +59,7 @@ public partial class SqlDBProducer : BusinessBaseWithCacheInterval<SQLHistoryVal
{
try
{
var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
var db = SqlDBBusinessDatabaseUtil.GetDb(_driverPropertys);
db.Ado.CancellationToken = cancellationToken;
var result = await db.Fastest<SQLHistoryValue>().PageSize(50000).SplitTable().BulkCopyAsync(dbInserts);
//var result = await db.Insertable(dbInserts).SplitTable().ExecuteCommandAsync();
@@ -79,14 +79,19 @@ public partial class SqlDBProducer : BusinessBaseWithCacheInterval<SQLHistoryVal
{
try
{
var db = BusinessDatabaseUtil.GetDb(_driverPropertys.DbType, _driverPropertys.BigTextConnectStr);
var db = SqlDBBusinessDatabaseUtil.GetDb(_driverPropertys);
db.Ado.CancellationToken = cancellationToken;
if (!_initRealData)
{
var result = await db.Storageable(datas).As(_driverPropertys.ReadDBTableName).PageSize(5000).ExecuteSqlBulkCopyAsync();
if (result > 0)
LogMessage.Trace($"主题:{nameof(SQLRealValue)}{Environment.NewLine} ,数量:{result}");
_initRealData = true;
if (datas?.Count != 0)
{
var result = await db.Storageable(datas).As(_driverPropertys.ReadDBTableName).PageSize(5000).ExecuteSqlBulkCopyAsync();
if (result > 0)
LogMessage.Trace($"主题:{nameof(SQLRealValue)}{Environment.NewLine} ,数量:{result}");
_initRealData = true;
return new();
}
return null;
}
else
{
@@ -94,10 +99,11 @@ public partial class SqlDBProducer : BusinessBaseWithCacheInterval<SQLHistoryVal
{
var result = await db.Fastest<SQLRealValue>().AS(_driverPropertys.ReadDBTableName).PageSize(100000).BulkUpdateAsync(datas);
LogMessage.Trace($"主题:{nameof(SQLRealValue)}{Environment.NewLine} ,数量:{result}");
return new();
}
return null;
}
return new();
}
catch (Exception ex)
{