diff --git a/src/Plugins/ThingsGateway.RabbitMQ/RabbitMQClient.cs b/src/Plugins/ThingsGateway.RabbitMQ/RabbitMQClient.cs index 381f7aaaa..17052216f 100644 --- a/src/Plugins/ThingsGateway.RabbitMQ/RabbitMQClient.cs +++ b/src/Plugins/ThingsGateway.RabbitMQ/RabbitMQClient.cs @@ -71,10 +71,11 @@ namespace ThingsGateway.RabbitMQ private IConnection _connection; private UploadDevice _curDevice { get; set; } RpcCore _rpcCore { get; set; } + private ConnectionFactory _connectionFactory; protected override void Init(UploadDevice device) { _curDevice = device; - ConnectionFactory connectionFactory = new ConnectionFactory + _connectionFactory = new ConnectionFactory { HostName = IP, Port = Port, @@ -82,16 +83,7 @@ namespace ThingsGateway.RabbitMQ Password = Password, VirtualHost = VirtualHost, }; - // 创建连接 - _connection = connectionFactory.CreateConnection(); - // 创建通道 - _model = _connection.CreateModel(); - // 声明路由队列 - if (IsQueueDeclare) - { - _model.QueueDeclare(VariableQueueName, true, false, false); - _model.QueueDeclare(DeviceQueueName, true, false, false); - } + using var serviceScope = _scopeFactory.CreateScope(); @@ -114,7 +106,6 @@ namespace ThingsGateway.RabbitMQ } - public override string ToString() { return $" {nameof(RabbitMQClient)} IP:{IP} Port:{Port}"; @@ -140,6 +131,31 @@ namespace ThingsGateway.RabbitMQ { try { + if(_model==null) + { + try + { + // 创建连接 + if (_connection == null) + _connection = _connectionFactory.CreateConnection(); + // 创建通道 + if (_model == null) + _model = _connection.CreateModel(); + // 声明路由队列 + if (IsQueueDeclare) + { + _model?.QueueDeclare(VariableQueueName, true, false, false); + _model?.QueueDeclare(DeviceQueueName, true, false, false); + } + } + catch (Exception ex) + { + _logger.LogError(ex, ToString()); + } + } + + + ////变化推送 var varList = CollectVariableRunTimes.ToListWithDequeue(10000); if (varList?.Count != 0) @@ -153,9 +169,9 @@ namespace ThingsGateway.RabbitMQ { var data = Encoding.UTF8.GetBytes(variables.ToJson()); // 设置消息持久化 - IBasicProperties properties = _model.CreateBasicProperties(); + IBasicProperties properties = _model?.CreateBasicProperties(); properties.Persistent = true; - _model.BasicPublish(ExchangeName, VariableQueueName, properties, data); + _model?.BasicPublish(ExchangeName, VariableQueueName, properties, data); } catch (Exception ex) { @@ -172,9 +188,9 @@ namespace ThingsGateway.RabbitMQ { var data = Encoding.UTF8.GetBytes(variable.ToJson()); // 设置消息持久化 - IBasicProperties properties = _model.CreateBasicProperties(); + IBasicProperties properties = _model?.CreateBasicProperties(); properties.Persistent = true; - _model.BasicPublish(ExchangeName, VariableQueueName, properties, data); + _model?.BasicPublish(ExchangeName, VariableQueueName, properties, data); } catch (Exception ex) { @@ -205,9 +221,9 @@ namespace ThingsGateway.RabbitMQ { var data = Encoding.UTF8.GetBytes(devices.ToJson()); // 设置消息持久化 - IBasicProperties properties = _model.CreateBasicProperties(); + IBasicProperties properties = _model?.CreateBasicProperties(); properties.Persistent = true; - _model.BasicPublish(ExchangeName, DeviceQueueName, properties, data); + _model?.BasicPublish(ExchangeName, DeviceQueueName, properties, data); } catch (Exception ex) { @@ -224,9 +240,9 @@ namespace ThingsGateway.RabbitMQ { var data = Encoding.UTF8.GetBytes(devices.ToJson()); // 设置消息持久化 - IBasicProperties properties = _model.CreateBasicProperties(); + IBasicProperties properties = _model?.CreateBasicProperties(); properties.Persistent = true; - _model.BasicPublish(ExchangeName, DeviceQueueName, properties, data); + _model?.BasicPublish(ExchangeName, DeviceQueueName, properties, data); } catch (Exception ex) {