前言
如果采用标准的AMQP协议,则唯一能够保证消息不会丢失的方式是利用其事务机制——令channel处于transactional模式、向其publish消息、执行commit动作。在这种方式下,事务机制会带来大量的多余开销,并会导致吞吐量下降250% 。为了补救事务带来的问题,引入了Publisher Confirm机制,详情参考之前的一篇博文《rabbitmq 消息可靠投递及消费机制》。
使用事务
AMQP协议中事务操作相关方法:channel.txSelect(); // 开启通道事务模式,与channel.confirmSelect()模式互斥
channel.txCommit(); // 提交事务
channel.txRollback(); // 回滚事务
ps: 对于transactional模式和Publisher Confirm模式,一个通道channel同时只能开启其中的一种模式,不能同时开启的。在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器,如果txCommit提交成功则消息一定到达broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务。
示例代码:// The publisher would use something like:
int MSG_COUNT = 10000;
channel.txSelect();
// 单条消息提交事务
for (int i = 0; i < MSG_COUNT; ++i) {
try {
channel.basicPublish(exchange, routingKey,
MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
}
// 多条消息批量提交事务,如果失败则该批次所有消息都提交失败
for (int i = 1; i <= MSG_COUNT; ++i) {
try {
channel.basicPublish(exchange, routingKey,
MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
if((i % 10) == 0) {
channel.txCommit(); // 每10条消息提交一次
}
} catch (Exception e) {
channel.txRollback();
}
}
完整示例 at here。
流程说明:1. client 发送Tx.Select向broker声明开启channel事务模式
2. broker 返回Tx.Select-Ok表明channel事务模式开启成功
------至此client和broker完成握手约定启用channel事务模式-------
3. client Basic.Publish 发送消息给broker
4. ........
5. client 发送Tx.Commit给broker通知其提交数据,然后client等待响应结果
6. broker接收并处理完消息数据之后,返回Tx.Commit-Ok给客户端,然后client继续执行
7. 如果broker接收消息时,内部出现异常则客户端会抛出异常,这时client执行回滚操作,那么这批消息则不会被broker接收并处理。
特别说明:AMQP协议中的事务仅仅是指生产者发送消息给broker这一系列流程处理的事务机制,并不包含消费端的处理流程。
事务机制 VS Publisher Confirm
据官方统计,在事务模式下,发送10000条消息要耗时4分钟,可见性能很低,所以出现了事务机制的替代方案Publisher Confirm模式,是AMQP协议的一种扩展机制。Publisher Confirm机制更轻量级,且支持异步确认。事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接收,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式 ——Publisher Confirm机制是AMQP协议事务的一种替代方案。