AMQP定义了普遍适用的消息通信框架,只规定了最底层的协议,有关协议的细节可以有多种实现,因此,Spring AMQP提供了AmqpTemplate
作为收发消息的模板,定义了一些常用的收发消息的接口,AmqpTemplate
是一个底层的模板,它的上层实现可以根据具体的协议细节定义其他的接口。
目前,Spring AMQP只有一个实现——RabbitTemplate
。
RabbitTemplate
可以用来收发消息,但是一般的,RabbitTemplate
只用来发送消息,接收消息使用@RabbitListener
与容器工厂。这是生产者和消费者特性导致的:生产者采用阻塞的方式,与MQ服务器之间是短连接,消费者采用非阻塞,一个线程接收线程,另外的线程处理消息,与MQ服务器之间是长连接。本章介绍RabbitTemplate
的一些属性的配置,以及如何使用RabbitTemplate
发送消息。
RabbitTemplate配置
重试机制
在MQ客户端与MQ服务器之间连接出现异常时,消息收发可能会抛出异常,为了避免频繁抛出异常,可以加入一些重试机制,在抛出异常前,重试几次,通过设置RetryTemplate
实现消息的重试。
1 |
|
生产者可靠投递
生产者是异步的,无法通过返回值知道消息是否成功发送,默认的,无法路由至MQ服务器的消息会直接被丢弃,无任何异常抛出。
ConfirmCallback
消息发送到RabbitMQ exchange
(消息发送至MQ服务器)后接收ack回调。消息只要被Rabbit服务器接收到就会发调用该回调。但是不能保证消息一定会投递至目标队列,为了确保投递至目标队列,需要设置ReturnCallback
注意:必需把CachingConnectionFactory
的PublierConfirms
设置为true
1 | CachingConnectionFactory factory = new CachingConnectionFactory(); |
ReturnCallback
消息发送至MQ服务器的exchange,但是无相应队列与该交换器绑定时的回调。发送失败的消息重新返回给生产者,在returnedMessage
方法中处理失败的消息。
注意:必需把CachingConnectionFactory
的PublierReturns
设置为true
1 | CachingConnectionFactory factory = new CachingConnectionFactory(); |
注意:为确保接收到ReturnCallback
、ConfirmCallback
,channel必须处于alive的状态,当channelCacheSize
满时,spring框架默认等待5秒然后关闭channel。所以,使用ReturnCallback
、ConfirmCallback
时要确保channelCacheSize
足够大,尽可能的确保channel处于alive状态。
(3) 消息发送的目的得是一个不存在的exchange,底层会抛出异常并关闭连接,通过addConnectionListener,实现onShutDown方法处理该异常。见2.9节。
连接池
在2.6节中,了解到为了避免connection释放出现死锁,也避免消费者因生产者阻塞而阻塞,生产者和消费者应使用独立的连接池,springboot2.0.2版本后,CachingConnectionFactory
内部有两个连接池,一个是生产者的一个是消费者,但是要显示设置usePublisherConnection
为ture,默认是false。
1 | RabbitTemplate rabbitTemplate = new RabbitTemplate(); |
注意:如果是事务性的,此设置无效,生产者和消费者还是会共用同一个连接池。
生产消息
构建消息
可以直接发送字节数组,SimpleMessageConverter
默认地封装消息的ContenType、Hearder、MessageId
等,但有时,想要控制这些属性的生成规则,需要通过MessageBuiler
和MessagePropertiesBuilder
构建Spring AMQP Message
。
1 | MessageProperties props = MessagePropertiesBuilder.newInstance() |
构建CorrelationData
CorrelationData
是与消息关联的数据,比如,通过设置分布式的唯一id作为CorrelationData
的id,即消息ID。可以实现消费端的去重的功能。它的定义如下,只有一个属性:id。
1 | public class CorrelationData implements Correlation { |
生产者设置CorrelationData:RabbitTemplate
的send()
方法或为rabbitTemplate
配置setCorrelationDataPostProcessor
1 | public void send(String exchange, String routingKey, Message message, CorrelationData correlationData) throws AmqpException {} |
消费者获取CorrelationData:
在Spring AMQP Message
中获取。
1 | String id = message.getMessageProperties().getCorrelationId(); |
统一修改消息和CorrelationData
在消息Id自定义、消息属性统一修改、消息内容统一修改的场景下,统一的定义修改方法有利于编码和debug。RabbitTemplate
比AmqpTemplate
在所有send方法底层实现上都新增了关联数据correlationDate
信息,correlationDate
也是3.1.2中提到的消息可靠投递的支撑。
correlationDate
和Message
的统一修改示例如下。可以用
1 | RabbitTemplate rabbitTemplate = new RabbitTemplate(); |
发送消息
消息的发送很简答,RabbitTemplate
提供了众多接口供发送消息。主要有两大类:send()、convertAndSend()
。如果使用是字节数组等非Message类型的参数作为消息,考虑采用convertAndSend()
。如果使用的是构建好的Message,使用两者皆可。send()
接口定义如下。
1 | void send(Message message) throws AmqpException; |
如果多除调用send()
,可以统一为rabbitTemplate
配置routingKey、exchange
,这样直接调用第一个方法就可以直接实现消息的发送,这种应用场景更多的在rabbitTemplate
只对应一个队列。
1 | amqpTemplate.setExchange("marketData.topic"); |
使用默认的exchange(简单模式,routingKey==queuename),发送至固定队列
1 | RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange |
与上面效果一样。使用默认的exchange,发送至固定队列
1 | RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange |