添加重启锁

This commit is contained in:
Kimdiego2098
2023-07-17 21:40:47 +08:00
parent 122b833256
commit ba212da222
3 changed files with 178 additions and 118 deletions

View File

@@ -168,7 +168,7 @@ public class AlarmWorker : BackgroundService
private Task<Task> HisAlarmTask;
private Task<Task> RealAlarmTask;
private CacheDb CacheDb { get; set; }
private EasyLock easyLock { get; set; } = new();
/// <summary>
/// 重启服务
/// </summary>
@@ -180,35 +180,74 @@ public class AlarmWorker : BackgroundService
internal void Start()
{
foreach (var item in _globalDeviceData.CollectDevices)
try
{
DeviceChange(item);
}
StoppingTokens.Add(new());
Init();
RealAlarmTask.Start();
HisAlarmTask.Start();
easyLock.Lock();
foreach (var item in _globalDeviceData.CollectDevices)
{
DeviceChange(item);
}
StoppingTokens.Add(new());
Init();
RealAlarmTask.Start();
HisAlarmTask.Start();
}
finally
{
easyLock.UnLock();
}
}
internal void Stop(IEnumerable<CollectDeviceRunTime> devices)
{
foreach (var device in devices)
try
{
device.DeviceStatusCahnge -= DeviceStatusCahnge;
device.DeviceVariableRunTimes?.ForEach(v => { v.VariableCollectChange -= DeviceVariableChange; });
}
easyLock.Lock();
CancellationTokenSource StoppingToken = StoppingTokens.LastOrDefault();
StoppingToken?.Cancel();
_logger?.LogInformation($"实时报警线程停止中");
var realAlarmResult = RealAlarmTask?.Result;
if (realAlarmResult?.Status != TaskStatus.Canceled)
{
bool? realTaskResult = false;
foreach (var device in devices)
{
device.DeviceStatusCahnge -= DeviceStatusCahnge;
device.DeviceVariableRunTimes?.ForEach(v => { v.VariableCollectChange -= DeviceVariableChange; });
}
CancellationTokenSource StoppingToken = StoppingTokens.LastOrDefault();
StoppingToken?.Cancel();
_logger?.LogInformation($"实时报警线程停止中");
var realAlarmResult = RealAlarmTask?.Result;
if (realAlarmResult?.Status != TaskStatus.Canceled)
{
bool? realTaskResult = false;
try
{
realTaskResult = realAlarmResult?.Wait(10000);
}
catch (ObjectDisposedException)
{
}
catch (Exception ex)
{
_logger?.LogWarning(ex, "等待线程停止错误");
}
if (realTaskResult == true)
{
_logger?.LogInformation($"实时报警线程已停止");
}
else
{
_logger?.LogWarning($"实时报警线程停止超时,已强制取消");
}
}
RealAlarmTask?.SafeDispose();
_logger?.LogInformation($"历史报警线程停止中");
var hisAlarmResult = HisAlarmTask?.GetAwaiter().GetResult();
bool? hisTaskResult = false;
try
{
realTaskResult = realAlarmResult?.Wait(10000);
hisTaskResult = hisAlarmResult?.Wait(10000);
}
catch (ObjectDisposedException)
{
@@ -218,44 +257,22 @@ public class AlarmWorker : BackgroundService
{
_logger?.LogWarning(ex, "等待线程停止错误");
}
if (realTaskResult == true)
if (hisTaskResult == true)
{
_logger?.LogInformation($"实时报警线程已停止");
_logger?.LogInformation($"历史报警线程已停止");
}
else
{
_logger?.LogWarning($"实时报警线程停止超时,已强制取消");
_logger?.LogWarning($"历史报警线程停止超时,已强制取消");
}
HisAlarmTask?.SafeDispose();
StoppingToken?.SafeDispose();
StoppingTokens.Remove(StoppingToken);
}
RealAlarmTask?.SafeDispose();
_logger?.LogInformation($"历史报警线程停止中");
var hisAlarmResult = HisAlarmTask?.GetAwaiter().GetResult();
bool? hisTaskResult = false;
try
finally
{
hisTaskResult = hisAlarmResult?.Wait(10000);
easyLock.UnLock();
}
catch (ObjectDisposedException)
{
}
catch (Exception ex)
{
_logger?.LogWarning(ex, "等待线程停止错误");
}
if (hisTaskResult == true)
{
_logger?.LogInformation($"历史报警线程已停止");
}
else
{
_logger?.LogWarning($"历史报警线程停止超时,已强制取消");
}
HisAlarmTask?.SafeDispose();
StoppingToken?.SafeDispose();
StoppingTokens.Remove(StoppingToken);
}
private void AlarmAnalysis(DeviceVariableRunTime item)

View File

@@ -147,6 +147,7 @@ public class HistoryValueWorker : BackgroundService
/// </summary>
public ConcurrentList<CancellationTokenSource> StoppingTokens = new();
private Task<Task> HistoryValueTask;
private EasyLock easyLock { get; set; } = new();
/// <summary>
/// 离线缓存
/// </summary>
@@ -330,55 +331,71 @@ public class HistoryValueWorker : BackgroundService
internal void Start()
{
foreach (var device in _globalDeviceData.CollectDevices)
try
{
device.DeviceVariableRunTimes?.Where(a => a.HisEnable == true)?.ForEach(v => { v.VariableCollectChange += DeviceVariableCollectChange; });
device.DeviceVariableRunTimes?.Where(a => a.HisEnable == true)?.ForEach(v => { v.VariableValueChange += DeviceVariableValueChange; });
}
StoppingTokens.Add(new());
Init();
HistoryValueTask.Start();
easyLock.Lock();
foreach (var device in _globalDeviceData.CollectDevices)
{
device.DeviceVariableRunTimes?.Where(a => a.HisEnable == true)?.ForEach(v => { v.VariableCollectChange += DeviceVariableCollectChange; });
device.DeviceVariableRunTimes?.Where(a => a.HisEnable == true)?.ForEach(v => { v.VariableValueChange += DeviceVariableValueChange; });
}
StoppingTokens.Add(new());
Init();
HistoryValueTask.Start();
}
finally
{
easyLock.UnLock();
}
}
internal void Stop(IEnumerable<CollectDeviceRunTime> devices)
{
foreach (var device in devices)
{
device.DeviceVariableRunTimes?.Where(a => a.HisEnable == true)?.ForEach(v => { v.VariableCollectChange -= DeviceVariableCollectChange; });
device.DeviceVariableRunTimes?.Where(a => a.HisEnable == true)?.ForEach(v => { v.VariableValueChange -= DeviceVariableValueChange; });
}
CancellationTokenSource StoppingToken = StoppingTokens.LastOrDefault();
StoppingToken?.Cancel();
_logger?.LogInformation($"历史数据线程停止中");
var hisHisResult = HistoryValueTask?.GetAwaiter().GetResult();
bool? hisTaskResult = false;
try
{
hisTaskResult = hisHisResult?.Wait(10000);
}
catch (ObjectDisposedException)
{
easyLock.Lock();
}
catch (Exception ex)
{
_logger?.LogInformation(ex, "等待线程停止错误");
}
if (hisTaskResult == true)
{
_logger?.LogInformation($"历史数据线程已停止");
}
else
{
_logger?.LogInformation($"历史数据线程停止超时,已强制取消");
}
HistoryValueTask?.SafeDispose();
StoppingToken?.SafeDispose();
StoppingTokens.Remove(StoppingToken);
foreach (var device in devices)
{
device.DeviceVariableRunTimes?.Where(a => a.HisEnable == true)?.ForEach(v => { v.VariableCollectChange -= DeviceVariableCollectChange; });
device.DeviceVariableRunTimes?.Where(a => a.HisEnable == true)?.ForEach(v => { v.VariableValueChange -= DeviceVariableValueChange; });
}
CancellationTokenSource StoppingToken = StoppingTokens.LastOrDefault();
StoppingToken?.Cancel();
_logger?.LogInformation($"历史数据线程停止中");
var hisHisResult = HistoryValueTask?.GetAwaiter().GetResult();
bool? hisTaskResult = false;
try
{
hisTaskResult = hisHisResult?.Wait(10000);
}
catch (ObjectDisposedException)
{
}
catch (Exception ex)
{
_logger?.LogInformation(ex, "等待线程停止错误");
}
if (hisTaskResult == true)
{
_logger?.LogInformation($"历史数据线程已停止");
}
else
{
_logger?.LogInformation($"历史数据线程停止超时,已强制取消");
}
HistoryValueTask?.SafeDispose();
StoppingToken?.SafeDispose();
StoppingTokens.Remove(StoppingToken);
}
finally
{
easyLock.UnLock();
}
}
private void DeviceVariableCollectChange(DeviceVariableRunTime variable)
{

View File

@@ -150,18 +150,27 @@ public class UploadDeviceWorker : BackgroundService
/// </summary>
private void RemoveAllDeviceThread()
{
foreach (var deviceThread in UploadDeviceThreads)
try
{
try
easyLock.Lock();
foreach (var deviceThread in UploadDeviceThreads)
{
deviceThread.SafeDispose();
}
catch (Exception ex)
{
_logger?.LogError(ex, deviceThread.ToString());
try
{
deviceThread.SafeDispose();
}
catch (Exception ex)
{
_logger?.LogError(ex, deviceThread.ToString());
}
}
UploadDeviceThreads.Clear();
}
finally
{
easyLock.UnLock();
}
UploadDeviceThreads.Clear();
}
/// <summary>
@@ -170,16 +179,25 @@ public class UploadDeviceWorker : BackgroundService
/// <returns></returns>
private void StartAllDeviceThreads()
{
if (!_stoppingToken.IsCancellationRequested)
try
{
foreach (var item in UploadDeviceThreads)
easyLock.Lock();
if (!_stoppingToken.IsCancellationRequested)
{
if (!_stoppingToken.IsCancellationRequested)
foreach (var item in UploadDeviceThreads)
{
item.StartThread();
if (!_stoppingToken.IsCancellationRequested)
{
item.StartThread();
}
}
}
}
finally
{
easyLock.UnLock();
}
}
/// <summary>
/// 创建设备上传线程
@@ -187,30 +205,38 @@ public class UploadDeviceWorker : BackgroundService
/// <returns></returns>
private void CreatAllDeviceThreads()
{
if (!_stoppingToken.IsCancellationRequested)
try
{
var uploadDeviceRunTimes = (_uploadDeviceService.GetUploadDeviceRuntime());
foreach (var uploadDeviceRunTime in uploadDeviceRunTimes)
{
if (!_stoppingToken.IsCancellationRequested)
{
try
{
UploadDeviceCore deviceUploadCore = new(_scopeFactory);
deviceUploadCore.Init(uploadDeviceRunTime);
DeviceThread(deviceUploadCore);
}
catch (Exception ex)
{
_logger.LogError(ex, uploadDeviceRunTime.Name);
easyLock.Lock();
if (!_stoppingToken.IsCancellationRequested)
{
var uploadDeviceRunTimes = (_uploadDeviceService.GetUploadDeviceRuntime());
foreach (var uploadDeviceRunTime in uploadDeviceRunTimes)
{
if (!_stoppingToken.IsCancellationRequested)
{
try
{
UploadDeviceCore deviceUploadCore = new(_scopeFactory);
deviceUploadCore.Init(uploadDeviceRunTime);
DeviceThread(deviceUploadCore);
}
catch (Exception ex)
{
_logger.LogError(ex, uploadDeviceRunTime.Name);
}
}
}
}
}
finally
{
easyLock.UnLock();
}
}
private UploadDeviceThread DeviceThread(UploadDeviceCore deviceUploadCore)