1、RabbitMQ简解
RabbitMQ是消息代理:它接受并转发消息。
您可以将其视为邮局:将要发布的邮件放在邮箱中时,可以确保Mailperson先生或女士最终将邮件传递给收件人。以此类推,RabbitMQ是一个邮箱,一个邮局和一个邮递员。
RabbitMQ与邮局之间的主要区别在于,它不处理纸张,而是接收,存储和转发数据消息的二进制斑点。
2、RabbitMQ使用
生产仅意味着发送。发送消息的程序是生产者.
队列是RabbitMQ内部的邮箱的名称。尽管消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中。甲队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者可以发送进入一个队列的消息,许多消费者可以尝试从一个队列接收数据。这就是我们表示队列的方式.
消费与接收具有相似的含义。一个消费者是一个程序,主要是等待接收信息.
生产者,消费者和经纪人不必位于同一主机上。实际上,在大多数应用程序中却没有。一个应用程序既可以是生产者,也可以是消费者。
消息从生产到 消费的流程
RabbitMQ中Exchange 的类型
类型有4种,direct,fanout,topic,headers。
类型的使用:Exchange与队列进行绑定后,消息根据exchang的类型,按照不同的绑定规则分发消息到消息队列中,可以是一个消息被分发给多个消息队列,也可以是一个消息分发到一个消息队列。
RoutingKey
是exchange与消息队列绑定中的一个标识。有些路由类型会按照标识对应消息队列,有些路由类型忽略routingkey。
1、Exchange类型direct
这个是根据交换器名称与routingkey来找队列的。
消息从client发出,传送给交换器ChangeA,RoutingKey为routingkey,那么不管你发送给Queue1,还是Queue2一个消息都会保存在Queue1,Queue2,Queue3,三个队列中。这就是交换器的direct类型的路由规则。只要找到路由器与routingkey绑定的队列,那么他有多少队列,他就分发给多少队列。
2、Exchange类型fanout
这个类型忽略Routingkey,他为广播模式。
消息从客户端发出,只要queue与exchange有绑定,那么他不管你的Routingkey是什么他都会将消息分发给所有与该exchang绑定的队列中。
3、Exchange类型topic
他是根据RoutingKey的设置,来做匹配的,其中这里还有两个通配符为:
*,代表任意的一个词。例如topic.zlh.*,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
#,代表任意多个词。例如topic.#,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
4、消息队列的消费与消息确认Ack
如果一个消息队列中有大量消息等待操作时,我们可以用多个客户端来处理消息,这里的分发机制是采用负载均衡算法中的轮询。第一个消息给A,下一个消息给B,下下一个消息给A,下下下一个消息给B......以此类推。
为啦保证消息的安全性,保证此消息被正确处理后才能在服务端的消息队列中删除。那么rabbitmq提供啦ack应答机制,来实现这一功能。
ack应答有两种方式:1、自动应答,2、手动应答。
//定义这个队列的消费者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
//false为手动应答,true为自动应答
channel.BasicConsume(queueName, false, consumer);
知识兔3、RabbitMQ服务安装
服务安装软件路径:https://www.rabbitmq.com/install-windows.html#chocolatey
下载下图exe文件并安装,注意安装前要安装RabbitMQ依赖64位Erlang,https://www.erlang.org/downloads
(http://erlang.org/download/otp_win64_22.1.exe)
Erlang安装程序必须使用管理帐户运行, 否则RabbitMQ安装程序期望的注册表项将不存在
安装好RabbitMQ的服务,会自动启动。如下图
可以从开始菜单停止/重新安装/启动RabbitMQ服务。
4、RabbitMQ程序实现
依赖安装RabbitMQ
我们生成两个项目,一个用于发布者,一个用于消费者:
发布者客户端代码:
1 using RabbitMQ.Client;
2 using System; 5 using System.Text; 7
8 namespace RabbitMQ.Send
9 {
10 class Program
11 {
12 /// <summary>
13 /// 连接配置
14 /// </summary>
15 private static readonly ConnectionFactory connectingFactory = new ConnectionFactory()
16 {
17 HostName = "localhost",
18 UserName = "guest",
19 Password = "guest",
20 Port = 5672,
21 Uri = new Uri("amqp://guest:guest@localhost:5672/")
22 };
23 /// <summary>
24 /// 路由名称
25 /// </summary>
26 const string ExchangeName = "ExchangeName";
27 /// <summary>
28 /// 队列名称
29 /// </summary>
30 const string QueueName = "QueueName";
31
32
33 static void Main(string[] args)
34 {
35 Console.InputEncoding = Encoding.Unicode;
36 Console.OutputEncoding = Encoding.Unicode;
37 DirectExchangeSendMsg();
38
39 Console.WriteLine("按任意键退出程序!");
40 Console.ReadLine();
41 }
42
43 /// <summary>
44 /// 单点精确路由模式
45 /// </summary>
46 private static void DirectExchangeSendMsg()
47 {
48 using (var conn = connectingFactory.CreateConnection())
49 {
50 using (IModel channel = conn.CreateModel())
51 {
52 channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
53 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
54 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
55
56 var props = channel.CreateBasicProperties();
57 props.Persistent = true;
58 Console.WriteLine("请输入需要发送的消息:");
59 string vadata = Console.ReadLine();
60 while (vadata != "exit")
61 {
62 var msgBody = Encoding.UTF8.GetBytes(vadata);
63 channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
64 Console.WriteLine(string.Format("发送时间:{0},发送完毕,输入exit退出消息发送", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
65 vadata = Console.ReadLine();
66 }
67 }
68 }
69 }
70 }
71 }
知识兔消费者客户端代码:
1 using RabbitMQ.Client;
2 using System; 5 using System.Text; 7
8 namespace RabbitMQ.Receive
9 {
10 class Program
11 {
12 private static readonly ConnectionFactory rabbitFactory = new ConnectionFactory()
13 {
14 HostName = "localhost",
15 UserName = "guest",
16 Password = "guest",
17 Port = 5672
18 };
19 /// <summary>
20 /// 路由名称
21 /// </summary>
22 const string ExchangeName = "ExchangeName";
23 /// <summary>
24 /// 对列名称
25 /// </summary>
26 const string QueueName = "QueueName";
27
28 static void Main(string[] args)
29 {
30 Console.InputEncoding = Encoding.Unicode;
31 Console.OutputEncoding = Encoding.Unicode;
32 DirectAcceptExchange();
33
34 Console.WriteLine("输入任意值退出程序!");
35 Console.ReadLine();
36 }
37
38 private static void DirectAcceptExchange()
39 {
40 using (IConnection connection = rabbitFactory.CreateConnection())
41 {
42 using (IModel channel = connection.CreateModel())
43 {
44 channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
45 //声明队列
46 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
47 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
48
49 while (true)
50 {
51 BasicGetResult msgResponse = channel.BasicGet(QueueName, autoAck: false);
52 if (msgResponse != null)
53 {
54 var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
55 Console.WriteLine(string.Format("接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
56 }
57 }
58 }
59 }
60 }
61 }
62 }
知识兔5、运行效果
优秀博文: