okspy


  • 首页

  • 标签

  • 分类

  • 归档

springboot+RabbitMQ系列(二)生产者

发表于 2019-08-05 | 分类于 springboot

AMQP定义了普遍适用的消息通信框架,只规定了最底层的协议,有关协议的细节可以有多种实现,因此,Spring AMQP提供了AmqpTemplate作为收发消息的模板,定义了一些常用的收发消息的接口,AmqpTemplate是一个底层的模板,它的上层实现可以根据具体的协议细节定义其他的接口。

目前,Spring AMQP只有一个实现——RabbitTemplate。

RabbitTemplate可以用来收发消息,但是一般的,RabbitTemplate只用来发送消息,接收消息使用@RabbitListener与容器工厂。这是生产者和消费者特性导致的:生产者采用阻塞的方式,与MQ服务器之间是短连接,消费者采用非阻塞,一个线程接收线程,另外的线程处理消息,与MQ服务器之间是长连接。本章介绍RabbitTemplate的一些属性的配置,以及如何使用RabbitTemplate发送消息。

RabbitTemplate配置

重试机制

在MQ客户端与MQ服务器之间连接出现异常时,消息收发可能会抛出异常,为了避免频繁抛出异常,可以加入一些重试机制,在抛出异常前,重试几次,通过设置RetryTemplate实现消息的重试。

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public AmqpTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}

生产者可靠投递

生产者是异步的,无法通过返回值知道消息是否成功发送,默认的,无法路由至MQ服务器的消息会直接被丢弃,无任何异常抛出。

ConfirmCallback

消息发送到RabbitMQ exchange(消息发送至MQ服务器)后接收ack回调。消息只要被Rabbit服务器接收到就会发调用该回调。但是不能保证消息一定会投递至目标队列,为了确保投递至目标队列,需要设置ReturnCallback

注意:必需把CachingConnectionFactory的PublierConfirms设置为true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CachingConnectionFactory factory = new CachingConnectionFactory();
RabbitTemplate rabbitTemplate = new RabbitTemplate();
factory.setPublisherConfirms(true);
rabbitTemplate.setConnectionFactory(factory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){
logger.info("successed send msg to MQ server {}", correlationDate)
} else {
logger.info("failed send msg to MQ server caused by {}", cause)
}

}
});

ReturnCallback

消息发送至MQ服务器的exchange,但是无相应队列与该交换器绑定时的回调。发送失败的消息重新返回给生产者,在returnedMessage方法中处理失败的消息。

注意:必需把CachingConnectionFactory的PublierReturns设置为true

1
2
3
4
5
6
7
8
9
10
11
12
13
CachingConnectionFactory factory = new CachingConnectionFactory();
RabbitTemplate rabbitTemplate = new RabbitTemplate();

factory.setPublisherReturns(true);

rabbitTemplate.setConnectionFactory(factory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
// process message
}
});

注意:为确保接收到ReturnCallback、ConfirmCallback,channel必须处于alive的状态,当channelCacheSize满时,spring框架默认等待5秒然后关闭channel。所以,使用ReturnCallback、ConfirmCallback时要确保channelCacheSize足够大,尽可能的确保channel处于alive状态。

(3) 消息发送的目的得是一个不存在的exchange,底层会抛出异常并关闭连接,通过addConnectionListener,实现onShutDown方法处理该异常。见2.9节。

连接池

在2.6节中,了解到为了避免connection释放出现死锁,也避免消费者因生产者阻塞而阻塞,生产者和消费者应使用独立的连接池,springboot2.0.2版本后,CachingConnectionFactory内部有两个连接池,一个是生产者的一个是消费者,但是要显示设置usePublisherConnection为ture,默认是false。

1
2
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setUsePublisherConnection(true);

注意:如果是事务性的,此设置无效,生产者和消费者还是会共用同一个连接池。

生产消息

构建消息

可以直接发送字节数组,SimpleMessageConverter默认地封装消息的ContenType、Hearder、MessageId等,但有时,想要控制这些属性的生成规则,需要通过MessageBuiler和MessagePropertiesBuilder构建Spring AMQP Message。

1
2
3
4
5
6
7
8
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();

构建CorrelationData

CorrelationData是与消息关联的数据,比如,通过设置分布式的唯一id作为CorrelationData的id,即消息ID。可以实现消费端的去重的功能。它的定义如下,只有一个属性:id。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CorrelationData implements Correlation {    
private volatile String id;
public CorrelationData() {
}

public CorrelationData(String id) {
this.id = id;
}

public String getId() {
return this.id;
}

public void setId(String id) {
this.id = id;
}

public String toString() {
return "CorrelationData [id=" + this.id + "]";
}
}

生产者设置CorrelationData:RabbitTemplate的send()方法或为rabbitTemplate配置setCorrelationDataPostProcessor

1
public void send(String exchange, String routingKey, Message message, CorrelationData correlationData) throws AmqpException {}

消费者获取CorrelationData:

在Spring AMQP Message中获取。

1
String id = message.getMessageProperties().getCorrelationId();

统一修改消息和CorrelationData

在消息Id自定义、消息属性统一修改、消息内容统一修改的场景下,统一的定义修改方法有利于编码和debug。RabbitTemplate比AmqpTemplate在所有send方法底层实现上都新增了关联数据correlationDate信息,correlationDate也是3.1.2中提到的消息可靠投递的支撑。

correlationDate和Message的统一修改示例如下。可以用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setCorrelationDataPostProcessor(new CorrelationDataPostProcessor() {
@Override
public CorrelationData postProcess(Message message, CorrelationData correlationData)
{
// modify correlationDate
correlationData.setId();
}
});

rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// modify message
}
});

发送消息

消息的发送很简答,RabbitTemplate提供了众多接口供发送消息。主要有两大类:send()、convertAndSend()。如果使用是字节数组等非Message类型的参数作为消息,考虑采用convertAndSend()。如果使用的是构建好的Message,使用两者皆可。send()接口定义如下。

1
2
3
4
5
void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

如果多除调用send(),可以统一为rabbitTemplate配置routingKey、exchange,这样直接调用第一个方法就可以直接实现消息的发送,这种应用场景更多的在rabbitTemplate只对应一个队列。

1
2
3
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

使用默认的exchange(简单模式,routingKey==queuename),发送至固定队列

1
2
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

与上面效果一样。使用默认的exchange,发送至固定队列

1
2
3
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

springboot+RabbitMQ系列(一)数据库连接

发表于 2019-08-01 | 分类于 springboot

本文参考了 springboot官方文档,主要介绍依赖springboot-amqp模块实现与RabbitMQ服务端的连接。

ConnectionFactory

spring AMQP默认使用CachingConnectionFactory创建一个应用程序共享的连接工厂,也是用途最广泛的ConnectionFactory构建方法,也为Junit提供了SingleConnectionFactory,SingleConncetionFactory不常用,不再赘述。

与AMQP通信的工作单元实际上是Channel,TCP连接可以共享。connectionFactory分为两种模式,一种是缓存channel,一种是缓存connection(同时也缓存该connection的channel)。默认是缓存channel的模式,高可用集群场景下(镜像队列),通过负载均衡器连接至集群中不同的实例时,可以通过setCacheMode设置为缓存connection的模式。代码示例中给出了缓存connection的模式,同时也设置了channelCacheSize。

1
2
3
4
5
6
7
8
9
10
11
12
private CachingConnectionFactory buildConnFactory(String addresses, String username, String password, String vhost) {    
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
connectionFactory.setConnectionCacheSize(10);
connectionFactory.setChannelCacheSize(200);

return connectionFactory;
}

如果需要采用缓存Channel的模式,示例如下:

1
2
3
4
5
6
7
8
9
10
11
private CachingConnectionFactory buildConnFactory(String addresses, String username, String password, String vhost) {    
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setChannelCacheSize(channelCacheSize);

return connectionFactory;
}

在缓存connection模式下,不支持自动声明队列、exchange、binding等,rabbitmq-client默认只提供了5个线程处理connection,因此,当connection较多时,应该自定义线程池,并配置到CachingConnectionFactory中。自定义的线程池将会被所有connection共享,建议线程池的最大线程数设置的与预期connection数相等,因为可能存在对于大部分connection都有多个channel的情况。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ExecutorService executor = new 
ThreadPoolExecutor(corePoolSize,
maxPoolSize, keepAliveSeconds,TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueSize),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r)
{
Thread thread = new Thread(r);
thread.setName("myThread" + autoInt.getAndIncrement());
thread.setDaemon(false);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy());

connectionFactory.setExecutor(executor);

channelCacheSize

前面了解到,Spring AMQP通过缓存channel或connection提高吞吐量。connectionFactory分为两种模式,一种是缓存channel,一种是缓存connection(同时也缓存该connection的channel)。本节主要介绍channelCacheSize。ConnectionCacheSize也是类似的,不赘述。

默认仅限制缓存的channelSize

默认地,最大channelSize是没有限制的,限制的仅仅是缓存的channelSize(connection也一样),默认值是25,缓存channel的目的是减小高并发多线程环境中频繁创建销毁channel的开销,比如:在某一时刻有100个channel处于工作状态,当channel空闲后,只会缓存channelSize个channel,剩下的都会被销毁。

通过RabbitMQ的Web管理插件观察到channel在频繁的被创建和销毁时,应及时的提高channelCacheSize。建议最少要保证线程数<channelCacheSize。可以通过压力测试,观察高峰期channel动态平衡的数量,从而决定channelCacheSize的大小。

channelSize限制

也可以通过channelCheckoutTimeout参数设置connectionFactory的channelSize限制,当该参数大于0时,表示最大的channel数目=channelCacheSize,达到channelCacheSize的上限后,调用createChannel的线程会阻塞,直至有空闲channel出现或阻塞时间超过chanelCheckoutTimeout,在超时的情况下,抛出AmqpTimeoutException,可以设置一些Retry的策略来处理这些异常。

连接命名

通过ConnectionNameStrategy属性设置connection名称。

1
2
3
4
5
6
7
8
connectionFactory.setConnectionNameStrategy(new ConnectionNameStrategy() {   
@Override
public String obtainNewConnectionName(ConnectionFactory connectionFactory) {
return connectionFactory.getHost() + atomicInteger.getAndIncrement();
}
});

return connectionFactory;

使用RabbitMQ Java API提供的connectionFactory

Spring AMQP也支持使用RabbitMQ Java API提供的connectionFactory,即Rabbit Client的connectionFactory,位于包com.rabbitmq.client.ConnectionFactory中,通过构造器参数设置在CachingConnectionFactory中,示例如下:

1
2
ConnectionFactory factory = new ConnectionFactory();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);

自定义RabbitClient的属性

上述示例为RabbitClient的connectionFactory —–> Spring AMQP的connectionFactiory的转换,Spring也提供了Spring AMQP的connectionFactiory —–> RabbitClient的connectionFactory的转换。

比如可以通过CachingConnectionFactory设置RabbitClient connectionFactory的属性。

示例如下:

1
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");

连接恢复

Rabbit Client4.0后的connectionFactory和Spring AMQP的·connectionFactory均默认提供了自动恢复连接的机制;虽然两者的自动恢复机制是兼容的,但使用构造注入conncetionFactory时建议关闭其中一个。

否则,在MQ服务器节点可用但连接尚未恢复时,出现AutoRecoverConnectionNotCurrentlyOpenException 异常。比如:如果在RabbitTemplate中配置RetryTemplate(Spring AMQP的手动恢复),甚至在故障转移到集群中的另一个代理时,可能抛出上述异常。

关闭Rabbit Client的自动恢复

由于Spring AMQP的自动恢复连接在计时器上恢复,因此可以使用SpringAMQP的恢复机制更快地恢复连接。

springboot-amqp1.7版本以后,默认关闭RabbitClient的connectionFactory的自动重连,但是,在通过构造参数注入RabbitClient的connectionFactory时,是没有办法默认关闭的,需要手动设置。

1
2
3
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(false); // 显示关闭RabbitClient的自动重连
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);

一般地,关闭掉Rabbit Client的自动恢复,使用Spring AMQP可以满足绝大多数使用场景,而且框架提供的自动恢复机制已经很完善。

SSL连接

建议通过注入RabbitMQ client connectionFactory的方式配置SSL连接,示例如下:

1
2
3
4
5
6
7
8
9
10
ConnectionFactory factory = new ConnectionFactory();

factory.setAutomaticRecoveryEnabled(false);

File keyFile = new File(keyPath);
File certFile = new File(cerPath);
SSLContexts sslCtx = SslContextBuilder.forServer(certFile, keyFile).build();
factory.useSslProtocol(sslCtx);

CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);

避免Connection死锁

在内存不足或MQ服务端出现异常时,可能会出现连接阻塞,对于默认的CachingConnectionFactory,MQ服务端导致连接阻塞时,客户端会主动地关闭。

因此,如果生产者消费者共用同一个connectionFactory,MQ服务端导致生产者客户端与消费者客户端关闭,可能死锁的情况:生产者与消费者持有相同的连接资源时,MQ服务器异常触发生产者和消费者中断与服务端的连接,可能会出现死锁。

为避免死锁的产生,建议对于生产者和消费者分别配置不同的connectionFactory。需要注意的是:如果生产者消费者处于同一个事物时,不建议生产者消费者配置相同的connectionFactory,因为消费者(或生产者)需要复用对方的channel。

1
2
3
4
5
6
7
@Bean(name = "myRabbitTemplate")
public RabbitTemplate basicCloudRabbitTemplate(CachingConnectionFactory f) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(f);
rabbitTemplate.setUsePublisherConnection(true);

return rabbitTemplate;
}

Routing ConnectionFactory实现多数据源收发消息

spring AMQP提供SimpleRoutingConnectionFactory负责在运行时根据查找键动态选择connectionFactory,通常,以线程的上下文作为查找键,比如地址、vHost等,SimpleRoutingConnectionFactory继承了AbstractRoutingConnectionFactory,通过SimpleResourceHolder获取当前线程的查找键。示例如下:

首先,为RabbitTemplate配置一个以vHost作为查找键的SimpleRoutingConnectionFactory。key分别为factory1的virtualHost和factory2的virtualHost,当然,也可以使用addresses作为查找键。使用方式为factoryMap.put("#{factory1.addresses}", factory1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private SimpleRoutingConnectionFactory myFactory() {    
SimpleRoutingConnectionFactory routingFactory = new SimpleRoutingConnectionFactory();
CachingConnectionFactory factory1 = new CachingConnectionFactory();
factory1.setAddresses("localhost:5672");
factory1.setUsername("username1");
factory1.setPassword("password1");
factory1.setVirtualHost("v1");

CachingConnectionFactory factory2 = new CachingConnectionFactory();
factory2.setAddresses("202.130.1.1:5672");
factory2.setUsername("username2");
factory2.setPassword("password2");
factory2.setVirtualHost("v2");

Map<Object, ConnectionFactory> factoryMap = new HashMap<>(5);
factoryMap.put("#{factory1.virtualHost}", factory1);
factoryMap.put("#{factory2.virtualHost}", factory2);

routingFactory.setTargetConnectionFactories(factoryMap);

return routingFactory;
}

@Bean("myRabbitTemplate")
public RabbitTemplate myRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(myFactory());
return rabbitTemplate;
}

使用方法如下,SimpleResourceHolder的bind和unbind都是必须的,分别指获取当前线程查找键,释放查找键。bind有两个参数,第一个为待获取的connectFactory,第二个为key

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyService {

@Autowired
@Qualifier("myRabbitTemplate")
private RabbitTemplate rabbitTemplate;

public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}

}

消费者使用多数据源的方式略有不同,首先,配置两个不同的containerFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Bean("myContainerFactory1")
public SimpleRabbitListenerContainerFactory containerFactory1() {
SimpleRabbitListenerContainerFactory containerFactory1 = new SimpleRabbitListenerContainerFactory();

CachingConnectionFactory factory1 = new CachingConnectionFactory();
factory1.setAddresses("localhost:5672");
factory1.setUsername("username1");
factory1.setPassword("password1");
factory1.setVirtualHost("v1");

containerFactory1.setConnectionFactory(factory1);

return containerFactory1;
}

@Bean("myContainerFactory2")
public SimpleRabbitListenerContainerFactory containerFactory2() {
SimpleRabbitListenerContainerFactory containerFactory2 = new SimpleRabbitListenerContainerFactory();

CachingConnectionFactory factory2 = new CachingConnectionFactory();
factory2.setAddresses("host2:5672");
factory2.setUsername("username2");
factory2.setPassword("password2");
factory2.setVirtualHost("v2");

containerFactory2.setConnectionFactory(factory2);

return containerFactory2;
}

使用@RabbitListener接收消息时指定containerFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
@RabbitListener(queues = "queue.test", containerFactory = "myContainerFactory1")
public void receiveMsg1(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
logger.debug("rcv msg {}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

@RabbitListener(queues = "queue.test", containerFactory = "myContainerFactory2")
public void receiveMsg2(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
logger.debug("rcv msg {}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

集群中的连接管理

MQ的集群根据集群中各节点队列信息区分为镜像队列和普通队列。只有队列所在节点知道该队列的所有信息,默认情况下,MQ是普通队列,队列只存活于集群中的一个节点上,称为主队列。镜像队列与普通队列的相同点是:队列的主拷贝仅存在于一个节点上(主队列,master节点)。不同点是,镜像节点在集群中的其他节点上拥有从队列的拷贝。一旦队列主节点不可用,最老的从队列自动被选举为新的主队列。

镜像队列的原理:在非镜像队列的集群中,信道负责将消息路由至合适的队列。当加入镜像队列后,信道除了负责将消息路按照路由绑定规则路由至合适的队列外,它也要将消息投递到镜像队列的从拷贝,在某种程度上,可以将镜像队列视为拥有一个隐藏的fanout交换器,它指示着信道将消息分发到队列的从拷贝上。

无论对于普通队列还是镜像队列,所要面临的问题是:主节点崩溃时,消费者该与哪个节点建立连接。

普通队列

对于普通队列,使用CachingConnectionFactory就足够了,它支持配置多个连接地址,当一个连接失败时,会按顺序尝试与其他地址建立连接。

1
2
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("host1:5672, host2:5672");

高可用队列(镜像队列)

  • 消费者

LocalizedQueueConnectionFactory是Spring AMQP为消费者提供的用于高可用集群的连接工厂,使用它时,MQ服务端必须设置enable management plugin。

它通过MQ服务器management 插件提供的REST API 获取哪个节点是主节点,创建CachingConnectionFactory连接至该节点,如果连接失败,将再次调用MQ服务器management 插件提供的REST API 获取新选举的主节点。当无法获取新选举的主节点是,将通过CachingConnectionFactory像普通队列一样按顺序连接至MQ服务器。

LocalizedQueueConnectionFactory本质上是一个RoutingConnectionFactory,它的查找键是队列名。

正是由于查找键是对列名,必需保证查找键的唯一性,消费者的ContainerFactory只能配置在一个队列上。

注意:

由于每次创建连接时,都要通过REST API获取连接节点,开销很大,只适用于长连接的场景(消费者),不适用短连接的场景(生产者)。

LocalizedQueueConnectionFactory只适用于消费者,比如配置在SimpleMessageListenerContainer中,不适用于生产者(比如将LocalizedQueueConnectionFactory配置在RabbitTemplate中)。

示例如下:LocalizedQueueConnectionFactory构造方法的前三个参数 addresses, adminUris 和nodes都是字符串数组,LocalizedQueueConnectionFactory使用数组下标相同的参数构建连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Autowired
private RabbitProperties props;

private final String[] adminUris = { "http://host1:15672", "http://host2:15672" };

private final String[] nodes = { "rabbit@host1", "rabbit@host2" };

@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}

@Bean
public ConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
this.adminUris, this.nodes,
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
  • 生产者

生产者本身也不需要考虑哪个节点是主节点,对于生产者来说,队列是未知的,消息会发送至集群中所有的节点。节点会判断该消息对应的主队列是否在自己的节点上。

连接和信道监听

Spring AMQP提供了connection和channel的监听器接口,通过回调实现自定义的创建和销毁监听逻辑。

ConnectionListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
CachingConnectionFactory factory = new CachingConnectionFactory();
List<ConnectionListener> connectionListeners = new ArrayList<>();

ConnectionListener connectionListener = new ConnectionListener() {
@Override
public void onClose(Connection connection) {
// do something
}

@Override
public void onShutDown(ShutdownSignalException signal) {
// do something
}

@Override
public void onCreate(Connection connection) {
// do something
}
};
connectionListeners.add(connectionListener);

factory.setConnectionListeners(connectionListeners);

ChannelListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CachingConnectionFactory factory = new CachingConnectionFactory();
List<ChannelListener> channelListeners = new ArrayList<>();
ChannelListener channelListener = new ChannelListener() {
@Override
public void onCreate(Channel channel, boolean b) {
// do something
}

@Override
public void onShutDown(ShutdownSignalException signal) {
// do something
}
};

channelListeners.add(channelListener);
List<ConnectionListener> connectionListeners = new ArrayList<>();

factory.setChannelListeners(channelListeners);

channel关闭的日志级别控制

当channel关闭时,CachingConnectionFactory的默认日志规则如下:

  • 正常的channel销毁,无log
  • 队列声明失败导致的channel关闭,debug级别
  • 在exclusive(独占)队列上,basic.consume因为独占使用条件而关闭channel,info级别
  • 其他的channel关闭情况输出error级别log

自定义log级别:为CachingConnectionFactory配置自定义的ConditionalExceptionLogger。

12

shipengyang

12 日志
5 分类
2 标签
© 2019 shipengyang
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4