Falcon.SugarApi/RabbitMqTest/OriginalTest.cs

184 lines
7.3 KiB
C#
Raw Normal View History

2023-03-06 11:24:45 +08:00
using Microsoft.VisualStudio.TestTools.UnitTesting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
namespace RabbitMqTest
{
[TestClass]
public class OriginalTest:RabbitmqBase
{
/// <summary>
/// 轮训消息,一个生产者,多个消费者轮训接收。
/// </summary>
[TestMethod("轮训消息")]
public void BaseTest() {
var smsg1 = base.RandomString();
var smsg2 = base.RandomString();
var smsg3 = base.RandomString();
//消费者1
using var channel = GetChannel();
channel.QueueDeclare("hello",false,false,false,null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model,args) => {
2023-03-06 11:24:45 +08:00
var msg = Encoding.UTF8.GetString(args.Body.ToArray());
Console.WriteLine($"Received message 1 {msg}");
//Assert.IsTrue(msg==smsg1 || msg==smsg2);
//手动回复Ack如果非自动回复
channel.BasicAck(args.DeliveryTag,false);
};
channel.BasicConsume("hello",false,consumer);
//消费者2
using var channel2 = GetChannel();
var consumer2 = new EventingBasicConsumer(channel2);
consumer2.Received += (model,args) => {
2023-03-06 11:24:45 +08:00
var msg = Encoding.UTF8.GetString(args.Body.ToArray());
Console.WriteLine($"Received message 2 {msg}");
//Assert.IsTrue(msg==smsg1||msg==smsg2);
//手动回复Ack如果非自动回复
channel.BasicAck(args.DeliveryTag,false);
};
channel.BasicConsume("hello",false,consumer2);
//生产者
using var channel1 = GetChannel();
channel1.QueueDeclare("hello",false,false,false,null);
channel.BasicPublish("","hello",null,Encoding.UTF8.GetBytes(smsg1));
Console.WriteLine($"Send message 1 {smsg1}");
channel.BasicPublish("","hello",null,Encoding.UTF8.GetBytes(smsg2));
Console.WriteLine($"Send message 2 {smsg2}");
channel.BasicPublish("","hello",null,Encoding.UTF8.GetBytes(smsg3));
Console.WriteLine($"Send message 3 {smsg3}");
}
/// <summary>
/// 订阅发布模式。一个发布者多个订阅。消费者可以同时收到消息
/// </summary>
[TestMethod("订阅发布模式")]
public void BaseTest2() {
var exchangeName = RandomString();
var smsg = RandomString();
//消费者1
using var channel1 = GetChannel();
//创建交换机
channel1.ExchangeDeclare(exchangeName,ExchangeType.Fanout);
//创建队列
var queueName1 = channel1.QueueDeclare().QueueName;
Console.WriteLine($"创建的临时队列名1:{queueName1}");
channel1.QueueBind(queueName1,exchangeName,"",null);
var consumer1 = new EventingBasicConsumer(channel1);
consumer1.Received += (m,a) => {
2023-03-06 11:24:45 +08:00
var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
Console.WriteLine($"Received message 1 {msgs}");
Assert.AreEqual(msgs,smsg);
};
channel1.BasicConsume(queueName1,true,consumer1);
//消费者2
using var channel2 = GetChannel();
//创建交换机
channel2.ExchangeDeclare(exchangeName,ExchangeType.Fanout);
//创建队列
var queueName2 = channel2.QueueDeclare().QueueName;
Console.WriteLine($"创建的临时队列名2:{queueName2}");
channel2.QueueBind(queueName2,exchangeName,"",null);
var consumer2 = new EventingBasicConsumer(channel2);
consumer2.Received += (m,a) => {
2023-03-06 11:24:45 +08:00
var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
Console.WriteLine($"Received message 2 {msgs}");
Assert.AreEqual(msgs,smsg);
};
channel2.BasicConsume(queueName2,true,consumer2);
//生产者
using var channel3 = GetChannel();
channel3.ExchangeDeclare(exchangeName,ExchangeType.Fanout);
channel3.BasicPublish(exchangeName,"",null,Encoding.UTF8.GetBytes(smsg));
}
/// <summary>
/// 测试生产者先发送消息,消费者后连接接收消息
/// </summary>
[TestMethod("先生产者后消费者")]
public void BaseTest3() {
var smsg = RandomString();
//以下交换机和队列及其绑定配置需要提前在服务器上配置,否则取消相关注释自行创建
var exchangeName = "logs";
var queueName1 = "queueu1";
var queueName2 = "queueu2";
//消费者1
using var channel1 = GetChannel();
var consumer1 = new EventingBasicConsumer(channel1);
consumer1.Received += (m,a) => {
2023-03-06 11:24:45 +08:00
var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
Console.WriteLine($"Received message 1 {msgs}");
Assert.AreEqual(msgs,smsg);
};
channel1.BasicConsume(queueName1,true,consumer1);
//生产者
var produce = GetChannel();
produce.BasicPublish(exchangeName,"",null,Encoding.UTF8.GetBytes(smsg));
Console.WriteLine($"Send message {smsg}");
//消费者2
using var channel2 = GetChannel();
var consumer2 = new EventingBasicConsumer(channel2);
consumer2.Received += (m,a) => {
2023-03-06 11:24:45 +08:00
var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
Console.WriteLine($"Received message 2 {msgs}");
Assert.AreEqual(msgs,smsg);
};
channel2.BasicConsume(queueName2,true,consumer2);
}
/// <summary>
/// 发送到exchange后exchange需要确认.Confirm模式
/// </summary>
[TestMethod("exchangeConfirm")]
public void ExchangeConfirm() {
var exchangeName = "tempExchange";
var channel = GetChannel();
channel.ExchangeDeclare(exchangeName,ExchangeType.Fanout,false,true,null);
channel.ConfirmSelect();
channel.BasicPublish(exchangeName,"",null,RandomMsgBytes());
if(channel.WaitForConfirms()) {
Console.WriteLine("交换机确认成功!");
}
else {
Console.WriteLine("交换机确认失败!");
Assert.Fail();
}
}
/// <summary>
/// 发送到exchange后exchange需要确认.事务模式
/// </summary>
[TestMethod("exchangeTx")]
public void ExchangeConfirm2() {
var exchangeName = "tempExchange";
using var channel = GetChannel();
channel.ExchangeDeclare(exchangeName,ExchangeType.Fanout,false,true,null);
try {
channel.TxSelect();
channel.BasicPublish(exchangeName,"",null,RandomMsgBytes());
channel.TxCommit();
Console.WriteLine("交换机确认成功!");
}
catch(Exception ex) {
Console.WriteLine("交换机确认失败!");
Assert.Fail();
}
}
2023-03-06 11:24:45 +08:00
}
2023-03-06 11:24:45 +08:00
}