前言
本篇主要讲解rabbitmq是如何保证消息在投递及消费过程中,不会出现消息丢失的。
Consumer Acknowledgements机制
当consumer消费端成功消费完消息后,返回给broker确认通知,告诉broker移除队列中已经消费成功的消息,如果消费端消费失败,可以通知broker将消费失败的消息重新放回队列中,以便继续消费。1. channel.basicAck(deliveryTag, multiple);
consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量。
例如:有值为5,6,7,8 deliveryTag的投递
如果此时channel.basicAck(8, true);则表示前面未确认的5,6,7投递也一起确认处理完毕。
如果此时channel.basicAck(8, false);则仅表示deliveryTag=8的消息已经成功处理。
2. channel.basicNack(deliveryTag, multiple, requeue);
consumer处理失败后,例如:有值为5,6,7,8 deliveryTag的投递。
如果channel.basicNack(8, true, true);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息重新放回队列中。
如果channel.basicNack(8, true, false);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息直接丢弃。
如果channel.basicNack(8, false, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。
如果channel.basicNack(8, false, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。
3. channel.basicReject(deliveryTag, requeue);
相比channel.basicNack,除了没有multiple批量确认机制之外,其他语义完全一样。
如果channel.basicReject(8, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。
如果channel.basicReject(8, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。
参数字段类型:
deliveryTag:long - 消息投递的唯一标识,作用域为当前channel
multiple:boolean - 是否启用批量确认机制
requeue:boolean - 消息处理失败是重新放回队列还是直接丢弃
Publisher Acknowledgements机制
使用标准AMQP 0-9-1协议,保证消息不丢失的唯一方法是使用事务 —— 通道事务,发布消息并提交。在这种情况下,事务是非常重量级的操作,会使得broker消息吞吐量降低250倍左右。那么,为了解决使用事务确保消息不丢失所带来的性能损耗。我们参考Consumer Acknowledgements确认机制的原理引入了Publisher Confirms确认机制。消息生产者发送消息给broker,当broker收到消息,将消息持久化到磁盘并同步至所有的镜像节点之后,才会返回给客户端消息投递成功确认。从而保证消息在投递过程中不会因为网络拥塞,服务宕机,机房断电等突发情况导致消息投递失败而丢失。当由于broker内部消息处理发生异常时,将返回给客户端basic.nack通知;当消息投递成功时,broker则返回给客户端basic.ack通知。
1. 对于可路由的消息,当所有的队列接收到消息后,broker向client发送basic.ack确认通知; |
如何启用Publisher Confirms:为了启用通道Publisher Acknowledgements机制,客户端需要发送confirm.select告知broker,将channel设置为confirm mode(即:确认模式),一旦通道channel处于确认模式下,则该channel就不能进行通道事务。// 开启confirm mode模式的示例代码
public static void main(String[] args) throws IOException, InterruptedException {
// 初始化连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("test"));
connectionFactory.setPassword("1234");
connectionFactory.setVirtualHost("/test");
// 创建连接connection
Connection conn = connectionFactory.newConnection();
// 创建通道channel
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare("ha-publisher-confirm-test", true, false, true, null);
// 设置开启confirm mode模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("publisher failure deliveryTag=" + deliveryTag + "| multiple=" + multiple);
}
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// if multiple=true then 表示deliveryTag序号之前的所有消息都投递成功
// if multiple=false then 表示仅deliveryTag序号的消息投递成功
System.out.println("publisher success deliveryTag=" + deliveryTag + "| multiple=" + multiple);
}
});
// 循环发送持久化消息,消息内容为helloWorld
for (long i = 0; i < 100; ++i) {
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_BASIC,
"helloWorld".getBytes());
}
// 关键:阻塞等待100条消息处理完毕,如果存在nack未处理成功的消息则抛出IOException;
// 如果在non-Confirm mode of channel下调用此方法则抛出IllegalStateException异常;
channel.waitForConfirmsOrDie();
//或关键:等待所有消息的确认,如果处理成功则返回true,否则返回false
boolean flag = channel.waitForConfirms();
}
ps:完整示例代码 at here。
延迟确认Ack: 对于持久化消息,是需要等待消息成功持久化到磁盘之后,broker才会返回给客户端basic.ack通知,为了提升IO吞吐量,broker并不会实时将消息刷回到磁盘,而是先将消息存储到内存中,在一定时间间隔后(几百毫秒)或当队列空闲时,批量将消息持久化到磁盘,然后在返回给客户端basic.ack确认通知。这就意味着在恒定负载下,basic.ack的延迟可以达到几百毫秒。那么,为了提升消息系统的吞吐量,强烈建议客户端应用程序采用异步方式处理消息basic.ack确认通知。
Durable持久化机制
在rabbitmq中,节点分ram内存节点和disc磁盘节点。在磁盘节点中,消息是需要持久化到磁盘中。一个在mq集群中,至少会有一个disc磁盘节点。以防止机房断电,节点宕机等突发情况引起的消息丢失。Exchange,Queue,消息都可以指定Durable持久化,这样即使服务重启,这些都不会丢失。
总结
- 保证消息生产者发生消息给broker是可靠的;
- 保证broker存储消息是可靠的;
- 保证broker消息投递给consumer且消费成功是可靠的;
只要保证上述3个环节都是可靠的,那么整个消息处理机制就是可靠的。我们就可以把这种消息处理机制,放心的使用到订单类操作,或涉及到钱相关的服务中去。