rabbitmq – 如何在Spring AMQP中使用Ack或Nack

我是 Spring AMQP的新手.我有一个应用程序,它是一个生产者向另一个消费者的应用程序发送消息.

消费者收到消息后,我们将对数据进行验证.

如果数据正确,我们必须确认并且应该从队列中删除消息.
如果数据不正确,我们必须NACK(否定确认)数据,以便它将在RabbitMQ中重新排队.

我碰到

** factory.setDefaultRequeueRejected(false); **(它根本不会重新排队消息)

** factory.setDefaultRequeueRejected(true); **(它会在发生异常时重新排队消息)

但我的情况是,我将基于验证确认该消息.然后它应该删除该消息.如果NACK然后重新排列该消息.

我在RabbitMQ网站上看过

AMQP规范定义了basic.reject方法,该方法允许客户拒绝单个传递的消息,指示代理丢弃它们或重新排队它们

如何实现上述场景?请给我一些例子.

我尝试了一个小程序

logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));

消息不会为不同的异常重新排队
factory.setDefaultRequeueRejected(真)

09:46:38,854 ERROR [stderr] (SimpleAsyncTaskExecutor-1)
org.activiti.engine.ActivitiObjectNotFoundException: no processes deployed with key ‘WF89012’

09:46:39,102 INFO
[com.example.bip.rabbitmq.handler.ErrorQueueHandler]
(SimpleAsyncTaskExecutor-1) Received from Error Queue: {ERROR=Could
not commit JPA transaction; nested exception is
javax.persistence.RollbackException: Transaction marked as
rollbackOnly
}

the documentation.

默认情况下,(使用defaultRequeueRejected = true)如果侦听器正常退出,则容器将响应消息(导致其被删除),或者如果侦听器抛出异常,则拒绝(并重新排队)它.

如果侦听器(或错误处理程序)抛出AmqpRejectAndDontRequeueException,则会覆盖默认行为并丢弃该消息(如果已配置,则路由到DLX / DLQ) – 容器调用basicReject(false)而不是basicReject(true).

因此,如果验证失败,则抛出AmqpRejectAndDontRequeueException.或者,使用自定义错误处理程序配置侦听器,以将异常转换为AmqpRejectAndDontRequeueException.

这在this answer中描述.

如果您真的想自己负责,请将确认模式设置为MANUAL并使用ChannelAwareMessageListener或this technique(如果您使用的是@RabbitListener).

但大多数人只是让容器处理事情(一旦他们了解正在发生的事情).通常,使用手动ack是针对特殊用例,例如延迟acks或早期acking.

编辑

在我指出的答案中有一个错误(现在已经修复);你必须查看ListenerExecutionFailedException的原因.我刚测试了这个,它按预期工作…

@SpringBootApplication
public class So39530787Application {

    private static final String QUEUE = "So39530787";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend(QUEUE, "foo");
        template.convertAndSend(QUEUE, "bar");
        template.convertAndSend(QUEUE, "baz");
        So39530787Application bean = context.getBean(So39530787Application.class);
        bean.latch.await(10, TimeUnit.SECONDS);
        System.out.println("Expect 1 foo:"  + bean.fooCount);
        System.out.println("Expect 3 bar:"  + bean.barCount);
        System.out.println("Expect 1 baz:"  + bean.bazCount);
        context.close();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(new ConditionalRejectingErrorHandler(
                t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException));
        return factory;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE, false, false, true);
    }
    private int fooCount;

    private int barCount;

    private int bazCount;

    private final CountDownLatch latch = new CountDownLatch(5);

    @RabbitListener(queues = QUEUE)
    public void handle(String in) throws Exception {
        System.out.println(in);
        latch.countDown();
        if ("foo".equals(in) && ++this.fooCount < 3) {
            throw new FooException();
        }
        else if ("bar".equals(in) && ++this.barCount < 3) {
            throw new BarException();
        }
        else if ("baz".equals(in)) {
            this.bazCount++;
        }
    }

    @SuppressWarnings("serial")
    public static class FooException extends Exception { }

    @SuppressWarnings("serial")
    public static class BarException extends Exception { }

}

结果:

Expect 1 foo:1
Expect 3 bar:3
Expect 1 baz:1
相关文章
相关标签/搜索