可以为ListenerContainer和RabbitTemplate设置MessageConverter。这样就不用每次都写重复的消息格式转换代码了。spring提供的Message Converter均是双向的,负责将入站消息转换为特定结构(如:字节数组、序列化java对象、字符串、自定义的消息domain对象),将特定格式转换为出站消息。
消息格式
springboot-amqp涉及到两种消息格式,定义如下:
org.springframework.messaging.Message<?> message,spring框架中通用的Message。简称
spring-messaging Message。1
2
3
4public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}spring AMQP Message,spring为了适配AMQP协议,简化接口参数引入的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
spring-messaging Message中的payload对应了spring AMQP Message中的byte[] body,他们均是Rabbit Client 中的body,即消息内容。
spring-messaging Message中的MessageHeaders对应了spring AMQP Message中的MessageProperties,他们均是Rabbit Client 中的BasicProperties,即消息头。
因此,后文的MessgeConverter如不加特殊说明,均指的消息内容的格式转换。消息头的格式转换见2.3.7 MessagePropertiesConverter
@RabbitListener底层实现原理
在了解MessageConveter之前,有必要清楚spring底层消息处理机制,此处以最常用的@RabbitListener为例。
通过注解@RabbitListener声明一个消费者时,底层由MessagingMessageListenerAdapter的onMessage()负责处理消息。
1 | 1 public void onMessage(Message amqpMessage, Channel channel) throws Exception { |
3~4行,通过toMessagingMessage()将spring AMQP的Message转换为spring-messaging的Message。this.getMessagingMessageConverter是一个内部类的实例,内部类继承了MessagingMessageConverter,最终调用的是MessagingMessageConverter的fromMessage。完成Spring AMQP Message至spring-messaging Message的转换。
1 | protected org.springframework.messaging.Message<?> toMessagingMessage(Message amqpMessage) { |
11行通过反射处理消息,调用的是HandlerAdapter中的invoke(),为了简便,把这些调用全部放在了同一个代码块中,一般地,使用@RabbitListener时不会自定义invokHandler,所以调用的是代理的反射方法:delegatingHandler.invoke()
再继续关注下getMethodArgumentValues,包含了两部分,一部分是预设参数转换,如:Message、Channel,这个也是最开始传入Spring AMQP的Message的原因,它的作用就是作为预设参数,另一部分是Listener中消息处理的其他自定义参数,如@Payload注解、@Headers注解等声明的参数,args[i] == null时,抛出MethodArgumentResolutionException异常,这就是1.3中异常抛出的地方,参数为空。该异常会一直向上抛,直至17行被捕获,如果在Listener容器中注册了errorHandler,调用errorHandler处理异常。
还有一点值得注意的是:在整个过程中,真正作为消息载体的就是spring-messaging.Message而不是Spring AMQP的`Message。因此,消息处理的过程实际如下:
调用RabbitMQ JAVA API接收消息并封装为Spring AMQP Message,在消息处理onMessage中调用toMessagingMessage(Message amqpMessage)将消息转换至spring-messaging.Message,通过反射处理消息。
因此,消息转换实际上包含了两个过程,一个是消息的反序列化并封装为Spring AMQP Message,另一个是Spring AMQP Message与spring-messaging.Message之间的转换。后文中所指的消息转换如果不加特别说明,均指第一个转换。
1 | private Object invokeHandler(Message amqpMessage, Channel channel, org.springframework.messaging.Message<?> message) { |
已有的MessageConverter
前面提到,消息格式转换有两次,第一次转换完成序列化与反序列化的工作,被称为Message Converter,spring AMQP提供了默认的转换器SimpleMessageConverter。以反序列化为例,将Spring AMQP Message转换为字符串、序列化对象、字节数组,这次转换也是文中所指的Message Convert。反序列化源码如下:
第二次转换,springboot默认使用的是GenericMessageConverter。它是属于org.springframework.messaging.包下的,继承了该包下的SimpleMessageConverter(第一次转换的SimpleMessageConverter在org.springframework.amqp包下),默认情况下,不需要特别的设置。
spring-messaging的MessageConverter是所有消息转换器(无论是spring-messaging还是spring AMQP`)最底层的接口。
1 | package org.springframework.messaging.converter; |
SimpleMessageConverter
spring AMQP的SimpleMessageConverter实现了MessageConverter接口(最底层),是默认的消息转换器。在未给RabbitTemplate配置message conveter时,将会调用SimpleMessageConverter的fromMessage和createMessage处理消息,从源码可以看出,支持三种类型:字符串、序列化java对象,字节数组。
1 | package org.springframework.amqp.support.converter; |
SerializerMessageConverter
与SimpleMessageConverter类似,唯一不同的是,多了一个属性用来自定义序列化与反序列化规则。
Jackson2JsonMessageConverter
消息载体是网络字节序时,使用默认的SimpleMessageConverter就足够了,但是消息载体为java序列化对象application/x-java-serialized-object时,不利于跨语言和跨平台,更推荐使用JSON作为消息的载体,Jackson2JsonMessageConverter负责JSON和java bean之间转换。使用时将jsonConverter注入rabbitTemplate实例中,替换SimpleMessageConverter。在替换后,收发消息可以直接发送消息Object的实例,大大得简化了开发。
注意事项:
使用时,需要生产者额外在消息头中添加一个字段”__ TypeId __ “用于注明该消息映射的domain对象,在下方的示例中,头信息中的字段"__ TypeId __"分别"foo"和”bar“如果生产者未注明,可以为classMapper设置默认值映射domain对象,例如:classMapper.setDefaultType(MyMessage.class)。
需要生产者在消息头注明contentType为application/json或text/x-json 或者生产者也使用 Jackson2JsonMessageConverter,它会自动在消息头中声明contentType。
1 |
|
ContentTypeDelegatingMessageConverter
顾名思义,ContentTypeDelegatingMessageConverter是一个根据消息头中content-Type动态选择MessageConverter的Message Converter。当content-Type为空或根据content-Type匹配不到MessageConverter时,将Message Convert的任务委托给SimpleMessageConverter
1 | <bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter"> |
MarshallingMessageConverter
负责Spring的Object与XML之间的转换。
为默认的Message Converter设置反序列化权限
在处理 content-type 为application/x-java-serialized-object的java序列化对象时,默认会扫描所有的packages/classes,为了提高安全性,可以设置白名单,所有的Message Converter都有一个属性whiteListPatterns,示例如下:
1 | SimpleMessageConverter messageConverter = new SimpleMessageConverter();List<String> |
注意:该属性仅在Message Converter使用DefaultDeserializer有效,即不要主动去配置DefaultDeserializer。
特殊的Conerter——MessagePropertiesConverter
前面介绍的MessageConverter负责body的转换,MessagePropertiesConverter 负责Rabbit Client的BasicProperties与Spring AMQP MessageProperties之间的转换,它的默认实现是DefaultMessagePropertiesConverter,足以满足绝多数场景下的需求。部分源码如下,仅截取了构造器和属性声明,当BasicProperties中的某一元素长度小于等于longStringLimit时,转化为MessageProperties中的String属性,当BasicProperties中的某一元素长度超过longStringLimit时,根据convertLongLongStrings判断是否需要转换为LongString,如果不需要则转换为DataInputStream
1 | public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter { |