近几日学习了一下rabbitmq消息中间件,由于很久很久之前已经学过,而且本身入门难度也不高,所以学习起来较为简单,现将所有内容及与spring、springboot整合方法记录:
首先消息中间件定义:
1.什么是MQ
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。
为什么会产生消息队列?有几个原因:
不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
我从别人博客中看到了几篇文章,写的非常棒 现在将其记录
https://mp.weixin.qq.com/s?__biz=MzAxOTc0NzExNg==&mid=2665513507&idx=1&sn=d6db79c1ae03ba9260fb0fb77727bb54&chksm=80d67a60b7a1f376e7ad1e2c3276e8b565f045b1c7e21ef90926f69d99f969557737eb5d8128&mpshare=1&scene=1&srcid=1019awkBx8kaLyFohcuW4Ee7
这个是通过故事的方式写出消息中间件到底是什么
https://mp.weixin.qq.com/s?__biz=MjM5ODYxMDA5OQ==&mid=2651960012&idx=1&sn=c6af5c79ecead98daa4d742e5ad20ce5&chksm=bd2d07108a5a8e0624ae6ad95001c4efe09d7ba695f2ddb672064805d771f3f84bee8123b8a6&mpshare=1&scene=1&srcid=04054h4e90lz5Qc2YKnLNuvY
这个是消息中间件的使用场景
https://github.com/jasonGeng88/blog/blob/master/201705/MQ.md
这个故事是消息中间件在实际开发中的应用场合
还有一个协议需要注意:
AMQP
还有rabbitmq的五种队列
现在我将其解释一下:
第一种.直接给 就是最简单的一对一(通过队列发)
第二种:可以看成是第一种的扩展,就是一对多,大家共享同一个消息
第三种:其实就是将消息发送给交换机,然后交换机在把消息发送给绑定在此交换机的队列上
第四种 路由模式,你可以看成是根据具体关键字来发 或者说是服务每个队列承担着一个或多个服务,消息会根据服务来发
第五种 topic模式,跟上面的差不多 不过他是相当于多匹配,相当于正则表达式
第六种 好像是远程调用
然后是和安装和使用,安装的话我直接用的docker
其余安装手段的话你必须得先安装elrang语言,因为rabbitmq就是用这个语言编写的
接下来是各种模式的使用
稍后我会传到github去 其实难点不多 主要是几个方法的使用及各种信道绑定啊之类的 大致流程是
获取连接------创建通道-------创建通道----发布信息
获取连接------创建通道------根据知道的通道创建消费者------获取到达消息
获取连接------创建通道------创建交换机----------发布信息
获取连接------创建通道------申明队列-------绑定交换机------定义消费者(根据队列)------获取到达消息
其余几种方法无非就是多加了点关键字 没有什么差别
重点一:与spring整合
我见到的基本有两种方式
第一种就是写配置bean类
@Configuration
public class HelloWorldConfiguration {
protected final String helloWorldQueueName = "hello.world.queue";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.13.132");
connectionFactory.setUsername("wujifu");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/vhose_1");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
//The routing key is set to the name of the queue by the broker for the default exchange.
template.setRoutingKey(this.helloWorldQueueName);
//Where we will synchronously receive messages from
template.setDefaultReceiveQueue(this.helloWorldQueueName);
return template;
}
@Bean
// Every queue is bound to the default direct exchange
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
/*
@Bean
public Binding binding() {
return declare(new Binding(helloWorldQueue(), defaultDirectExchange()));
}*/
/*
@Bean
public TopicExchange helloExchange() {
return declare(new TopicExchange("hello.world.exchange"));
}*/
/*
public Queue declareUniqueQueue(String namePrefix) {
Queue queue = new Queue(namePrefix + "-" + UUID.randomUUID());
rabbitAdminTemplate().declareQueue(queue);
return queue;
}
// if the default exchange isn't configured to your liking....
@Bean Binding declareP2PBinding(Queue queue, DirectExchange exchange) {
return declare(new Binding(queue, exchange, queue.getName()));
}
@Bean Binding declarePubSubBinding(String queuePrefix, FanoutExchange exchange) {
return declare(new Binding(declareUniqueQueue(queuePrefix), exchange));
}
@Bean Binding declarePubSubBinding(UniqueQueue uniqueQueue, TopicExchange exchange) {
return declare(new Binding(uniqueQueue, exchange));
}
@Bean Binding declarePubSubBinding(String queuePrefix, TopicExchange exchange, String routingKey) {
return declare(new Binding(declareUniqueQueue(queuePrefix), exchange, routingKey));
}*/
}
知识兔还有一种是配置文件的方法
"connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="localhost" />
<!-- username,访问RabbitMQ服务器的账户,默认是guest -->
<property name="username" value="${rmq.manager.user}" />
<!-- username,访问RabbitMQ服务器的密码,默认是guest -->
<property name="password" value="${rmq.manager.password}" />
<!-- host,RabbitMQ服务器地址,默认值"localhost" -->
<property name="host" value="${rmq.ip}" />
<!-- port,RabbitMQ服务端口,默认值为5672 -->
<property name="port" value="${rmq.port}" />
<!-- channel-cache-size,channel的缓存数量,默认值为25 -->
<property name="channel-cache-size" value="50" />
<!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
<property name="cache-mode" value="CHANNEL" />
</bean>
<!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例
<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
<rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" />
<!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
<rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding>
<rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding>
<rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 生产者部分 -->
<!-- 发送消息的producer类,也就是生产者 -->
<bean id="msgProducer" class="com.asdf.sdf.ClassA">
<!-- value中的值就是producer中的的routingKey,也就是队列名称,它与上面的rabbit:bindings标签中的key必须相同 -->
<property name="queueName" value="{alert.queue.1}"/>
</bean>
<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
<!-- 或者配置jackson -->
<!--
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
-->
<rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
<!-- 消费者部分 -->
<!-- 自定义接口类 -->
<bean id="testHandler" class="com.rabbit.TestHandler"></bean>
<!-- 用于消息的监听的代理类MessageListenerAdapter -->
<bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
<!-- 类名 -->
<constructor-arg ref="testHandler" />
<!-- 方法名 -->
<property name="defaultListenerMethod" value="handlerTest"></property>
<property name="messageConverter" ref="jsonMessageConverter"></property>
</bean>
<!-- 配置监听acknowledeg="manual"设置手动应答,它能够保证即使在一个worker处理消息的时候用CTRL+C来杀掉这个worker,或者一个consumer挂了(channel关闭了、connection关闭了或者TCP连接断了),也不会丢失消息。因为RabbitMQ知道没发送ack确认消息导致这个消息没有被完全处理,将会对这条消息做re-queue处理。如果此时有另一个consumer连接,消息会被重新发送至另一个consumer会一直重发,直到消息处理成功,监听容器acknowledge="auto" concurrency="30"设置发送次数,最多发送30次 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
<rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" />
<rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
<rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
</rabbit:listener-container>
知识兔其余几种exchange
fanOut:
<!-- Fanout 扇出,顾名思义,就是像风扇吹面粉一样,吹得到处都是。如果使用fanout类型的exchange,那么routing key就不重要了。因为凡是绑定到这个exchange的queue,都会受到消息。 -->
<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">
<rabbit:bindings>
<rabbit:binding queue="test_delay_queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
知识兔还有相关的消费者和提供者
@Resource
private RabbitTemplate rabbitTemplate;
private String queueName;
public void sendMessage(CommonMessage msg){
try {
logger.error("发送信息开始");
System.out.println(rabbitTemplate.getConnectionFactory().getHost());
//发送信息 queueName交换机,就是上面的routingKey msg.getSource() 为 test_key
rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg);
//如果是普通字符串消息需要先序列化,再发送消息
//rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg));
logger.error("发送信息结束");
} catch (Exception e) {
e.printStackTrace();
}
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
知识兔public class TestHandler {
@Override
public void handlerTest(CommonMessage commonMessage) {
System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
}
}
知识兔现在说说我个人的理解
其实思路与其他几个工具或者说方法 的用法都是一样的 在配置文件(配置类中配置好类工厂 模板 然后再函数中直接拿bean或者模板直接用就好了)
接下来是与springboot的整合
与springboot的整合我用到了父子模块的用法 现在记录一下,不然会忘!就是创建一个空的springboot项目,然后右击添加新项目 新项目的存储位置一定要处于父项目的目录下
然后具体整合也和spring差不多 甚至可以说是比spring还要简单一点
1.在配置文件中写上属性
2.写一个配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange");
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
知识兔简单接口进行消息的推送
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
知识兔然后是消费者方的写法 也是一样的 此处必须注意两边都同时得有配置文件 因为比较分成了两个模块了
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange");
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
知识兔然后创建消费类
@Component
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
}
}
知识兔这里面消费者是用、
- 使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理
- 通过 @RabbitListener 的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、RouteKey 则自动创建,若存在则抛出异常)声明绑定队列
-
@EnableRabbit和@Configuration一起使用,可以加在类或者方法上,这个注解开启了容器对注册的bean的@RabbitListener检查。
-
@RabbitListener 和 @RabbitHandler结合使用,不同类型的消息使用不同的方法来处理。
其余模式的使用也差不多 就是稍微多点配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class TopicRabbitConfig {
//绑定键
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
//将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
//这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
知识兔workqueue方法就是多创建几个队列 多绑定几次就行了
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class FanoutRabbitConfig {
/**
* 创建三个队列 :fanout.A fanout.B fanout.C
* 将三个队列都绑定在交换机 fanoutExchange 上
* 因为是扇型交换机, 路由键无需配置,配置也不起作用
*/
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
知识兔到此 具体的入门级用法都已经讲完了 具体的模型我会传到github上,接下来还有一些重点关于消息确认的知识(可以用来确保消息不丢失)
一:确认种类
RabbitMQ的消息确认有两种。
一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
二:消息发送确认
(1)ConfirmCallback
通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。
使用该功能需要开启确认,spring-boot中配置如下:
spring.rabbitmq.publisher-confirms = true
(2)ReturnCallback
通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
使用该功能需要开启确认,spring-boot中配置如下:
spring.rabbitmq.publisher-returns = true
根据前面的知识(深入了解RabbitMQ工作原理及简单使用、Rabbit的几种工作模式介绍与实践)我们知道,如果要保证消息的可靠性,需要对消息进行持久化处理,然而消息持久化除了需要代码的设置之外,还有一个重要步骤是至关重要的,那就是保证你的消息顺利进入Broker(代理服务器),如图所示:
正常情况下,如果消息经过交换器进入队列就可以完成消息的持久化,但如果消息在没有到达broker之前出现意外,那就造成消息丢失,有没有办法可以解决这个问题?
RabbitMQ有两种方式来解决这个问题:
- 通过AMQP提供的事务机制实现;
- 使用发送者确认模式实现;
一、事务使用
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
-
channel.txSelect()声明启动事务模式;
-
channel.txComment()提交事务;
-
channel.txRollback()回滚事务;
从上面的可以看出事务都是以tx开头的,tx应该是transaction extend(事务扩展模块)的缩写,如果有准确的解释欢迎在博客下留言。
我们来看具体的代码实现:
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(_queueName, true, false, false, null);
String message = String.format("时间 => %s", new Date().getTime());
try {
channel.txSelect(); // 声明事务
// 发送消息
channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
channel.txCommit(); // 提交事务
} catch (Exception e) {
channel.txRollback();
} finally {
channel.close();
conn.close();
}
知识兔反正就是提交失败就回滚 但是效率很低
从上面可以看出,非事务模式的性能是事务模式的性能高149倍,我的电脑测试是这样的结果,不同的电脑配置略有差异,但结论是一样的,事务模式的性能要差很多,那有没有既能保证消息的可靠性又能兼顾性能的解决方案呢?那就是接下来要讲的Confirm发送方确认模式。
二、Confirm发送方确认模式
Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。
Confirm的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
方式一:普通Confirm模式
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
String message = String.format("时间 => %s", new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
if (channel.waitForConfirms()) {
System.out.println("消息发送成功" );
}
知识兔看代码可以知道,我们只需要在推送消息之前,channel.confirmSelect()声明开启发送方确认模式,再使用channel.waitForConfirms()等待消息被服务器确认即可。
方式二:批量Confirm模式
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String message = String.format("时间 => %s", new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
System.out.println("全部执行完成");
知识兔以上代码可以看出来channel.waitForConfirmsOrDie(),使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。
方式三:异步Confirm模式
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String message = String.format("时间 => %s", new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
}
});
知识兔异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可,以上异步返回的信息如下:
可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。