rabbitmq-使用publisher confirm代替事务

AMQP 0-9-1中对事务做了规定,一条消息从发出,到channel、交换机、路由到队列,这些都在一个事务控制内。但是使用事务大概会降低2-10倍的消息吞吐量。Rabbitmq团队给出了发布者确认模式。我们可以在两个地方追踪消息的状态。

一、消息是否到达交换机?

直接给出spring整合后的实现代码:

发消息时指定correlationData ID:

public class PublishService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange, String routingKey, Object message) {
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationId);
    }
}

xml配置文件开启发送者确认模式:

<rabbit:connection-factory 
		id="connectionFactory"
		host="${rabbit.host}" 
		port="${rabbit.port}" 
		username="${rabbit.username}" 
		password="${rabbit.password}"
		virtual-host="${rabbit.vhost}"
		publisher-confirms="true" />

rabbitmqTemplate中指定bean:

<!-- mandatory必须设置true,return callback才生效 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
   confirm-callback="confirmCallBackListener"
   return-callback="returnCallBackListener"
   mandatory="true" 
/>

当mandatory标志设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;

confirmCallBack:

调用correlationData.getId() 拿到消息id,!ack表示消息没有到达交换机,cause造成的原因

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            // do something
        }
        if(!ack){
            // do something
        }
    }
}

测试:

发到不存在的Exchange,将进入!ack回调,此处不再给出代码。

二、消息路是否由到队列?

在上面的xml配置了return-callback="returnCallBackListener" mandatory="true"

回调类代码:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.stereotype.Service;

@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // do something
    }

如果消息不能路由到队列,将进入此回调方法,

提取消息id:message.getMessageProperties().getCorrelationId();

replyCode:状态码

replyText:返回文本

exchange、routingKey:交换机、路由key名字

控制台打印上面内容:

return--message: msgId:879270ba-fc0c-42c6-a96e-6997cec92dc3,
msgBody:currentTime:1510541211110,replyCode:312,replyText:NO_ROUTE,
exchange:DIRECT_EX,routingKey:testKey

最后,消息是否被消费者收到?

在配置文件中指定acknowledge="manual" ,监听中不需要指定method

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
   <rabbit:listener ref="foo" method="listen" queue-names="myQueue" />
   <rabbit:listener ref="queue" queue-names="myQueue2"/>
</rabbit:listener-container>

<bean id="queue" class="top.zcjlq.rabbitmq.spring.Foo2"/>

实现ChannelAwareMessageListener接口:

方法体中是三种确认方式

public class Foo2 implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // ack返回false,并重新回到队列,api里面解释得很清楚
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        // 拒绝消息
        // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
}
相关文章

相关标签/搜索