springboot+RabbitMQ系列(一)数据库连接

本文参考了 springboot官方文档,主要介绍依赖springboot-amqp模块实现与RabbitMQ服务端的连接。

ConnectionFactory

spring AMQP默认使用CachingConnectionFactory创建一个应用程序共享的连接工厂,也是用途最广泛的ConnectionFactory构建方法,也为Junit提供了SingleConnectionFactorySingleConncetionFactory不常用,不再赘述。

与AMQP通信的工作单元实际上是Channel,TCP连接可以共享。connectionFactory分为两种模式,一种是缓存channel,一种是缓存connection(同时也缓存该connection的channel)。默认是缓存channel的模式,高可用集群场景下(镜像队列),通过负载均衡器连接至集群中不同的实例时,可以通过setCacheMode设置为缓存connection的模式。代码示例中给出了缓存connection的模式,同时也设置了channelCacheSize

1
2
3
4
5
6
7
8
9
10
11
12
private CachingConnectionFactory buildConnFactory(String addresses, String username, String password, String vhost) {    
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
connectionFactory.setConnectionCacheSize(10);
connectionFactory.setChannelCacheSize(200);

return connectionFactory;
}

如果需要采用缓存Channel的模式,示例如下:

1
2
3
4
5
6
7
8
9
10
11
private CachingConnectionFactory buildConnFactory(String addresses, String username, String password, String vhost) {    
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setChannelCacheSize(channelCacheSize);

return connectionFactory;
}

在缓存connection模式下,不支持自动声明队列、exchange、binding等,rabbitmq-client默认只提供了5个线程处理connection,因此,当connection较多时,应该自定义线程池,并配置到CachingConnectionFactory中。自定义的线程池将会被所有connection共享,建议线程池的最大线程数设置的与预期connection数相等,因为可能存在对于大部分connection都有多个channel的情况。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ExecutorService executor = new 
ThreadPoolExecutor(corePoolSize,
maxPoolSize, keepAliveSeconds,TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueSize),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r)
{
Thread thread = new Thread(r);
thread.setName("myThread" + autoInt.getAndIncrement());
thread.setDaemon(false);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy());

connectionFactory.setExecutor(executor);

channelCacheSize

前面了解到,Spring AMQP通过缓存channel或connection提高吞吐量。connectionFactory分为两种模式,一种是缓存channel,一种是缓存connection(同时也缓存该connection的channel)。本节主要介绍channelCacheSizeConnectionCacheSize也是类似的,不赘述。

默认仅限制缓存的channelSize

默认地,最大channelSize是没有限制的,限制的仅仅是缓存的channelSize(connection也一样),默认值是25,缓存channel的目的是减小高并发多线程环境中频繁创建销毁channel的开销,比如:在某一时刻有100个channel处于工作状态,当channel空闲后,只会缓存channelSize个channel,剩下的都会被销毁。

通过RabbitMQ的Web管理插件观察到channel在频繁的被创建和销毁时,应及时的提高channelCacheSize。建议最少要保证线程数<channelCacheSize。可以通过压力测试,观察高峰期channel动态平衡的数量,从而决定channelCacheSize的大小。

channelSize限制

也可以通过channelCheckoutTimeout参数设置connectionFactorychannelSize限制,当该参数大于0时,表示最大的channel数目=channelCacheSize,达到channelCacheSize的上限后,调用createChannel的线程会阻塞,直至有空闲channel出现或阻塞时间超过chanelCheckoutTimeout,在超时的情况下,抛出AmqpTimeoutException,可以设置一些Retry的策略来处理这些异常。

连接命名

通过ConnectionNameStrategy属性设置connection名称。

1
2
3
4
5
6
7
8
connectionFactory.setConnectionNameStrategy(new ConnectionNameStrategy() {   
@Override
public String obtainNewConnectionName(ConnectionFactory connectionFactory) {
return connectionFactory.getHost() + atomicInteger.getAndIncrement();
}
});

return connectionFactory;

使用RabbitMQ Java API提供的connectionFactory

Spring AMQP也支持使用RabbitMQ Java API提供的connectionFactory,即Rabbit Client的connectionFactory,位于包com.rabbitmq.client.ConnectionFactory中,通过构造器参数设置在CachingConnectionFactory中,示例如下:

1
2
ConnectionFactory factory = new ConnectionFactory();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);

自定义RabbitClient的属性

上述示例为RabbitClient的connectionFactory —–> Spring AMQP的connectionFactiory的转换,Spring也提供了Spring AMQP的connectionFactiory —–> RabbitClient的connectionFactory的转换。

比如可以通过CachingConnectionFactory设置RabbitClient connectionFactory的属性。

示例如下:

1
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");

连接恢复

Rabbit Client4.0后的connectionFactory和Spring AMQP的·connectionFactory均默认提供了自动恢复连接的机制;虽然两者的自动恢复机制是兼容的,但使用构造注入conncetionFactory时建议关闭其中一个。

否则,在MQ服务器节点可用但连接尚未恢复时,出现AutoRecoverConnectionNotCurrentlyOpenException 异常。比如:如果在RabbitTemplate中配置RetryTemplate(Spring AMQP的手动恢复),甚至在故障转移到集群中的另一个代理时,可能抛出上述异常。

关闭Rabbit Client的自动恢复

由于Spring AMQP的自动恢复连接在计时器上恢复,因此可以使用SpringAMQP的恢复机制更快地恢复连接。

springboot-amqp1.7版本以后,默认关闭RabbitClient的connectionFactory的自动重连,但是,在通过构造参数注入RabbitClient的connectionFactory时,是没有办法默认关闭的,需要手动设置

1
2
3
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(false); // 显示关闭RabbitClient的自动重连
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);

一般地,关闭掉Rabbit Client的自动恢复,使用Spring AMQP可以满足绝大多数使用场景,而且框架提供的自动恢复机制已经很完善。

SSL连接

建议通过注入RabbitMQ client connectionFactory的方式配置SSL连接,示例如下:

1
2
3
4
5
6
7
8
9
10
ConnectionFactory factory = new ConnectionFactory();

factory.setAutomaticRecoveryEnabled(false);

File keyFile = new File(keyPath);
File certFile = new File(cerPath);
SSLContexts sslCtx = SslContextBuilder.forServer(certFile, keyFile).build();
factory.useSslProtocol(sslCtx);

CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);

避免Connection死锁

在内存不足或MQ服务端出现异常时,可能会出现连接阻塞,对于默认的CachingConnectionFactory,MQ服务端导致连接阻塞时,客户端会主动地关闭。

因此,如果生产者消费者共用同一个connectionFactory,MQ服务端导致生产者客户端与消费者客户端关闭,可能死锁的情况:生产者与消费者持有相同的连接资源时,MQ服务器异常触发生产者和消费者中断与服务端的连接,可能会出现死锁。

为避免死锁的产生,建议对于生产者和消费者分别配置不同的connectionFactory。需要注意的是:如果生产者消费者处于同一个事物时,不建议生产者消费者配置相同的connectionFactory,因为消费者(或生产者)需要复用对方的channel。

1
2
3
4
5
6
7
@Bean(name = "myRabbitTemplate")
public RabbitTemplate basicCloudRabbitTemplate(CachingConnectionFactory f) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(f);
rabbitTemplate.setUsePublisherConnection(true);

return rabbitTemplate;
}

Routing ConnectionFactory实现多数据源收发消息

spring AMQP提供SimpleRoutingConnectionFactory负责在运行时根据查找键动态选择connectionFactory,通常,以线程的上下文作为查找键,比如地址、vHost等,SimpleRoutingConnectionFactory继承了AbstractRoutingConnectionFactory,通过SimpleResourceHolder获取当前线程的查找键。示例如下:

首先,为RabbitTemplate配置一个以vHost作为查找键的SimpleRoutingConnectionFactory。key分别为factory1virtualHostfactory2virtualHost,当然,也可以使用addresses作为查找键。使用方式为factoryMap.put("#{factory1.addresses}", factory1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private SimpleRoutingConnectionFactory myFactory() {    
SimpleRoutingConnectionFactory routingFactory = new SimpleRoutingConnectionFactory();
CachingConnectionFactory factory1 = new CachingConnectionFactory();
factory1.setAddresses("localhost:5672");
factory1.setUsername("username1");
factory1.setPassword("password1");
factory1.setVirtualHost("v1");

CachingConnectionFactory factory2 = new CachingConnectionFactory();
factory2.setAddresses("202.130.1.1:5672");
factory2.setUsername("username2");
factory2.setPassword("password2");
factory2.setVirtualHost("v2");

Map<Object, ConnectionFactory> factoryMap = new HashMap<>(5);
factoryMap.put("#{factory1.virtualHost}", factory1);
factoryMap.put("#{factory2.virtualHost}", factory2);

routingFactory.setTargetConnectionFactories(factoryMap);

return routingFactory;
}

@Bean("myRabbitTemplate")
public RabbitTemplate myRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(myFactory());
return rabbitTemplate;
}

使用方法如下,SimpleResourceHolder的bind和unbind都是必须的,分别指获取当前线程查找键,释放查找键。bind有两个参数,第一个为待获取的connectFactory,第二个为key

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyService {

@Autowired
@Qualifier("myRabbitTemplate")
private RabbitTemplate rabbitTemplate;

public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}

}

消费者使用多数据源的方式略有不同,首先,配置两个不同的containerFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Bean("myContainerFactory1")
public SimpleRabbitListenerContainerFactory containerFactory1() {
SimpleRabbitListenerContainerFactory containerFactory1 = new SimpleRabbitListenerContainerFactory();

CachingConnectionFactory factory1 = new CachingConnectionFactory();
factory1.setAddresses("localhost:5672");
factory1.setUsername("username1");
factory1.setPassword("password1");
factory1.setVirtualHost("v1");

containerFactory1.setConnectionFactory(factory1);

return containerFactory1;
}

@Bean("myContainerFactory2")
public SimpleRabbitListenerContainerFactory containerFactory2() {
SimpleRabbitListenerContainerFactory containerFactory2 = new SimpleRabbitListenerContainerFactory();

CachingConnectionFactory factory2 = new CachingConnectionFactory();
factory2.setAddresses("host2:5672");
factory2.setUsername("username2");
factory2.setPassword("password2");
factory2.setVirtualHost("v2");

containerFactory2.setConnectionFactory(factory2);

return containerFactory2;
}

使用@RabbitListener接收消息时指定containerFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
@RabbitListener(queues = "queue.test", containerFactory = "myContainerFactory1")
public void receiveMsg1(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
logger.debug("rcv msg {}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

@RabbitListener(queues = "queue.test", containerFactory = "myContainerFactory2")
public void receiveMsg2(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
logger.debug("rcv msg {}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

集群中的连接管理

MQ的集群根据集群中各节点队列信息区分为镜像队列和普通队列。只有队列所在节点知道该队列的所有信息,默认情况下,MQ是普通队列,队列只存活于集群中的一个节点上,称为主队列。镜像队列与普通队列的相同点是:队列的主拷贝仅存在于一个节点上(主队列,master节点)。不同点是,镜像节点在集群中的其他节点上拥有从队列的拷贝。一旦队列主节点不可用,最老的从队列自动被选举为新的主队列。

镜像队列的原理:在非镜像队列的集群中,信道负责将消息路由至合适的队列。当加入镜像队列后,信道除了负责将消息路按照路由绑定规则路由至合适的队列外,它也要将消息投递到镜像队列的从拷贝,在某种程度上,可以将镜像队列视为拥有一个隐藏的fanout交换器,它指示着信道将消息分发到队列的从拷贝上。

无论对于普通队列还是镜像队列,所要面临的问题是:主节点崩溃时,消费者该与哪个节点建立连接。

普通队列

对于普通队列,使用CachingConnectionFactory就足够了,它支持配置多个连接地址,当一个连接失败时,会按顺序尝试与其他地址建立连接。

1
2
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("host1:5672, host2:5672");

高可用队列(镜像队列)

  • 消费者

LocalizedQueueConnectionFactory是Spring AMQP为消费者提供的用于高可用集群的连接工厂,使用它时,MQ服务端必须设置enable management plugin。

它通过MQ服务器management 插件提供的REST API 获取哪个节点是主节点,创建CachingConnectionFactory连接至该节点,如果连接失败,将再次调用MQ服务器management 插件提供的REST API 获取新选举的主节点。当无法获取新选举的主节点是,将通过CachingConnectionFactory像普通队列一样按顺序连接至MQ服务器。

LocalizedQueueConnectionFactory本质上是一个RoutingConnectionFactory,它的查找键是队列名。

正是由于查找键是对列名,必需保证查找键的唯一性,消费者的ContainerFactory只能配置在一个队列上。

注意:

由于每次创建连接时,都要通过REST API获取连接节点,开销很大,只适用于长连接的场景(消费者),不适用短连接的场景(生产者)。

LocalizedQueueConnectionFactory只适用于消费者,比如配置在SimpleMessageListenerContainer中,不适用于生产者(比如将LocalizedQueueConnectionFactory配置在RabbitTemplate中)。

示例如下:LocalizedQueueConnectionFactory构造方法的前三个参数 addresses, adminUrisnodes都是字符串数组,LocalizedQueueConnectionFactory使用数组下标相同的参数构建连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Autowired
private RabbitProperties props;

private final String[] adminUris = { "http://host1:15672", "http://host2:15672" };

private final String[] nodes = { "rabbit@host1", "rabbit@host2" };

@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}

@Bean
public ConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
this.adminUris, this.nodes,
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
  • 生产者

生产者本身也不需要考虑哪个节点是主节点,对于生产者来说,队列是未知的,消息会发送至集群中所有的节点。节点会判断该消息对应的主队列是否在自己的节点上。

连接和信道监听

Spring AMQP提供了connection和channel的监听器接口,通过回调实现自定义的创建和销毁监听逻辑。

ConnectionListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
CachingConnectionFactory factory = new CachingConnectionFactory();
List<ConnectionListener> connectionListeners = new ArrayList<>();

ConnectionListener connectionListener = new ConnectionListener() {
@Override
public void onClose(Connection connection) {
// do something
}

@Override
public void onShutDown(ShutdownSignalException signal) {
// do something
}

@Override
public void onCreate(Connection connection) {
// do something
}
};
connectionListeners.add(connectionListener);

factory.setConnectionListeners(connectionListeners);

ChannelListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CachingConnectionFactory factory = new CachingConnectionFactory();
List<ChannelListener> channelListeners = new ArrayList<>();
ChannelListener channelListener = new ChannelListener() {
@Override
public void onCreate(Channel channel, boolean b) {
// do something
}

@Override
public void onShutDown(ShutdownSignalException signal) {
// do something
}
};

channelListeners.add(channelListener);
List<ConnectionListener> connectionListeners = new ArrayList<>();

factory.setChannelListeners(channelListeners);

channel关闭的日志级别控制

当channel关闭时,CachingConnectionFactory的默认日志规则如下:

  • 正常的channel销毁,无log
  • 队列声明失败导致的channel关闭,debug级别
  • 在exclusive(独占)队列上,basic.consume因为独占使用条件而关闭channel,info级别
  • 其他的channel关闭情况输出error级别log

自定义log级别:为CachingConnectionFactory配置自定义的ConditionalExceptionLogger。