using Microsoft.VisualStudio.TestTools.UnitTesting; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; namespace RabbitMqTest { [TestClass] public class OriginalTest:RabbitmqBase { /// /// 轮训消息,一个生产者,多个消费者轮训接收。 /// [TestMethod("轮训消息")] public void BaseTest() { var smsg1 = base.RandomString(); var smsg2 = base.RandomString(); var smsg3 = base.RandomString(); //消费者1 using var channel = GetChannel(); channel.QueueDeclare("hello",false,false,false,null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model,args) => { var msg = Encoding.UTF8.GetString(args.Body.ToArray()); Console.WriteLine($"Received message 1 {msg}"); //Assert.IsTrue(msg==smsg1 || msg==smsg2); //手动回复Ack,如果非自动回复 channel.BasicAck(args.DeliveryTag,false); }; channel.BasicConsume("hello",false,consumer); //消费者2 using var channel2 = GetChannel(); var consumer2 = new EventingBasicConsumer(channel2); consumer2.Received += (model,args) => { var msg = Encoding.UTF8.GetString(args.Body.ToArray()); Console.WriteLine($"Received message 2 {msg}"); //Assert.IsTrue(msg==smsg1||msg==smsg2); //手动回复Ack,如果非自动回复 channel.BasicAck(args.DeliveryTag,false); }; channel.BasicConsume("hello",false,consumer2); //生产者 using var channel1 = GetChannel(); channel1.QueueDeclare("hello",false,false,false,null); channel.BasicPublish("","hello",null,Encoding.UTF8.GetBytes(smsg1)); Console.WriteLine($"Send message 1 {smsg1}"); channel.BasicPublish("","hello",null,Encoding.UTF8.GetBytes(smsg2)); Console.WriteLine($"Send message 2 {smsg2}"); channel.BasicPublish("","hello",null,Encoding.UTF8.GetBytes(smsg3)); Console.WriteLine($"Send message 3 {smsg3}"); } /// /// 订阅发布模式。一个发布者多个订阅。消费者可以同时收到消息 /// [TestMethod("订阅发布模式")] public void BaseTest2() { var exchangeName = RandomString(); var smsg = RandomString(); //消费者1 using var channel1 = GetChannel(); //创建交换机 channel1.ExchangeDeclare(exchangeName,ExchangeType.Fanout); //创建队列 var queueName1 = channel1.QueueDeclare().QueueName; Console.WriteLine($"创建的临时队列名1:{queueName1}"); channel1.QueueBind(queueName1,exchangeName,"",null); var consumer1 = new EventingBasicConsumer(channel1); consumer1.Received += (m,a) => { var msgs = Encoding.UTF8.GetString(a.Body.ToArray()); Console.WriteLine($"Received message 1 {msgs}"); Assert.AreEqual(msgs,smsg); }; channel1.BasicConsume(queueName1,true,consumer1); //消费者2 using var channel2 = GetChannel(); //创建交换机 channel2.ExchangeDeclare(exchangeName,ExchangeType.Fanout); //创建队列 var queueName2 = channel2.QueueDeclare().QueueName; Console.WriteLine($"创建的临时队列名2:{queueName2}"); channel2.QueueBind(queueName2,exchangeName,"",null); var consumer2 = new EventingBasicConsumer(channel2); consumer2.Received += (m,a) => { var msgs = Encoding.UTF8.GetString(a.Body.ToArray()); Console.WriteLine($"Received message 2 {msgs}"); Assert.AreEqual(msgs,smsg); }; channel2.BasicConsume(queueName2,true,consumer2); //生产者 using var channel3 = GetChannel(); channel3.ExchangeDeclare(exchangeName,ExchangeType.Fanout); channel3.BasicPublish(exchangeName,"",null,Encoding.UTF8.GetBytes(smsg)); } /// /// 测试生产者先发送消息,消费者后连接接收消息 /// [TestMethod("先生产者后消费者")] public void BaseTest3() { var smsg = RandomString(); //以下交换机和队列及其绑定配置需要提前在服务器上配置,否则取消相关注释自行创建 var exchangeName = "logs"; var queueName1 = "queueu1"; var queueName2 = "queueu2"; //消费者1 using var channel1 = GetChannel(); var consumer1 = new EventingBasicConsumer(channel1); consumer1.Received += (m,a) => { var msgs = Encoding.UTF8.GetString(a.Body.ToArray()); Console.WriteLine($"Received message 1 {msgs}"); Assert.AreEqual(msgs,smsg); }; channel1.BasicConsume(queueName1,true,consumer1); //生产者 var produce = GetChannel(); produce.BasicPublish(exchangeName,"",null,Encoding.UTF8.GetBytes(smsg)); Console.WriteLine($"Send message {smsg}"); //消费者2 using var channel2 = GetChannel(); var consumer2 = new EventingBasicConsumer(channel2); consumer2.Received += (m,a) => { var msgs = Encoding.UTF8.GetString(a.Body.ToArray()); Console.WriteLine($"Received message 2 {msgs}"); Assert.AreEqual(msgs,smsg); }; channel2.BasicConsume(queueName2,true,consumer2); } /// /// 发送到exchange后,exchange需要确认.Confirm模式 /// [TestMethod("exchangeConfirm")] public void ExchangeConfirm() { var exchangeName = "tempExchange"; var channel = GetChannel(); channel.ExchangeDeclare(exchangeName,ExchangeType.Fanout,false,true,null); channel.ConfirmSelect(); channel.BasicPublish(exchangeName,"",null,RandomMsgBytes()); if(channel.WaitForConfirms()) { Console.WriteLine("交换机确认成功!"); } else { Console.WriteLine("交换机确认失败!"); Assert.Fail(); } } /// /// 发送到exchange后,exchange需要确认.事务模式 /// [TestMethod("exchangeTx")] public void ExchangeConfirm2() { var exchangeName = "tempExchange"; using var channel = GetChannel(); channel.ExchangeDeclare(exchangeName,ExchangeType.Fanout,false,true,null); try { channel.TxSelect(); channel.BasicPublish(exchangeName,"",null,RandomMsgBytes()); channel.TxCommit(); Console.WriteLine("交换机确认成功!"); } catch(Exception ex) { Console.WriteLine("交换机确认失败!"); Assert.Fail(); } } } }