在的最后,编写了一个C#驱动RabbitMQ的简单栗子,了解了C#驱动RabbitMQ的基本用法。本章介绍RabbitMQ的四种Exchange及各种Exchange的使用场景。
1 direct类型
1 direct路由规则
上一篇最后一个栗子使用的Exchange就是direct类型的,direct类型的exchange路由规则很简单:
exchange在和queue进行binding时会设置routingkey(为了避免和下边的routingKey混淆,很多时候把这里的routingKey叫做BindingKey)
channel.QueueBind(queue:"Q1", exchange:"myexchange", routingKey:"orange");
将消息发送到Broker时会设置对应的routingkey:
channel.BasicPublish(exchange: "myexchange",routingKey: "orange", basicProperties: null, body: body);
只有RoutingKey和BindingKey完全相同时,exchange才会把消息路由到绑定的queue中去。
2 代码示例
我们知道了direact类型的交换机只有routingKey和bindingKey相同的时候才会进行消息路由,根据这一特点我们可以通过routingKey将消息路由到不同的queue中。如在进行日志处理时,需求是所有的日志都保存到文本文件,出现错误日志时则还需要短信通知以便及时处理。我们可以创建两个队列:只接收错误日志的log_error队列和接收所有日志信息的log_all队列。消费者C1处理log_error队列中消息,将这些消息通过短信通知管理员,消费者C2处理log_all队列的信息,将这些信息记录到文本文件。
生产者用于发送日志消息,代码如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //声明两个队列,log_all保存所有日志,log_error保存error类型日志 channel.QueueDeclare(queue: "log_all", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "log_error", durable: true, exclusive: false, autoDelete: false, arguments: null); //绑定所有日志类型到log_all队列 string[] logtypes = new string[] { "debug", "info", "warn", "error" }; foreach (string item in logtypes) { channel.QueueBind(queue: "log_all", exchange: "myexchange", routingKey: item); } //绑定错误日志到log_all队列 channel.QueueBind(queue: "log_error", exchange: "myexchange", routingKey: "error"); //准备100条测试日志信息 ListmsgList = new List (); for (int i = 1; i < 100; i++) { if (i%4==0) { msgList.Add(new LogMsg() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条信息") }); } if (i % 4 == 1) { msgList.Add(new LogMsg() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条信息") }); } if (i % 4 == 2) { msgList.Add(new LogMsg() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条信息") }); } if (i % 4 == 3) { msgList.Add(new LogMsg() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条信息") }); } } Console.WriteLine("生产者发送100条日志信息"); //发送日志信息 foreach (var item in msgList) { channel.BasicPublish(exchange: "myexchange", routingKey: item.LogType, basicProperties: null, body: item.Msg); } } } Console.ReadKey(); } }
消费者C1用于处理log_error队列中的消息,错误消息进行短信通知,代码如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //声明队列queue channel.QueueDeclare(queue: "log_all", durable: true, exclusive: false, autoDelete: false, arguments: null); //绑定 channel.QueueBind(queue: "log_error", exchange: "myexchange", routingKey: "error"); //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //只是为了演示,并没有存入文本文件 Console.WriteLine($"接收成功!【{message}】,发送短信通知"); }; Console.WriteLine("消费者C1【接收错误日志,发送短信通知】准备就绪...."); //处理消息 channel.BasicConsume(queue: "log_error", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
消费者C2用于处理log_all队列中的消息,所有消息记录到文本文件中,代码如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //声明队列queue channel.QueueDeclare(queue: "log_all", durable: true, exclusive: false, autoDelete: false, arguments: null); //绑定 string[] logtypes = new string[] { "debug", "info", "warn", "error" }; foreach (string item in logtypes) { channel.QueueBind(queue: "log_all", exchange: "myexchange", routingKey: item); } //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //只是为了演示,并没有存入文本文件 Console.WriteLine($"接收成功!【{message}】,存入文本文件"); }; Console.WriteLine("消费者C2【接收所有日志信息,存入文本文件】准备就绪...."); //处理消息 channel.BasicConsume(queue: "log_all", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
运行这三个项目,执行结果如下:
2 fanout类型
1 fanout路由规则
fanout类型的exchange路由规则是最简单的,交换机会把消息广播到与该Exchange绑定的所有queue中,即所有和该exchange绑定的队列都会收到消息。fanout类型exchange和队列绑定时不需要指定routingKey,即使指定了routingKey也会被忽略掉。路由结构如下:
fanout类型交换机主要用于发布/订阅的一些场景,如用户注册了我们的网站后,我们通过短信和邮件两种方式通知用户
2 代码示例
这里通过代码简单演示将消息同时使用短信和邮件两种方式通知用户的流程。首先声明一个fanout类型的exchange,然后声明两个队列 SMSqueue和EMAILqueue,这两个队列都和这个exchange绑定。消费者1处理EMAILqueue的消息,通过邮件方式发送通知;消费者2处理SMSqueue的消息通过短信方式发送通知。
生产者发送信息,代码如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //第一步:创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "myfanoutexchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //声明SMSqueue队列,用于短信通知 channel.QueueDeclare(queue: "SMSqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //声明队列,Email队列,用于邮件通知 channel.QueueDeclare(queue: "EMAILqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //绑定exchange和queue channel.QueueBind(queue: "SMSqueue", exchange: "myfanoutexchange", routingKey: string.Empty,arguments:null); channel.QueueBind(queue: "EMAILqueue", exchange: "myfanoutexchange", routingKey: string.Empty, arguments: null); Console.WriteLine("生产者准备就绪...."); string message = ""; //第六步:发送消息 //在控制台输入消息,按enter键发送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //基本发布 channel.BasicPublish(exchange: "myfanoutexchange", routingKey: string.Empty, basicProperties: null, body: body); Console.WriteLine($"消息【{message}】已发送到队列"); } } } Console.ReadKey(); }
消费者1将EMAILqueue的消息通过邮件方式发送通知,代码如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "myfanoutexchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //声明队列queue channel.QueueDeclare(queue: "EMAILqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //绑定exchange和queue channel.QueueBind(queue: "EMAILqueue", exchange: "myfanoutexchange", routingKey: string.Empty, arguments: null); //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //只是为了演示,并没有存入文本文件 Console.WriteLine($"接收成功!【{message}】,邮件通知"); }; Console.WriteLine("邮件通知服务准备就绪..."); //处理消息 channel.BasicConsume(queue: "EMAILqueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
消费者2将SMSqueue的消息通过短信方式发送通知,代码如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "myfanoutexchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //声明队列queue channel.QueueDeclare(queue: "SMSqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //绑定exchange和queue channel.QueueBind(queue: "SMSqueue", exchange: "myfanoutexchange", routingKey: string.Empty,arguments:null); //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //只是为了演示,并没有存入文本文件 Console.WriteLine($"接收成功!【{message}】,短信通知"); }; Console.WriteLine("短信通知服务准备就绪..."); //处理消息 channel.BasicConsume(queue: "myfanoutqueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
启动这三个应用程序,执行结果如下:
3 topic类型
1 topic路由规则
topic类型exchange的路由规则也是基于routingKey和bindingKey的,其路由过程和direct类型基本一致,两者的区别在于direct类型的exchange要求routingKey和bindingKey必须相同才进行将消息路由到绑定的queue中,而topic类型的bindingKey是一个匹配规则,只要routingKey符合bindingKey的规则就可以将消息路由到绑定的queue中去,结构如下图所示。注意routingKey和bindingKey的结构都是一系列由点号连接单词的字符串,例如【aaa.bbb.ccc】。
bindingKey的两个特殊符号:*表示一个单词,#表示0或多个单词(注意是单词,而不是字符)。如下图,usa.news和usa.weather都和usa.#匹配,而usa.news和europe.news都和#.news匹配。
2 代码实现
这里使用代码实现上图中的例子,为了方便我们只定义两个队列:接收美国相关信息的usaQueue(bindingKey是usa.#)和接收新闻消息的newsQueue(bindingKey是#.news)。然后定义两个消费者,消费者1处理useaQueue的消息,消费者2处理newsQueue的消息。
生产者代码:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "mytopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); //声明队列usaQueue channel.QueueDeclare(queue: "usaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //声明队列newsQueue channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("生产者准备就绪...."); //绑定usaQueue队列到交互机,routingKey为usa.# channel.QueueBind(queue: "usaQueue", exchange: "mytopicExchange", routingKey: "usa.#", arguments: null); //绑定newsQueue队列到交互机,routingKey为#.news channel.QueueBind(queue: "newsQueue", exchange: "mytopicExchange", routingKey: "#.news", arguments: null); ////--------------------开始发送消息 //1.发送美国新闻消息 string message1 = "美国新闻消息:内容balabala"; var body1 = Encoding.UTF8.GetBytes(message1); channel.BasicPublish(exchange: "mytopicExchange", routingKey: "usa.news", basicProperties: null, body: body1); Console.WriteLine($"消息【{message1}】已发送到队列"); //2.发送美国天气消息 string message2 = "美国天气消息:内容balabala"; var body2 = Encoding.UTF8.GetBytes(message2); channel.BasicPublish(exchange: "mytopicExchange", routingKey: "usa.weather", basicProperties: null, body: body2); Console.WriteLine($"消息【{message2}】已发送到队列"); //3.发送欧洲新闻消息 string message3 = "欧洲新闻消息:内容balabala"; var body3 = Encoding.UTF8.GetBytes(message3); channel.BasicPublish(exchange: "mytopicExchange", routingKey: "europe.news", basicProperties: null, body: body3); Console.WriteLine($"消息【{message3}】已发送到队列"); //4.发送欧洲天气消息 string message4 = "欧洲天气消息:内容balabala"; var body4 = Encoding.UTF8.GetBytes(message4); //基本发布 channel.BasicPublish(exchange: "mytopicExchange", routingKey: "europe.weather", basicProperties: null, body: body4); Console.WriteLine($"消息【{message4}】已发送到队列"); } } Console.ReadKey(); }
消费者1代码,只处理usaQueue中的消息:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "mytopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); //声明队列queue channel.QueueDeclare(queue: "usaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("usaQueue消费者准备就绪...."); //绑定usaQueue队列到交互机 channel.QueueBind(queue: "usaQueue", exchange: "mytopicExchange", routingKey: "usa.#", arguments: null); //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"接收成功!【{message}】"); }; //处理消息 channel.BasicConsume(queue: "usaQueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
消费者2代码,只处理newsQueue中的消息:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "mytopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); //声明队列queue channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("newsQueue消费者准备就绪...."); //绑定usaQueue队列到交互机 channel.QueueBind(queue: "newsQueue", exchange: "mytopicExchange", routingKey: "#.news", arguments: null); //定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"接收成功!【{message}】"); }; //处理消息 channel.BasicConsume(queue: "newsQueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
生成者发送的四条消息中,消息1的routingKey为usa.news,同时符合usaQueue的bindingKey(usa.#)和newsQueue的bindingKey(#.news),所以消息1同时路由到两个队列中;消息2的routingKey为usa.weather只符合usa.#,发送到usaQueue;消息的rouKey为europe.news,只符合#.news,发送到newsQueue中;消息4的routingKey为europe.weahter,和两个队列的bindingKey都不符合,所以被丢弃。执行这三个Console应用程序,结果如下:
一点补充:topic类型交换机十分灵活,可以轻松实现direct和fanout类型交换机的功能。如果绑定队列时所有的bindingKey都是#,则交换机和fanout类型交换机表现一致;如果所有的bindingKey都不包含*和#,则交换机和direct类型交换机表现一致。
4 header类型
1 header路由规则
header类型路由规则和上边的几种exchange都不一样,header类型exchange不是通过routingKey进行路由的,而是通过Headers。exchange在和queue进行binding时可以设置arguments:
channel.QueueBind(queue: "Allqueue", exchange: "myheaderExchange", routingKey: string.Empty, arguments: new Dictionary{ { "x-match","all"}, { "user","jack"}, { "pass","123"} });
将消息发送到exchange时可以设置消息的Header:
var props1 = channel.CreateBasicProperties(); props1.Headers = new Dictionary() { { "user","jack"}, { "pass","123"} }; var body1 = Encoding.UTF8.GetBytes(msg1); //发送消息 channel.BasicPublish(exchange: "myheaderExchange", routingKey: string.Empty, basicProperties: props1, body: body1);
user和pass是普通的键值对,我们也可以设置其他的键值对。x-match是一个特殊的属性,当x-match为all时,aguments和basicProrperties.Headers的所有键值对都相等时才会路由到queue(AND关系);当x-match为any时,aguments和basicProrperties.Headers的键值对只要有一个相同就可以路由到queue(OR关系)。
2.代码示例
看一个简单的栗子,创建两个队列Allqueue和Anyqueue,其中Allqueue和exchange绑定时的x-match为all,Anyqueue和exchange绑定时的x-match为any;然后发送两条消息,发送第一条消息时basicProperties.Headers中的user和pass都和绑定队列时的agruments的user和pass相等,发送第二条消息是两者的pass不相等,代码如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { //声明交换机exchang channel.ExchangeDeclare(exchange: "myheaderExchange", type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null); //声明Allqueue队列 channel.QueueDeclare(queue: "Allqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //声明Anyqueue队列 channel.QueueDeclare(queue: "Anyqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("生产者准备就绪...."); //发送消息消息1,user和pass都相同 //绑定exchange和Allqueue channel.QueueBind(queue: "Allqueue", exchange: "myheaderExchange", routingKey: string.Empty, arguments: new Dictionary{ { "x-match","all"}, { "user","jack"}, { "pass","123"}}); string msg1 = "user和pass都相同时发送的消息"; var props1 = channel.CreateBasicProperties(); props1.Headers = new Dictionary () { { "user","jack"}, { "pass","123"} }; var body1 = Encoding.UTF8.GetBytes(msg1); //基本发布 channel.BasicPublish(exchange: "myheaderExchange", routingKey: string.Empty, basicProperties: props1, body: body1); Console.WriteLine($"消息【{msg1}】已发送到队列"); //发送消息消息2,user和pass不完全相同 //绑定exchange和Anyqueue channel.QueueBind(queue: "Anyqueue", exchange: "myheaderExchange", routingKey: string.Empty, arguments: new Dictionary { { "x-match","any"}, { "user","jack"}, { "pass","123"},}); string msg2 = "user和pass不完全相同时发送的消息"; var props2 = channel.CreateBasicProperties(); props2.Headers = new Dictionary () { { "user","jack"}, { "pass","456"}//这里的pass和BindQueue方法的中argumens中的pass不相同 }; var body2 = Encoding.UTF8.GetBytes(msg2); //基本发布 channel.BasicPublish(exchange: "myheaderExchange", routingKey: string.Empty, basicProperties: props2, body: body2); Console.WriteLine($"消息【{msg2}】已发送到队列"); } } Console.ReadKey(); } }
执行程序,打开WebUI管理界面,结果如下,我们看到只有user和pass都相等时消息才会路由到Allqueue;user和pass只要有一个相等就会路由到Anyqueue
5 小结
RabbitMQ的交换机(exchange)的作用是路由消息,我们可以根据应用场景的不同选择合适的交换机。如果需要精准路由到队列,或者对消息进行单一维度分类(只对日志的严重程度这一维度进行分类)可以使用direct类型交换机;如果需要广播消息,可以使用fanout类型的交换机;如果对消息进行多维度分类(如例子中按照地区和消息内容类型两个维度进行分类)使用topic类型的交换机;如果消息归类的逻辑包含了较多的AND/OR逻辑判断可以使用header类型交换机(开发中很少用到Header类型,官网上关于Header类型的介绍也不多)。
【参考文章】
1. https://www.cnblogs.com/zhangweizhong/p/5713874.html
2.https://blog.csdn.net/ctwy291314/article/details/83147194