springboot+RabbitMQ系列(四)MessageConvert

可以为ListenerContainerRabbitTemplate设置MessageConverter。这样就不用每次都写重复的消息格式转换代码了。spring提供的Message Converter均是双向的,负责将入站消息转换为特定结构(如:字节数组、序列化java对象、字符串、自定义的消息domain对象),将特定格式转换为出站消息。

消息格式

springboot-amqp涉及到两种消息格式,定义如下:

  1. org.springframework.messaging.Message<?> message,spring框架中通用的Message。简称spring-messaging Message

    1
    2
    3
    4
    public interface Message<T> {    
    T getPayload();
    MessageHeaders getHeaders();
    }
  2. spring AMQP Message,spring为了适配AMQP协议,简化接口参数引入的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
    this.body = body;
    this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
    return this.body;
    }

    public MessageProperties getMessageProperties() {
    return this.messageProperties;
    }
    }

spring-messaging Message中的payload对应了spring AMQP Message中的byte[] body,他们均是Rabbit Client 中的body,即消息内容。

spring-messaging Message中的MessageHeaders对应了spring AMQP Message中的MessageProperties,他们均是Rabbit Client 中的BasicProperties,即消息头。

因此,后文的MessgeConverter如不加特殊说明,均指的消息内容的格式转换。消息头的格式转换见2.3.7 MessagePropertiesConverter

@RabbitListener底层实现原理

在了解MessageConveter之前,有必要清楚spring底层消息处理机制,此处以最常用的@RabbitListener为例。

通过注解@RabbitListener声明一个消费者时,底层由MessagingMessageListenerAdapteronMessage()负责处理消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1 public void onMessage(Message amqpMessage, Channel channel) throws Exception { 
2
3 org.springframework.messaging.Message<?> message =
4 this.toMessagingMessage(amqpMessage);
5
6 if (this.logger.isDebugEnabled()) {
7 this.logger.debug("Processing [" + message + "]");
8 }
9
10 try {
11 Object result = this.invokeHandler(amqpMessage, channel, message);
12 if (result != null) {
13 this.handleResult(result, amqpMessage, channel, message);
14 } else {
15 this.logger.trace("No result object given - no result to handle");
16 }
17 } catch (ListenerExecutionFailedException var7) {
18 ListenerExecutionFailedException e = var7;
19 if (this.errorHandler != null) {
20 try {
21 Object result = this.errorHandler.handleError(amqpMessage, message, e); 22
23 if (result != null) {
24 this.handleResult(result, amqpMessage, channel, message); 25
26 } else {
27 this.logger.trace("Error handler returned no result"); 28
29 }
30 } catch (Exception var6) {
31 this.returnOrThrow(amqpMessage, channel, message, var6, var6); 32
33 }
34 } else {
35 this.returnOrThrow(amqpMessage, channel, message, var7.getCause(), var7); 36
37 }
38 }
39 }

3~4行,通过toMessagingMessage()将spring AMQP的Message转换为spring-messaging的Message。this.getMessagingMessageConverter是一个内部类的实例,内部类继承了MessagingMessageConverter,最终调用的是MessagingMessageConverterfromMessage。完成Spring AMQP Message至spring-messaging Message的转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected org.springframework.messaging.Message<?> toMessagingMessage(Message amqpMessage) {    
return (org.springframework.messaging.Message)this
.getMessagingMessageConverter()
.fromMessage(amqpMessage);
}

// fromMessage
public Object fromMessage(Message message) throws MessageConversionException {
if (message == null) {
return null;
} else {
Map<String, Object> mappedHeaders =
this.headerMapper.toHeaders(message.getMessageProperties());

Object convertedObject = this.extractPayload(message);

MessageBuilder<Object> builder = convertedObject instanceof
org.springframework.messaging.Message ?
MessageBuilder.fromMessage((org.springframework.messaging.Message)convertedObject)
: MessageBuilder.withPayload(convertedObject);

return builder.copyHeadersIfAbsent(mappedHeaders).build();
}
}

11行通过反射处理消息,调用的是HandlerAdapter中的invoke(),为了简便,把这些调用全部放在了同一个代码块中,一般地,使用@RabbitListener时不会自定义invokHandler,所以调用的是代理的反射方法:delegatingHandler.invoke()

再继续关注下getMethodArgumentValues,包含了两部分,一部分是预设参数转换,如:MessageChannel,这个也是最开始传入Spring AMQP的Message的原因,它的作用就是作为预设参数,另一部分是Listener中消息处理的其他自定义参数,如@Payload注解、@Headers注解等声明的参数,args[i] == null时,抛出MethodArgumentResolutionException异常,这就是1.3中异常抛出的地方,参数为空。该异常会一直向上抛,直至17行被捕获,如果在Listener容器中注册了errorHandler,调用errorHandler处理异常。

还有一点值得注意的是:在整个过程中,真正作为消息载体的就是spring-messaging.Message而不是Spring AMQP的`Message。因此,消息处理的过程实际如下:

调用RabbitMQ JAVA API接收消息并封装为Spring AMQP Message,在消息处理onMessage中调用toMessagingMessage(Message amqpMessage)将消息转换至spring-messaging.Message,通过反射处理消息。

因此,消息转换实际上包含了两个过程,一个是消息的反序列化并封装为Spring AMQP Message,另一个是Spring AMQP Messagespring-messaging.Message之间的转换。后文中所指的消息转换如果不加特别说明,均指第一个转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
private Object invokeHandler(Message amqpMessage, Channel channel, org.springframework.messaging.Message<?> message) {    
try {
return this.handlerMethod.invoke(message, new Object[]{amqpMessage, channel});
} catch (MessagingException var5) {
throw new
ListenerExecutionFailedException(this.createMessagingErrorMessage("Listener method could not be invoked with the incoming message", message.getPayload()),
var5, amqpMessage);
}
catch (Exception var6) {
throw new ListenerExecutionFailedException("Listener method '" + this.handlerMethod.getMethodAsString(message.getPayload()) + "' threw exception", var6, amqpMessage);
}
}

//HandlerAdapter中的invoke
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
if (this.invokerHandlerMethod != null) {
return this.invokerHandlerMethod.invoke(message, providedArgs);
} else if (this.delegatingHandler.hasDefaultHandler()) {
Object[] args = new Object[providedArgs.length + 1];
args[0] = message.getPayload();
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
return this.delegatingHandler.invoke(message, args);
} else {
return this.delegatingHandler.invoke(message, providedArgs);
}
}

// delegatingHandler.invoke()
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
Class<? extends Object> payloadClass = message.getPayload().getClass();
InvocableHandlerMethod handler = this.getHandlerForPayload(payloadClass);
Object result = handler.invoke(message, providedArgs);
if (message.getHeaders().get("amqp_replyTo") == null) {
Expression replyTo = (Expression)this.handlerSendTo.get(handler);
if (replyTo != null) {
result = new ResultHolder(result, replyTo);
}
}

return result;
}

// 代理完成的invoke
@Nullable
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
Object[] args = this.getMethodArgumentValues(message, providedArgs);
if (this.logger.isTraceEnabled()) {
this.logger.trace("Invoking '" + ClassUtils.getQualifiedMethodName(this.getMethod(), this.getBeanType())
+ "' with arguments " + Arrays.toString(args));
}

Object returnValue = this.doInvoke(args);
if (this.logger.isTraceEnabled()) {
this.logger.trace("Method [" + ClassUtils.getQualifiedMethodName(this.getMethod(),this.getBeanType())
+ "] returned [" + returnValue + "]");
}

return returnValue;
}

// 关注下getMethodArgumentValues
private Object[] getMethodArgumentValues(Message<?> message, Object... providedArgs) throws Exception {
MethodParameter[] parameters = this.getMethodParameters();
Object[] args = new Object[parameters.length];

for(int i = 0; i < parameters.length; ++i) {
MethodParameter parameter = parameters[i];
parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
args[i] = this.resolveProvidedArgument(parameter, providedArgs);//预设参数的转换
if (args[i] == null) {
if (this.argumentResolvers.supportsParameter(parameter)) {
try {
args[i] = this.argumentResolvers
.resolveArgument(parameter, message);// 自定义的参数的转换
} catch (Exception var8) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(this.getArgumentResolutionErrorMessage(
"Failed to resolve", i), var8);
}

throw var8;
}
} else if (args[i] == null) {
throw new MethodArgumentResolutionException(
message, parameter, this.getArgumentResolutionErrorMessage(
"No suitable resolver for", i));
}
}
}
return args;
}

已有的MessageConverter

前面提到,消息格式转换有两次,第一次转换完成序列化与反序列化的工作,被称为Message Converter,spring AMQP提供了默认的转换器SimpleMessageConverter。以反序列化为例,将Spring AMQP Message转换为字符串、序列化对象、字节数组,这次转换也是文中所指的Message Convert。反序列化源码如下:

第二次转换,springboot默认使用的是GenericMessageConverter。它是属于org.springframework.messaging.包下的,继承了该包下的SimpleMessageConverter(第一次转换的SimpleMessageConverter在org.springframework.amqp包下),默认情况下,不需要特别的设置。

spring-messaging的MessageConverter是所有消息转换器(无论是spring-messaging还是spring AMQP`)最底层的接口。

1
2
3
4
5
6
7
8
9
10
11
package org.springframework.messaging.converter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
public interface MessageConverter {
@Nullable
Object fromMessage(Message<?> var1, Class<?> var2);

@Nullable Message<?> toMessage(Object var1, @Nullable MessageHeaders var2);

}

SimpleMessageConverter

spring AMQP的SimpleMessageConverter实现了MessageConverter接口(最底层),是默认的消息转换器。在未给RabbitTemplate配置message conveter时,将会调用SimpleMessageConverterfromMessagecreateMessage处理消息,从源码可以看出,支持三种类型:字符串、序列化java对象,字节数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package org.springframework.amqp.support.converter;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.remoting.rmi.CodebaseAwareObjectInputStream;
import org.springframework.util.ClassUtils;

public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware {
public static final String DEFAULT_CHARSET = "UTF-8";
private volatile String defaultCharset = "UTF-8";
private String codebaseUrl;
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
public SimpleMessageConverter() { }

public void setBeanClassLoader(ClassLoader beanClassLoader) {
this.beanClassLoader = beanClassLoader;
}

public void setCodebaseUrl(String codebaseUrl) {
this.codebaseUrl = codebaseUrl;
}

public void setDefaultCharset(String defaultCharset) {
this.defaultCharset = defaultCharset != null ? defaultCharset : "UTF-8";
}

public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();

if (properties != null) {
String contentType = properties.getContentType();
if (contentType != null && contentType.startsWith("text")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}

try {
content = new String(message.getBody(), encoding);
} catch (UnsupportedEncodingException var8) {
throw new MessageConversionException("failed to convert text-based Message content", var8);
}
} else if (contentType != null
&& contentType.equals("application/x-java-serialized-object")) {
try {
content = SerializationUtils
.deserialize(this.createObjectInputStream(
new ByteArrayInputStream(message.getBody()),this.codebaseUrl)); } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
throw new MessageConversionException("failed to convert serialized Message content", var7);
}
}
}

if (content == null) {
content = message.getBody();
}

return content;

}


protected Message createMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[])((byte[])object);
messageProperties.setContentType("application/octet-stream");
} else if (object instanceof String) {
try {
bytes = ((String)object).getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException var6) {
throw new MessageConversionException("failed to convert to Message content", var6);
}
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
}
messageProperties.setContentType("application/x-java-serialized-object");
}

if (bytes != null) {
messageProperties.setContentLength((long)bytes.length);
return new Message(bytes, messageProperties);
} else {
throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " +
object.getClass().getName());
}
}

protected ObjectInputStream createObjectInputStream(InputStream is, String
codebaseUrl) throws IOException {
return new CodebaseAwareObjectInputStream(is, this.beanClassLoader, codebaseUrl)
{
protected Class<?> resolveClass(ObjectStreamClass classDesc) throws
IOException, ClassNotFoundException {
Class<?> clazz = super.resolveClass(classDesc);
SimpleMessageConverter.this.checkWhiteList(clazz);
return clazz;
}
};
}


}

SerializerMessageConverter

SimpleMessageConverter类似,唯一不同的是,多了一个属性用来自定义序列化与反序列化规则。

Jackson2JsonMessageConverter

消息载体是网络字节序时,使用默认的SimpleMessageConverter就足够了,但是消息载体为java序列化对象application/x-java-serialized-object时,不利于跨语言和跨平台,更推荐使用JSON作为消息的载体,Jackson2JsonMessageConverter负责JSON和java bean之间转换。使用时将jsonConverter注入rabbitTemplate实例中,替换SimpleMessageConverter。在替换后,收发消息可以直接发送消息Object的实例,大大得简化了开发。

注意事项:

使用时,需要生产者额外在消息头中添加一个字段”__ TypeId __ “用于注明该消息映射的domain对象,在下方的示例中,头信息中的字段"__ TypeId __"分别"foo"”bar“如果生产者未注明,可以为classMapper设置默认值映射domain对象,例如:classMapper.setDefaultType(MyMessage.class)

需要生产者在消息头注明contentType为application/json或text/x-json 或者生产者也使用 Jackson2JsonMessageConverter,它会自动在消息头中声明contentType。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("foo", Foo.class);
idClassMapping.put("bar", Bar.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}

ContentTypeDelegatingMessageConverter

顾名思义,ContentTypeDelegatingMessageConverter是一个根据消息头中content-Type动态选择MessageConverter的Message Converter。当content-Type为空或根据content-Type匹配不到MessageConverter时,将Message Convert的任务委托给SimpleMessageConverter

1
2
3
4
5
6
7
8
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>

MarshallingMessageConverter

负责Spring的Object与XML之间的转换。

为默认的Message Converter设置反序列化权限

在处理 content-typeapplication/x-java-serialized-object的java序列化对象时,默认会扫描所有的packages/classes,为了提高安全性,可以设置白名单,所有的Message Converter都有一个属性whiteListPatterns,示例如下:

1
2
3
4
5
6
7
8
SimpleMessageConverter messageConverter = new SimpleMessageConverter();List<String> 

myWhiteList = new ArrayList<>(10);
myWhiteList.add("safe.*");
myWhiteList.add("unstable.recent.SafeClass");
myWhiteList.add("*.MySafeClass");

messageConverter.setWhiteListPatterns(myWhiteList);

注意:该属性仅在Message Converter使用DefaultDeserializer有效,即不要主动去配置DefaultDeserializer

特殊的Conerter——MessagePropertiesConverter

前面介绍的MessageConverter负责body的转换,MessagePropertiesConverter 负责Rabbit Client的BasicProperties与Spring AMQP MessageProperties之间的转换,它的默认实现是DefaultMessagePropertiesConverter,足以满足绝多数场景下的需求。部分源码如下,仅截取了构造器和属性声明,当BasicProperties中的某一元素长度小于等于longStringLimit时,转化为MessageProperties中的String属性,当BasicProperties中的某一元素长度超过longStringLimit时,根据convertLongLongStrings判断是否需要转换为LongString,如果不需要则转换为DataInputStream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter {    
private static final int DEFAULT_LONG_STRING_LIMIT = 1024;
private final int longStringLimit;
private final boolean convertLongLongStrings;

public DefaultMessagePropertiesConverter() {
this(1024, false);
}

public DefaultMessagePropertiesConverter(int longStringLimit) {
this(longStringLimit, false);
}

public DefaultMessagePropertiesConverter(int longStringLimit,
boolean convertLongLongStrings) {
this.longStringLimit = longStringLimit;
this.convertLongLongStrings = convertLongLongStrings;
}

private Object convertLongString(LongString longString, String charset) {
try {
if (longString.length() <= (long)this.longStringLimit) {
return new String(longString.getBytes(), charset);
} else {
return this.convertLongLongStrings ? longString.getStream() : longString;
}
} catch (Exception var4) {
throw RabbitExceptionTranslator.convertRabbitAccessException(var4);
}
}

}