springboot+RabbitMQ系列(二)生产者

AMQP定义了普遍适用的消息通信框架,只规定了最底层的协议,有关协议的细节可以有多种实现,因此,Spring AMQP提供了AmqpTemplate作为收发消息的模板,定义了一些常用的收发消息的接口,AmqpTemplate是一个底层的模板,它的上层实现可以根据具体的协议细节定义其他的接口。

目前,Spring AMQP只有一个实现——RabbitTemplate

RabbitTemplate可以用来收发消息,但是一般的,RabbitTemplate只用来发送消息,接收消息使用@RabbitListener与容器工厂。这是生产者和消费者特性导致的:生产者采用阻塞的方式,与MQ服务器之间是短连接,消费者采用非阻塞,一个线程接收线程,另外的线程处理消息,与MQ服务器之间是长连接。本章介绍RabbitTemplate的一些属性的配置,以及如何使用RabbitTemplate发送消息。

RabbitTemplate配置

重试机制

在MQ客户端与MQ服务器之间连接出现异常时,消息收发可能会抛出异常,为了避免频繁抛出异常,可以加入一些重试机制,在抛出异常前,重试几次,通过设置RetryTemplate实现消息的重试。

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public AmqpTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}

生产者可靠投递

生产者是异步的,无法通过返回值知道消息是否成功发送,默认的,无法路由至MQ服务器的消息会直接被丢弃,无任何异常抛出。

ConfirmCallback

消息发送到RabbitMQ exchange(消息发送至MQ服务器)后接收ack回调。消息只要被Rabbit服务器接收到就会发调用该回调。但是不能保证消息一定会投递至目标队列,为了确保投递至目标队列,需要设置ReturnCallback

注意:必需把CachingConnectionFactoryPublierConfirms设置为true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CachingConnectionFactory factory = new CachingConnectionFactory();
RabbitTemplate rabbitTemplate = new RabbitTemplate();
factory.setPublisherConfirms(true);
rabbitTemplate.setConnectionFactory(factory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){
logger.info("successed send msg to MQ server {}", correlationDate)
} else {
logger.info("failed send msg to MQ server caused by {}", cause)
}

}
});

ReturnCallback

消息发送至MQ服务器的exchange,但是无相应队列与该交换器绑定时的回调。发送失败的消息重新返回给生产者,在returnedMessage方法中处理失败的消息。

注意:必需把CachingConnectionFactoryPublierReturns设置为true

1
2
3
4
5
6
7
8
9
10
11
12
13
CachingConnectionFactory factory = new CachingConnectionFactory();
RabbitTemplate rabbitTemplate = new RabbitTemplate();

factory.setPublisherReturns(true);

rabbitTemplate.setConnectionFactory(factory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
// process message
}
});

注意:为确保接收到ReturnCallbackConfirmCallback,channel必须处于alive的状态,当channelCacheSize满时,spring框架默认等待5秒然后关闭channel。所以,使用ReturnCallbackConfirmCallback时要确保channelCacheSize足够大,尽可能的确保channel处于alive状态。

(3) 消息发送的目的得是一个不存在的exchange,底层会抛出异常并关闭连接,通过addConnectionListener,实现onShutDown方法处理该异常。见2.9节。

连接池

在2.6节中,了解到为了避免connection释放出现死锁,也避免消费者因生产者阻塞而阻塞,生产者和消费者应使用独立的连接池,springboot2.0.2版本后,CachingConnectionFactory内部有两个连接池,一个是生产者的一个是消费者,但是要显示设置usePublisherConnection为ture,默认是false。

1
2
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setUsePublisherConnection(true);

注意:如果是事务性的,此设置无效,生产者和消费者还是会共用同一个连接池。

生产消息

构建消息

可以直接发送字节数组,SimpleMessageConverter默认地封装消息的ContenType、Hearder、MessageId等,但有时,想要控制这些属性的生成规则,需要通过MessageBuilerMessagePropertiesBuilder构建Spring AMQP Message

1
2
3
4
5
6
7
8
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();

构建CorrelationData

CorrelationData是与消息关联的数据,比如,通过设置分布式的唯一id作为CorrelationData的id,即消息ID。可以实现消费端的去重的功能。它的定义如下,只有一个属性:id。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CorrelationData implements Correlation {    
private volatile String id;
public CorrelationData() {
}

public CorrelationData(String id) {
this.id = id;
}

public String getId() {
return this.id;
}

public void setId(String id) {
this.id = id;
}

public String toString() {
return "CorrelationData [id=" + this.id + "]";
}
}

生产者设置CorrelationDataRabbitTemplatesend()方法或为rabbitTemplate配置setCorrelationDataPostProcessor

1
public void send(String exchange, String routingKey, Message message, CorrelationData correlationData) throws AmqpException {}

消费者获取CorrelationData

在Spring AMQP Message中获取。

1
String id = message.getMessageProperties().getCorrelationId();

统一修改消息和CorrelationData

在消息Id自定义、消息属性统一修改、消息内容统一修改的场景下,统一的定义修改方法有利于编码和debug。RabbitTemplateAmqpTemplate在所有send方法底层实现上都新增了关联数据correlationDate信息,correlationDate也是3.1.2中提到的消息可靠投递的支撑。

correlationDateMessage的统一修改示例如下。可以用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setCorrelationDataPostProcessor(new CorrelationDataPostProcessor() {
@Override
public CorrelationData postProcess(Message message, CorrelationData correlationData)
{
// modify correlationDate
correlationData.setId();
}
});

rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// modify message
}
});

发送消息

消息的发送很简答,RabbitTemplate提供了众多接口供发送消息。主要有两大类:send()、convertAndSend()。如果使用是字节数组等非Message类型的参数作为消息,考虑采用convertAndSend()。如果使用的是构建好的Message,使用两者皆可。send()接口定义如下。

1
2
3
4
5
void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

如果多除调用send(),可以统一为rabbitTemplate配置routingKey、exchange,这样直接调用第一个方法就可以直接实现消息的发送,这种应用场景更多的在rabbitTemplate只对应一个队列。

1
2
3
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

使用默认的exchange(简单模式,routingKey==queuename),发送至固定队列

1
2
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

与上面效果一样。使用默认的exchange,发送至固定队列

1
2
3
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));