您好,欢迎来到叨叨游戏网。
搜索
您的当前位置:首页Rabbitmq如何保证消息可靠性

Rabbitmq如何保证消息可靠性

来源:叨叨游戏网
通常我们在使用rabbitMQ的中主要分为三个部分 消息生产者、RabbitMQ服务端(broker)、消息消费者。所以首先确保这三部分的消息传递正常。

1.确保生产者消息成功发送到exchange中
  通过RabbitTemplate的ConfirmCallback回调方法来查看是否到达exchange

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("消息到达exchange状态:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
        });
2.确保exchange中的消息成功到达Queues中
    通过RabbitTemplate的setReturnsCallback回调方法来查看

        rabbitTemplate.setReturnsCallback(returnedMessage -> {
             log.info("exchange到达路由状态:"+ returnedMessage);
        });
rabbitTemplate完整配置如下:
        @Bean
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
            connectionFactory.setPublisherConfirms(true);
            connectionFactory.setPublisherReturns(true);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("消息到达exchange状态:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            });
            rabbitTemplate.setReturnsCallback(returnedMessage -> {
                log.info("exchange到达路由状态:"+ returnedMessage);
            });
            return rabbitTemplate;
        }
3.确保消费者成功消费
Rabbitmq默认情况下是自动认为消费成功然后移除队列中的这条Message,可以通过配置application.yml来更改
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: manual 手动模式
            #以下两种是自动提交
            # acknowledge-mode: none 自动模式(默认开启)
            # acknowledge-mode: auto 自动模式
更改完成手动提交后,可以通过channel.basicAck和channel.basicNack来确认是否提交或拒绝
    @RabbitListener(queues = "队列名")
    public void onBidDataMessage(Channel channel, Message message) throws IOException {
        log.info("MQ-收到数据:{}", message);
        /**
         * 执行业务代码....
         * 完成
         */

        //执行成功则手动ACK 
        //方法参数说明:channel.basicAck(channel内的顺序tag,true代表确认所有消息)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    
        //执行失败则调用Nack 
        //方法参数说明:(channel内的顺序tag,是否批量处理,是否返回该消息到队列中)
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                   
    }
4.确保其他情况下MQ丢失的一些解决方法
   情况一:某一条消息消费者一直处理失败    
        解决方法:添加到死信队列中
   情况二:MQ服务器宕机                 
        解决方法:设置MQ的队列和消息持久化到磁盘中

以上情况后续有时间再写一篇分析...

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- gamedaodao.net 版权所有 湘ICP备2024080961号-6

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务