可以为ListenerContainer
和RabbitTemplate
设置MessageConverter
。这样就不用每次都写重复的消息格式转换代码了。spring提供的Message Converter均是双向的,负责将入站消息转换为特定结构(如:字节数组、序列化java对象、字符串、自定义的消息domain对象),将特定格式转换为出站消息。
消息格式
springboot-amqp涉及到两种消息格式,定义如下:
org.springframework.messaging.Message<?> message,spring框架中通用的Message。简称
spring-messaging Message
。1
2
3
4public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}spring AMQP Message
,spring为了适配AMQP协议,简化接口参数引入的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
spring-messaging Message
中的payload
对应了spring AMQP Message
中的byte[] body
,他们均是Rabbit Client 中的body,即消息内容。
spring-messaging Message
中的MessageHeaders
对应了spring AMQP Message
中的MessageProperties
,他们均是Rabbit Client 中的BasicProperties
,即消息头。
因此,后文的MessgeConverter如不加特殊说明,均指的消息内容的格式转换。消息头的格式转换见2.3.7 MessagePropertiesConverter
@RabbitListener底层实现原理
在了解MessageConveter之前,有必要清楚spring底层消息处理机制,此处以最常用的@RabbitListener
为例。
通过注解@RabbitListener
声明一个消费者时,底层由MessagingMessageListenerAdapter
的onMessage()
负责处理消息。
1 | 1 public void onMessage(Message amqpMessage, Channel channel) throws Exception { |
3~4行,通过toMessagingMessage()
将spring AMQP的Message转换为spring-messaging的Message。this.getMessagingMessageConverter
是一个内部类的实例,内部类继承了MessagingMessageConverter
,最终调用的是MessagingMessageConverter
的fromMessage
。完成Spring AMQP Message至spring-messaging Message的转换。
1 | protected org.springframework.messaging.Message<?> toMessagingMessage(Message amqpMessage) { |
11行通过反射处理消息,调用的是HandlerAdapter
中的invoke()
,为了简便,把这些调用全部放在了同一个代码块中,一般地,使用@RabbitListener
时不会自定义invokHandler
,所以调用的是代理的反射方法:delegatingHandler.invoke()
再继续关注下getMethodArgumentValues
,包含了两部分,一部分是预设参数转换,如:Message
、Channel
,这个也是最开始传入Spring AMQP的Message
的原因,它的作用就是作为预设参数,另一部分是Listener中消息处理的其他自定义参数,如@Payload注解、@Headers注解等声明的参数,args[i] == null
时,抛出MethodArgumentResolutionException
异常,这就是1.3中异常抛出的地方,参数为空。该异常会一直向上抛,直至17行被捕获,如果在Listener容器中注册了errorHandler
,调用errorHandler
处理异常。
还有一点值得注意的是:在整个过程中,真正作为消息载体的就是spring-messaging.Message
而不是Spring AMQP的`Message。因此,消息处理的过程实际如下:
调用RabbitMQ JAVA API接收消息并封装为Spring AMQP Message
,在消息处理onMessage中调用toMessagingMessage(Message amqpMessage)
将消息转换至spring-messaging.Message
,通过反射处理消息。
因此,消息转换实际上包含了两个过程,一个是消息的反序列化并封装为Spring AMQP Message
,另一个是Spring AMQP Message
与spring-messaging.Message
之间的转换。后文中所指的消息转换如果不加特别说明,均指第一个转换。
1 | private Object invokeHandler(Message amqpMessage, Channel channel, org.springframework.messaging.Message<?> message) { |
已有的MessageConverter
前面提到,消息格式转换有两次,第一次转换完成序列化与反序列化的工作,被称为Message Converter
,spring AMQP提供了默认的转换器SimpleMessageConverter
。以反序列化为例,将Spring AMQP Message转换为字符串、序列化对象、字节数组,这次转换也是文中所指的Message Convert。反序列化源码如下:
第二次转换,springboot默认使用的是GenericMessageConverter。它是属于org.springframework.messaging.包下的,继承了该包下的SimpleMessageConverter(第一次转换的SimpleMessageConverter在org.springframework.amqp包下),默认情况下,不需要特别的设置。
spring-messaging的MessageConverter
是所有消息转换器(无论是spring-messaging
还是spring AMQP`)最底层的接口。
1 | package org.springframework.messaging.converter; |
SimpleMessageConverter
spring AMQP的SimpleMessageConverter
实现了MessageConverter
接口(最底层),是默认的消息转换器。在未给RabbitTemplate
配置message conveter时,将会调用SimpleMessageConverter
的fromMessage
和createMessage
处理消息,从源码可以看出,支持三种类型:字符串、序列化java对象,字节数组。
1 | package org.springframework.amqp.support.converter; |
SerializerMessageConverter
与SimpleMessageConverter
类似,唯一不同的是,多了一个属性用来自定义序列化与反序列化规则。
Jackson2JsonMessageConverter
消息载体是网络字节序时,使用默认的SimpleMessageConverter
就足够了,但是消息载体为java序列化对象application/x-java-serialized-object
时,不利于跨语言和跨平台,更推荐使用JSON作为消息的载体,Jackson2JsonMessageConverter
负责JSON和java bean之间转换。使用时将jsonConverter
注入rabbitTemplate
实例中,替换SimpleMessageConverter
。在替换后,收发消息可以直接发送消息Object的实例,大大得简化了开发。
注意事项:
使用时,需要生产者额外在消息头中添加一个字段”__ TypeId __ “
用于注明该消息映射的domain对象,在下方的示例中,头信息中的字段"__ TypeId __"
分别"foo"
和”bar“
如果生产者未注明,可以为classMapper设置默认值映射domain对象,例如:classMapper.setDefaultType(MyMessage.class)
。
需要生产者在消息头注明contentType为application/json或text/x-json
或者生产者也使用 Jackson2JsonMessageConverter
,它会自动在消息头中声明contentType。
1 |
|
ContentTypeDelegatingMessageConverter
顾名思义,ContentTypeDelegatingMessageConverter
是一个根据消息头中content-Type
动态选择MessageConverter
的Message Converter。当content-Type
为空或根据content-Type
匹配不到MessageConverter
时,将Message Convert的任务委托给SimpleMessageConverter
1 | <bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter"> |
MarshallingMessageConverter
负责Spring的Object与XML之间的转换。
为默认的Message Converter设置反序列化权限
在处理 content-type
为application/x-java-serialized-object
的java序列化对象时,默认会扫描所有的packages/classes,为了提高安全性,可以设置白名单,所有的Message Converter都有一个属性whiteListPatterns,示例如下:
1 | SimpleMessageConverter messageConverter = new SimpleMessageConverter();List<String> |
注意:该属性仅在Message Converter
使用DefaultDeserializer
有效,即不要主动去配置DefaultDeserializer
。
特殊的Conerter——MessagePropertiesConverter
前面介绍的MessageConverter负责body的转换,MessagePropertiesConverter 负责Rabbit Client的BasicProperties
与Spring AMQP MessageProperties
之间的转换,它的默认实现是DefaultMessagePropertiesConverter
,足以满足绝多数场景下的需求。部分源码如下,仅截取了构造器和属性声明,当BasicProperties
中的某一元素长度小于等于longStringLimit
时,转化为MessageProperties
中的String属性,当BasicProperties
中的某一元素长度超过longStringLimit
时,根据convertLongLongStrings
判断是否需要转换为LongString,如果不需要则转换为DataInputStream
1 | public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter { |