本文参考了 springboot官方文档,主要介绍依赖springboot-amqp模块实现与RabbitMQ服务端的连接。
ConnectionFactory
spring AMQP默认使用CachingConnectionFactory
创建一个应用程序共享的连接工厂,也是用途最广泛的ConnectionFactory
构建方法,也为Junit提供了SingleConnectionFactory
,SingleConncetionFactory
不常用,不再赘述。
与AMQP通信的工作单元实际上是Channel,TCP连接可以共享。connectionFactory分为两种模式,一种是缓存channel,一种是缓存connection(同时也缓存该connection的channel)。默认是缓存channel的模式,高可用集群场景下(镜像队列),通过负载均衡器连接至集群中不同的实例时,可以通过setCacheMode设置为缓存connection的模式。代码示例中给出了缓存connection的模式,同时也设置了channelCacheSize
。
1 | private CachingConnectionFactory buildConnFactory(String addresses, String username, String password, String vhost) { |
如果需要采用缓存Channel的模式,示例如下:
1 | private CachingConnectionFactory buildConnFactory(String addresses, String username, String password, String vhost) { |
在缓存connection模式下,不支持自动声明队列、exchange、binding等,rabbitmq-client默认只提供了5个线程处理connection,因此,当connection较多时,应该自定义线程池,并配置到CachingConnectionFactory
中。自定义的线程池将会被所有connection共享,建议线程池的最大线程数设置的与预期connection数相等,因为可能存在对于大部分connection都有多个channel的情况。示例如下:
1 | ExecutorService executor = new |
channelCacheSize
前面了解到,Spring AMQP通过缓存channel或connection提高吞吐量。connectionFactory
分为两种模式,一种是缓存channel,一种是缓存connection(同时也缓存该connection的channel)。本节主要介绍channelCacheSize
。ConnectionCacheSize
也是类似的,不赘述。
默认仅限制缓存的channelSize
默认地,最大channelSize是没有限制的,限制的仅仅是缓存的channelSize(connection也一样),默认值是25,缓存channel的目的是减小高并发多线程环境中频繁创建销毁channel的开销,比如:在某一时刻有100个channel处于工作状态,当channel空闲后,只会缓存channelSize个channel,剩下的都会被销毁。
通过RabbitMQ的Web管理插件观察到channel在频繁的被创建和销毁时,应及时的提高channelCacheSize
。建议最少要保证线程数<channelCacheSize
。可以通过压力测试,观察高峰期channel动态平衡的数量,从而决定channelCacheSize
的大小。
channelSize限制
也可以通过channelCheckoutTimeout
参数设置connectionFactory
的channelSize
限制,当该参数大于0时,表示最大的channel数目
=channelCacheSize
,达到channelCacheSize
的上限后,调用createChannel
的线程会阻塞,直至有空闲channel
出现或阻塞时间超过chanelCheckoutTimeout
,在超时的情况下,抛出AmqpTimeoutException
,可以设置一些Retry的策略来处理这些异常。
连接命名
通过ConnectionNameStrategy
属性设置connection
名称。
1 | connectionFactory.setConnectionNameStrategy(new ConnectionNameStrategy() { |
使用RabbitMQ Java API提供的connectionFactory
Spring AMQP也支持使用RabbitMQ Java API提供的connectionFactory
,即Rabbit Client的connectionFactory
,位于包com.rabbitmq.client.ConnectionFactory
中,通过构造器参数设置在CachingConnectionFactory
中,示例如下:
1 | ConnectionFactory factory = new ConnectionFactory(); |
自定义RabbitClient的属性
上述示例为RabbitClient的connectionFactory
—–> Spring AMQP的connectionFactiory
的转换,Spring也提供了Spring AMQP的connectionFactiory
—–> RabbitClient的connectionFactory
的转换。
比如可以通过CachingConnectionFactory设置RabbitClient connectionFactory
的属性。
示例如下:
1 | connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar"); |
连接恢复
Rabbit Client4.0后的connectionFactory
和Spring AMQP的·connectionFactory
均默认提供了自动恢复连接的机制;虽然两者的自动恢复机制是兼容的,但使用构造注入conncetionFactory
时建议关闭其中一个。
否则,在MQ服务器节点可用但连接尚未恢复时,出现AutoRecoverConnectionNotCurrentlyOpenException
异常。比如:如果在RabbitTemplate
中配置RetryTemplate
(Spring AMQP的手动恢复),甚至在故障转移到集群中的另一个代理时,可能抛出上述异常。
关闭Rabbit Client的自动恢复
由于Spring AMQP的自动恢复连接在计时器上恢复,因此可以使用SpringAMQP的恢复机制更快地恢复连接。
springboot-amqp1.7版本以后,默认关闭RabbitClient的connectionFactory
的自动重连,但是,在通过构造参数注入RabbitClient的connectionFactory
时,是没有办法默认关闭的,需要手动设置。
1 | ConnectionFactory factory = new ConnectionFactory(); |
一般地,关闭掉Rabbit Client的自动恢复,使用Spring AMQP可以满足绝大多数使用场景,而且框架提供的自动恢复机制已经很完善。
SSL连接
建议通过注入RabbitMQ client connectionFactory
的方式配置SSL连接,示例如下:
1 | ConnectionFactory factory = new ConnectionFactory(); |
避免Connection死锁
在内存不足或MQ服务端出现异常时,可能会出现连接阻塞,对于默认的CachingConnectionFactory,MQ服务端导致连接阻塞时,客户端会主动地关闭。
因此,如果生产者消费者共用同一个connectionFactory,MQ服务端导致生产者客户端与消费者客户端关闭,可能死锁的情况:生产者与消费者持有相同的连接资源时,MQ服务器异常触发生产者和消费者中断与服务端的连接,可能会出现死锁。
为避免死锁的产生,建议对于生产者和消费者分别配置不同的connectionFactory。需要注意的是:如果生产者消费者处于同一个事物时,不建议生产者消费者配置相同的connectionFactory,因为消费者(或生产者)需要复用对方的channel。
1 | "myRabbitTemplate") (name = |
Routing ConnectionFactory实现多数据源收发消息
spring AMQP提供SimpleRoutingConnectionFactory负责在运行时根据查找键动态选择connectionFactory,通常,以线程的上下文作为查找键,比如地址、vHost
等,SimpleRoutingConnectionFactory
继承了AbstractRoutingConnectionFactory
,通过SimpleResourceHolder获取当前线程的查找键。示例如下:
首先,为RabbitTemplate配置一个以vHost作为查找键的SimpleRoutingConnectionFactory
。key分别为factory1
的virtualHost
和factory2
的virtualHost
,当然,也可以使用addresses作为查找键。使用方式为factoryMap.put("#{factory1.addresses}", factory1)
1 | private SimpleRoutingConnectionFactory myFactory() { |
使用方法如下,SimpleResourceHolder的bind和unbind都是必须的,分别指获取当前线程查找键,释放查找键。bind有两个参数,第一个为待获取的connectFactory,第二个为key
1 | public class MyService { |
消费者使用多数据源的方式略有不同,首先,配置两个不同的containerFactory
1 | "myContainerFactory1") ( |
使用@RabbitListener接收消息时指定containerFactory
1 | "queue.test", containerFactory = "myContainerFactory1") (queues = |
集群中的连接管理
MQ的集群根据集群中各节点队列信息区分为镜像队列和普通队列。只有队列所在节点知道该队列的所有信息,默认情况下,MQ是普通队列,队列只存活于集群中的一个节点上,称为主队列。镜像队列与普通队列的相同点是:队列的主拷贝仅存在于一个节点上(主队列,master节点)。不同点是,镜像节点在集群中的其他节点上拥有从队列的拷贝。一旦队列主节点不可用,最老的从队列自动被选举为新的主队列。
镜像队列的原理:在非镜像队列的集群中,信道负责将消息路由至合适的队列。当加入镜像队列后,信道除了负责将消息路按照路由绑定规则路由至合适的队列外,它也要将消息投递到镜像队列的从拷贝,在某种程度上,可以将镜像队列视为拥有一个隐藏的fanout交换器,它指示着信道将消息分发到队列的从拷贝上。
无论对于普通队列还是镜像队列,所要面临的问题是:主节点崩溃时,消费者该与哪个节点建立连接。
普通队列
对于普通队列,使用CachingConnectionFactory
就足够了,它支持配置多个连接地址,当一个连接失败时,会按顺序尝试与其他地址建立连接。
1 | CachingConnectionFactory factory = new CachingConnectionFactory(); |
高可用队列(镜像队列)
- 消费者
LocalizedQueueConnectionFactory
是Spring AMQP为消费者提供的用于高可用集群的连接工厂,使用它时,MQ服务端必须设置enable management plugin。
它通过MQ服务器management 插件提供的REST API 获取哪个节点是主节点,创建CachingConnectionFactory
连接至该节点,如果连接失败,将再次调用MQ服务器management 插件提供的REST API 获取新选举的主节点。当无法获取新选举的主节点是,将通过CachingConnectionFactory
像普通队列一样按顺序连接至MQ服务器。
LocalizedQueueConnectionFactory
本质上是一个RoutingConnectionFactory,它的查找键是队列名。
正是由于查找键是对列名,必需保证查找键的唯一性,消费者的ContainerFactory
只能配置在一个队列上。
注意:
由于每次创建连接时,都要通过REST API获取连接节点,开销很大,只适用于长连接的场景(消费者),不适用短连接的场景(生产者)。
LocalizedQueueConnectionFactory
只适用于消费者,比如配置在SimpleMessageListenerContainer
中,不适用于生产者(比如将LocalizedQueueConnectionFactory
配置在RabbitTemplate
中)。
示例如下:LocalizedQueueConnectionFactory
构造方法的前三个参数 addresses
, adminUris
和nodes
都是字符串数组,LocalizedQueueConnectionFactory
使用数组下标相同的参数构建连接。
1 |
|
- 生产者
生产者本身也不需要考虑哪个节点是主节点,对于生产者来说,队列是未知的,消息会发送至集群中所有的节点。节点会判断该消息对应的主队列是否在自己的节点上。
连接和信道监听
Spring AMQP提供了connection和channel的监听器接口,通过回调实现自定义的创建和销毁监听逻辑。
ConnectionListener
1 | CachingConnectionFactory factory = new CachingConnectionFactory(); |
ChannelListener
1 | CachingConnectionFactory factory = new CachingConnectionFactory(); |
channel关闭的日志级别控制
当channel关闭时,CachingConnectionFactory
的默认日志规则如下:
- 正常的channel销毁,无log
- 队列声明失败导致的channel关闭,debug级别
- 在exclusive(独占)队列上,basic.consume因为独占使用条件而关闭channel,info级别
- 其他的channel关闭情况输出error级别log
自定义log级别:为CachingConnectionFactory配置自定义的ConditionalExceptionLogger。