rabbitmq 常用监听器listener

前言

本篇介绍rabbitmq client api中常用监听器listener的使用及作用。

ShutdownListener

当连接connection被关闭时,这个连接connection上打开的所有通道channel都会一并被关闭,这个时候就会触发注册在这个connection和所有channel上的ShutdownListener监听器,如果只是某个channel被关闭,则仅仅会触发该channel上的ShutdownListener监听器。ShutdownSignalException异常包含了关闭的异常信息,通过调用cause.getReason()获取异常出现的原因等信息。

作用对象:Connection,Channel

使用示例:

// connection 添加监听器
connection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
......
}
});

// channel 添加监听器
channel.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
......
}
});

// 移除监听器
connection.removeShutdownListener(listener);
channel.removeShutdownListener(listener);

BlockedListener

当broker检测到内存或磁盘等资源不足时,会发送connection.blocked信号给支持这一特性(即connection.blocked=true)的所有连接,阻塞这些连接,阻止其继续向broker发送消息。在connection 解除阻塞之前,新产生的connection.blocked信号,不会在发送给已经处于阻塞状态的连接。只有当所有的资源不足警告都已消除时,broker会发送connection.unblocked信号给所有处于阻塞状态的connection,通知其解除阻塞,恢复正常使用状态,详情参考《rabbitmq 流量控制》。

作用对象:Connection

使用示例:

// 添加监听器
connection.addBlockedListener(new BlockedListener() {
@Override
public void handleUnblocked() throws IOException {
......
}

@Override
public void handleBlocked(String reason) throws IOException {
......
}
});

// 移除监听器
connection.removeBlockedListener(listener);

ConfirmListener

当通道channel开启confirm mode模式后,publisher客户端投递的消息成功被broker处理时,会触发channel的ConfirmListener监听器中的handleAck方法,如果broker处理失败则会触发channel的ConfirmListener监听器中的handleNack方法。

作用对象:Channel

使用示例:

// 设置开启confirm mode模式
channel.confirmSelect();
// 给通道添加监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("publisher failure deliveryTag=" + deliveryTag + "| multiple=" + multiple);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// if multiple=true then 表示deliveryTag序号之前的所有消息都投递成功
// if multiple=false then 表示仅deliveryTag序号的消息投递成功
System.out.println("publisher success deliveryTag=" + deliveryTag + "| multiple=" + multiple);
}
});

// 移除监听器
channel.removeConfirmListener(listener);

ReturnListener

用于处理不可路由的消息,当mandatory强制投递的消息不可路由时,broker会触发客户端通道channel的ReturnListener监听器,如果通道没有添加任何ReturnListener监听器,消息会默认丢失,如果消息没有设置为mandatory强制投递,那么是不会触发通道channel的ReturnListener监听器的,详情参考《rabbitmq 消息可靠投递及消费机制》。

作用对象:Channel

使用示例:

// 添加监听器
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
BasicProperties properties,
byte[] body)
throws IOException {
...
}
});

// 移除监听器
channel.removeReturnListener(listener);

参考链接

  1. https://www.rabbitmq.com/api-guide.html
  2. https://www.rabbitmq.com/connection-blocked.html