rbmq插件优化

This commit is contained in:
Diego2098
2023-03-21 23:38:00 +08:00
parent b2170c49a3
commit 45ebe9048d

View File

@@ -71,10 +71,11 @@ namespace ThingsGateway.RabbitMQ
private IConnection _connection;
private UploadDevice _curDevice { get; set; }
RpcCore _rpcCore { get; set; }
private ConnectionFactory _connectionFactory;
protected override void Init(UploadDevice device)
{
_curDevice = device;
ConnectionFactory connectionFactory = new ConnectionFactory
_connectionFactory = new ConnectionFactory
{
HostName = IP,
Port = Port,
@@ -82,16 +83,7 @@ namespace ThingsGateway.RabbitMQ
Password = Password,
VirtualHost = VirtualHost,
};
// 创建连接
_connection = connectionFactory.CreateConnection();
// 创建通道
_model = _connection.CreateModel();
// 声明路由队列
if (IsQueueDeclare)
{
_model.QueueDeclare(VariableQueueName, true, false, false);
_model.QueueDeclare(DeviceQueueName, true, false, false);
}
using var serviceScope = _scopeFactory.CreateScope();
@@ -114,7 +106,6 @@ namespace ThingsGateway.RabbitMQ
}
public override string ToString()
{
return $" {nameof(RabbitMQClient)} IP:{IP} Port:{Port}";
@@ -140,6 +131,31 @@ namespace ThingsGateway.RabbitMQ
{
try
{
if(_model==null)
{
try
{
// 创建连接
if (_connection == null)
_connection = _connectionFactory.CreateConnection();
// 创建通道
if (_model == null)
_model = _connection.CreateModel();
// 声明路由队列
if (IsQueueDeclare)
{
_model?.QueueDeclare(VariableQueueName, true, false, false);
_model?.QueueDeclare(DeviceQueueName, true, false, false);
}
}
catch (Exception ex)
{
_logger.LogError(ex, ToString());
}
}
////变化推送
var varList = CollectVariableRunTimes.ToListWithDequeue(10000);
if (varList?.Count != 0)
@@ -153,9 +169,9 @@ namespace ThingsGateway.RabbitMQ
{
var data = Encoding.UTF8.GetBytes(variables.ToJson());
// 设置消息持久化
IBasicProperties properties = _model.CreateBasicProperties();
IBasicProperties properties = _model?.CreateBasicProperties();
properties.Persistent = true;
_model.BasicPublish(ExchangeName, VariableQueueName, properties, data);
_model?.BasicPublish(ExchangeName, VariableQueueName, properties, data);
}
catch (Exception ex)
{
@@ -172,9 +188,9 @@ namespace ThingsGateway.RabbitMQ
{
var data = Encoding.UTF8.GetBytes(variable.ToJson());
// 设置消息持久化
IBasicProperties properties = _model.CreateBasicProperties();
IBasicProperties properties = _model?.CreateBasicProperties();
properties.Persistent = true;
_model.BasicPublish(ExchangeName, VariableQueueName, properties, data);
_model?.BasicPublish(ExchangeName, VariableQueueName, properties, data);
}
catch (Exception ex)
{
@@ -205,9 +221,9 @@ namespace ThingsGateway.RabbitMQ
{
var data = Encoding.UTF8.GetBytes(devices.ToJson());
// 设置消息持久化
IBasicProperties properties = _model.CreateBasicProperties();
IBasicProperties properties = _model?.CreateBasicProperties();
properties.Persistent = true;
_model.BasicPublish(ExchangeName, DeviceQueueName, properties, data);
_model?.BasicPublish(ExchangeName, DeviceQueueName, properties, data);
}
catch (Exception ex)
{
@@ -224,9 +240,9 @@ namespace ThingsGateway.RabbitMQ
{
var data = Encoding.UTF8.GetBytes(devices.ToJson());
// 设置消息持久化
IBasicProperties properties = _model.CreateBasicProperties();
IBasicProperties properties = _model?.CreateBasicProperties();
properties.Persistent = true;
_model.BasicPublish(ExchangeName, DeviceQueueName, properties, data);
_model?.BasicPublish(ExchangeName, DeviceQueueName, properties, data);
}
catch (Exception ex)
{