springboot+RabbitMQ系列(三)消费者

消息接收有两种模式,简单的模式是消费者不断地去轮询,轮询到一条就消费一条,复杂些的模式是注册一个异步Listener,由容器负责接收消息并选择对应的Listener处理消息。

轮询模式消费者

springboot-amqp中,可使用AmqpTemplate(rabbitTemplate)实现轮询模式接收消息,默认是阻塞的,有消息时就拉去消息,没有消息时,立刻返回null。

从springboot1.5版本以后,可以设置单次轮8询的超时时间,即消费者接收消息的阻塞时间,超时时间设置为负值,意味着无限期阻塞。并发量高时,建议使用异步Listener或将超时时间设为0.

如果需要转换消息格式,需要预先为AmpqTemplate设置MessageConverter,然后调用receiveAndConvert()方法接收消息。

如果需要Replay,可以在调用amqpTemplate.receiveAndReply()接口时传入ReceiveAndReplyCallBack

异步消费者

异步消费者中有一个预取消息(prefetch)的概念,即一个消费者预取一定数目的消息,这可能会导致多消费者情况下其他消费者利用率不足。springboot 2.0之前,预取消息默认值是1,spring boot 2.0以后,默认值为250, 预取值的设置取决于你的业务,要尽可能保证所有消费者的高效运行从而提升吞吐量。比如:

  1. 当单条消息体很大,消息处理的又比较慢时,预取值如果设置的过大,将导致客户端内存占用率飙升。
  2. 如果严格的要求执行顺序时,建议预取值设置为1
  3. 在消息吞吐量不高、消费者又多时,预取值设置的过大会导致消费者利用率不足。
  4. 在手动确认的模式下,预取值应该设置为1,如果prefech不为1,basicAck是异步的操作,如果出现异常时,消费者会继续处理其他预取消息,但是不会ack(批量成功时才会ack),因此,其他的消息处于unack的状态,其他的消费者会重新获取该消息,消息会出现重复消费的情况。

Message Consuming callback

异步消费是通过回调实现的,消息的消费逻辑在回调方法onMessage()中实现,springboot AMQP提供了两个回调接口,MessageListener、ChannelAwareMessageListener,究竟用哪个取决于你是否需要获取channel信息,比如手动ack时必须要有channel才可以。

MessageListenerAdapter

接口回调已经可以实现消息的消费了,这还不够灵活,如果业务需要动态的指定queue或tag对应哪个methodName时,可以继承MessageListenerAdapter。它包含了下面的构造器,使用该构造器时,与前面提到的接口回调本质上是一样的。

1
2
3
4
5
public MessageListenerAdapter(Object delegate) {    
this.queueOrTagToMethodName = new HashMap();
this.defaultListenerMethod = "handleMessage";
this.doSetDelegate(delegate);
}

delegate(代理)即自定义的消费者bean,它必须是ChannelAwareMessageListener或MessageListener的实例,否则,它不会生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
Object delegate = this.getDelegate();
if (delegate != this) {
if (delegate instanceof ChannelAwareMessageListener) {
if (channel != null) { ((ChannelAwareMessageListener)delegate).onMessage(message, channel); return;
}
if (!(delegate instanceof MessageListener)) {
throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself");
}
}
if (delegate instanceof MessageListener) { ((MessageListener)delegate).onMessage(message);
return;
}
}

动态的指定消息处理method由下面两个接口实现。

1
2
3
4
5
6
7
public void setDefaultListenerMethod(String defaultListenerMethod) {
this.defaultListenerMethod = defaultListenerMethod;
}

public void setQueueOrTagToMethodName(Map<String, String> queueOrTagToMethodName) {
this.queueOrTagToMethodName.putAll(queueOrTagToMethodName);
}

它们在getListenerMethodName中被调用,如果你没有定义queueOrTagToMethodName,那么将会调用你设置的defaultListenerMethod,如果你都没有设置,那么默认值是“handleMessage”。

1
2
3
4
5
6
7
8
9
10
11
12
13
protected String getListenerMethodName(Message originalMessage, Object extractedMessage) throws Exception {    
if (this.queueOrTagToMethodName.size() > 0) {
MessageProperties props = originalMessage.getMessageProperties();
String methodName = (String)this.queueOrTagToMethodName.get(props.getConsumerQueue());
if (methodName == null) {
methodName = (String)this.queueOrTagToMethodName.get(props.getConsumerTag()); }
if (methodName != null) {
return methodName;
}
}

return this.getDefaultListenerMethod();
}

容器完成了回调

前面介绍了如何实现异步消费的回调接口,而真正完成回调的是容器(container),容器是有生命周期的,如启动、运行、停止,容器本质上就是桥接AMQP queue和MessageListener的实例。因此,如果想实现消费者的功能,必须为容器配置connectionFactory、队列、MessageLisener, 即告知容器:如何连接MQ服务器、成功连接后哪个队列的消息应该交给哪个消费者。

在springboot 2.0之前,只有一种容器:SimpleMessageListenerContainer。在springboot2.0以后,新增了一种容器:DirectMessageListenerContainer。二者的区别就是消费者的线程与RabbitMQ客户端线程是否共用,对于SimpleMessageListenerContainer,每一个消费者配置一个线程,如果为容器配置了多个队列,可能会使用同一个线程处理多个队列(消费者数量<队列数)。并发性能取决于你设置的消费者的数conCurrentConsumer,它等于消费者线程数。

一个消息的后半生是这样的:spring-amqp负责将消息从MQ服务器传递给消费者,springboot-amqp提供了默认的容器,用来从MQ服务器接收消息,我们提供的消费者完成onMessage()的消息处理逻辑,并将它注册到容器上。

当消息从RabbitMQ客户端传递过来时,客户端线程通过队列将消息传递给消费线程(消息处理线程)。这是由于早期MQ客户端不支持并发传递消息,一个队列只会有一个线程传递消息,更不可能让它完成消息的处理,这个机制的设置当然是低效的,会增加线程之间切换的开销。

在新版本后,MQ客户端已经支持并发了,完全可以使用MQ客户端的线程完成消息的接收、处理工作,在DirectMessageListenerContainer中,不再区分客户端线程和消费线程,并发的控制由参数consumersPerQueue控制,不再使用conCurrentConsumer、maxConCurrentConsumer、txSize(事务大小,一次事务中传递txSize条消息,用来减少ack的次数,这个参数>1且消息消费出现异常时,会导致同一个事务中后续的消息重复消费)。DirectMessageListenerContainer中提供了messagesPerAck,但是它不是事务,每一条消息都有一个独立的事务用来传递和确认,出现异常时,后续的消息会一直处于unack的状态,所以,不会重复消费。

自定义容器

前面介绍了两种容器,而且springboot-amqp会提供默认地容器,如果想个性化的设置或有需要设置多种容器时,就需要考虑自定义容器了,在实际项目中,建议使用自定义容器。

现在以SimpleMessageListenerContainer为例介绍下容器的使用方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Configuration
public class ExampleAmqpConfiguration {

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}

@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}

还有一种方式是使用@RabbitListener注解+Java Config。

1
2
@RabbitListener(queues = "example.queue", containerFactory = "exampleContainer")
public void onMessage(Message message, Channel channel) throws Exception {}

容器工厂

容器工厂专门用来配合@RabbitListener使用,前面提到,springboot为@RabbitListener提供了默认的容器,但为了个性化的设置,建议自定义容器工厂,然后在@RabbitListener中设置”containerFactory“属性。容器和容器工厂的对应如下:

  1. SimpleMessageListenerContainer 对应 SimpleRabbitListenerContainerFactory
  2. DirectMessageListenerContainer 对应 DirectRabbitListenerContainerFactory
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class AppConfig {
@Bean(name="myContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
1
2
@RabbitListener(queues = ”myqueue“, containerFactory = "myContainerFactory")
public void onMessage(Message message, Channel channel) throws Exception {}

容器属性

前文涉及的conCurrentConsumer、connectionFactory都是容器的属性,还有其他常用的属性:

Springboot2.0.3.RELEASE容器属性介绍

channelTransacted

是否在事务中ack(确认)所有消息。为true时表示需要确认事务中的所有消息。

transactionManager

提供给Listener的外部事务管理器,是channelTransacted的补充,如果channel是事务的,它的事务会与外部事务进行同步。

acknowledgeMode

  • NONE 配合channelTransacted=false使用,不发送ACK,MQ服务器认为所有的消息都会被确认,所以 在RabbitMQ中称为自动ACK,但在springboot中,称为NONE ACK,视角不同导致的称谓不同。
  • MANUAL 消费者必须手动确认所有的消息,包括异常情况
  • AUTO 容器自动确认消息,除非Listener抛出容器无法自动处理的异常。该模式也是channelTransacted为true时的默认模式。追求并发时可以配合使用。

prefetchCount

每个消费者能够持有的未ack的消息数,该值越大消息传输给消费者的速度越快。该参数越大,消息的顺序处理性越差。值得注意的是:在AcknowledgeMode.NONE模式下,该参数的设置是无效的,这是由于该模式下根本不存在ack。

txSize

适用于SimpleMessageListenerContainer,该参数仅在AcknowledgeMode.AUTO模式下生效,容器在发送一次ack之前批量处理txSize条消息,这一批消息处于同一个事务中,会一直等待它们到超时时间,如果prefetchCount小于txSize,会自动将prefetchCount设置的与txSize相等。只能用于channelTransacted为true的场景下。

messagePerAck

适用于DirectMessageListenerContainer,容器在两次ack之间处理的消息数目,目的是减少向MQ服务器发送ack的次数,代价就是在出现异常时,增大重传消息的可能性,往往用在高并发场景下。在出现异常时(比如拒绝了一批消息中的某一条),则其他消息不管有没有消费完都会被ack,异常的消息被拒绝。所以它不能用在channelTransacted为true的场景。

errorHandler

自定义未捕获的异常的处理机制,默认使用ConditionalRejectingErrorHandler

@RabbitListener

异步接收消息最简单的方式是通过注解实现,前面的@RabbitListener即是,底层是MessagingMessageListenerAdapter实现的,使用注解,不需要指定methodName,因为@RabbitListener已经注明了该方法用来接收消息。在一个类中,可以定义多个不同的Listener,如下:

processOrder中,使用@QueueBinding声明了队列、routingkey、exchange以及他们的绑定关系

processInvoice中,绑定、声明了一个匿名队列,也可以声明多个@QueueBinding

1
2
3
4
5
6
7
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "myqueue",         
durable = "true",
autoDelete = "false"),
exchange = @Exchange(value = "ss")),
@QueueBinding(value = @Queue(value = "myqueue2"),
exchange = @Exchange(value = "ss1"))})
public void rcv(Message message, Channel channel) {}

handleWithSimpleDeclare中,没有声明exchange,routingkey,使用默认地exchange,routingkey与队列名称相同,是direct模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class MyService {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(Order order) {
...
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(Invoice invoice) {
...
}

@RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
public String handleWithSimpleDeclare(String data) {
...
}

}

在多个方法使用同种@RabbitListener时,可以自定义元注解(常用于广播fanout模式),比如下面的例子,就使用了一个自动删除(默认地,如果不想自动删除,需要设置auto-delete为false)、匿名、广播模式的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public @interface MyAnonFanoutListener {
}



// 使用元注解
public class MetaListener {

@MyAnonFanoutListener
public void handle1(String foo) {
...
}

@MyAnonFanoutListener
public void handle2(String foo) {
...
}

}

@RabbitListener异常处理

Springboot2.0版之后,@RabbitListener注解新增了errorHandler和returnException属性,默认是无配置的。自定义errorHandler,需要实现RabbitListenerErrorHander接口, 并将其配置在@RabbitListener上。第二个参数messaging.Message是Message Converter产生的,ListenerExecutionFailedException是Listener抛出的。可以在自定义的handleError中处理异常,或抛出其他异常至容器,默认地,如果没有自定义errorHandler,异常将会抛至容器中。由容器的errorHandler处理,见后文。

1
2
3
4
5
6
@FunctionalInterface
public interface RabbitListenerErrorHandler {

Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,ListenerExecutionFailedException exception) throws Exception;

}

returnException属性为“true”时,表示异常需要通知到生产者,一般地,不需要设置该属性,毕竟引入MQ的目的是解耦。

容器的errorHandler

前面提到,没有自定义errorHandler时,异常会抛至容器默认异常处理器ConditionalRejectingErrorHandler,包含两个构造器,一个无参,一个含参,通过含参构造器可以自定义异常处理策略。若使用的无参构造器,则默认使用内部类DefaultExceptionStrategy中定义的异常处理策略。

ConditionalRejectingErrorHandler源码如下,11行是处理异常的接口,在13行中,异常如果不是AmqpRejectAndDontRequeueException而且是致命异常时,会抛出AmqpRejectAndDontRequeueException,致命异常的判断在37行,首先判断异常产生的原因:

MessagingException位于spring-messiging包下的,是异常MessageConversionException、MethodArgumentResolutionException、MessageDeliveryException、MessageHandlingException、DestinationResolutionException、MethodArgumentNotValidException、MethodArgumentTypeMismatchException、MissingSessionUserException的父类。

ListenerExecutionFailedException是所有异常被抛出时的最上层栈信息,所有异常都以该形式抛出。

39~42行是一个for循环,遍历异常产生的栈信息一层层解析异常产生的原因,一旦有MessageConversionException、MethodArgumentResolutionException就跳出循环,处理该异常,如果遍历完整个循环都没有出现,则处理最后一个异常(42行cause = cause.getCause())。因此,如果有多个异常均需要处理时,建议重写isFatal方法

59-67行判断cause是否致命,有六种:

  • Spring AMQP的MessageConversionException异常
  • spring-messaging的MessageConversionException异常
  • spring-messaging的MethodArgumentResolutionException异常
  • NoSuchMethodException异常
  • ClassCastException异常
  • 自定义的异常isUserCauseFatal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package org.springframework.amqp.rabbit.listener;

// omit the package import for brevity

1 public class ConditionalRejectingErrorHandler implements ErrorHandler {
2 protected final Log logger = LogFactory.getLog(this.getClass());
3 private final FatalExceptionStrategy exceptionStrategy;
4 public ConditionalRejectingErrorHandler() {
5 this.exceptionStrategy = new
6 ConditionalRejectingErrorHandler.DefaultExceptionStrategy();
7 }
8 public ConditionalRejectingErrorHandler(FatalExceptionStrategy exceptionStrategy) { 9 this.exceptionStrategy = exceptionStrategy; 10 }
11 public void handleError(Throwable t) {
12 this.log(t);
13 if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) { 14 throw new AmqpRejectAndDontRequeueException("Error Handler converted 15 exception to fatal", t);
16 }
17 }
18 protected void log(Throwable t) {
19 if (this.logger.isWarnEnabled()) {
20 this.logger.warn("Execution of Rabbit message listener failed.", t);
21 }
22 }
23 protected boolean causeChainContainsARADRE(Throwable t) {
24 for(Throwable cause = t.getCause(); cause != null; cause = cause.getCause()) { 25 if (cause instanceof AmqpRejectAndDontRequeueException) {
26 return true;
27 }
28 }
29 return false;
30 }
31
32 public static class DefaultExceptionStrategy implements FatalExceptionStrategy { 33
34 protected final Log logger = LogFactory.getLog(this.getClass());
35 public DefaultExceptionStrategy() {
36 }
37 public boolean isFatal(Throwable t) {
38 Throwable cause;
39 for(cause = t.getCause(); cause instanceof MessagingException
40 && !(cause instanceof MessageConversionException)
41 && !(cause instanceof MethodArgumentResolutionException);
42 cause = cause.getCause()) {
43 ;
44 }
45 if (t instanceof ListenerExecutionFailedException
46 && this.isCauseFatal(cause)) {
47 if (this.logger.isWarnEnabled()) {
48 this.logger.warn("Fatal message conversion error; message rejected; 49 it will be dropped or routed to a dead letter exchange, if so 50 configured: " + ((ListenerExecutionFailedException)t)
51 .getFailedMessage());
52 }
53 return true;
54 } else {
55 return false;
56 }
57 }
58
59 private boolean isCauseFatal(Throwable cause) {
60 return cause instanceof
61 org.springframework.amqp.support.converter.MessageConversionException
62 || cause instanceof MessageConversionException
63 || cause instanceof MethodArgumentResolutionException
64 || cause instanceof NoSuchMethodException
65 || cause instanceof ClassCastException
66 || this.isUserCauseFatal(cause);
67 }
68
69 protected boolean isUserCauseFatal(Throwable cause) {
70 return false;
71 }
72 }
73 }

当然,也可以选择完全自定义异常处理。

1
2
3
4
5
6
listenerContainerFactory.setErrorHandler(new ErrorHandler() {    
@Override
public void handleError(Throwable t) {
// do something handle error
}
});

补充说明

对于消息的异常处理springboot2.0.3还不够完善。比如使用的@RabbitListener、AcknowledgeMent.MANUAL发送一条空的消息,这条消息无法到达自定义的onMessage()方法,提前抛出了异常,若尝试通过RabbitListenerErrorHandler处理异常,并按照下面的方式自定义了一个errorHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Bean(name = "myErrorHandler")
public RabbitListenerErrorHandler rabbitListenerErrorHandler() {
return new RabbitListenerErrorHandler() {
@Override
public Object handleError(Message message,
org.springframework.messaging.Message<?> message1,
ListenerExecutionFailedException e) throws Exception { // 如果消息是空的,这条消息不再归队
if (message.getBody().length == 0) {
Channel channel = message1.getHeaders().get(AmqpHeaders.CHANNEL,
Channel.class);
logger.error("rcv error");
Channel channel1 = (Channel)message1.getHeaders().getReplyChannel();
Channel channel2 = (Channel)message1.getHeaders().getReplyChannel();

logger.error("channel1={}", channel);
logger.error("channel2={}", channel2);
if (channel == null) {
logger.error(”channel null,can not send ack...“)
throw e;
}
channel.basicReject(message1.getHeaders().get(AmqpHeaders.DELIVERY_TAG,
Long.class), false);
throw new AmqpRejectAndDontRequeueException("msg format error");
}
else {
throw e;
}
}
};
}

发送10条empty的消息,打印了10次:channel null,can not send ack…

这10条消息一直处于unack的状态,消息并没有被reject,而是一直处于unack的状态,但是该消息是无意义的,不应该再重新入队.

原因:
springboot ListenerContainer负责传递消息给消费者,容器通过反射调用自定义的Listener并处理消息时出现参数错误异常,message转byte异常报错。
由于方法参数反射错误,无法调用到onMessage方法,又采用的手动确认的方式,导致没办法通过channel.basicReject拒绝该消息,所以这条消息会一直处于unack的状态。

解决的方法有四种:
1、升级springboot至2.1.6版本,在Listener容器中注册RabbitListenerErrorHandler,该版本中,可以通过org.springframework.messaging.Message获取channel信息。通过channel拒绝该消息。在之前的版本中,获取到的channel都是null(如代码所示,channel、channel1、channel2均为null,说明spring没有将channel信息封装在org.springframework.messaging.Message),无法给MQ服务器发送ACK。
2、修改onMessage()的参数,使用Message类型作为消息的载体,不再使用byte、string等其他类型,定义MessageConverter或使用默认的MessageConverter实现消息格式转换。
3、使用springboot的Acknowledge.AUTO模式,该模式下Listener容器会自动发送ACK给MQ服务器

4、自定义MessageConveter

最后使用了方法2,springboot升级会带来兼容性的问题,比如数据库驱动、数据库版本等,方法3中交给容器自动确认更适合高并发设置多个消费者同时消费1个队列、添加一些批量拉取消息、批量事务处理的场景,该场景会牺牲消息的有序性、异常时可能会有重复消费的问题。我们现在的业务场景更偏向于保证消息的可靠性,一个队列只会有一个消费者,消费一条拉取一条,消息处理的线程池也是自定义的,这样的方式更灵活稳定。所以最后否定了方法3,选用方案2