Rabbit交换机确认模式。comfirm和事务模式测试。

This commit is contained in:
falcon 2023-03-10 15:51:15 +08:00
parent 12beee8577
commit 2a9928c209
2 changed files with 57 additions and 6 deletions

View File

@ -20,7 +20,7 @@ namespace RabbitMqTest
using var channel = GetChannel(); using var channel = GetChannel();
channel.QueueDeclare("hello",false,false,false,null); channel.QueueDeclare("hello",false,false,false,null);
var consumer = new EventingBasicConsumer(channel); var consumer = new EventingBasicConsumer(channel);
consumer.Received+=(model,args) => { consumer.Received += (model,args) => {
var msg = Encoding.UTF8.GetString(args.Body.ToArray()); var msg = Encoding.UTF8.GetString(args.Body.ToArray());
Console.WriteLine($"Received message 1 {msg}"); Console.WriteLine($"Received message 1 {msg}");
//Assert.IsTrue(msg==smsg1 || msg==smsg2); //Assert.IsTrue(msg==smsg1 || msg==smsg2);
@ -33,7 +33,7 @@ namespace RabbitMqTest
//消费者2 //消费者2
using var channel2 = GetChannel(); using var channel2 = GetChannel();
var consumer2 = new EventingBasicConsumer(channel2); var consumer2 = new EventingBasicConsumer(channel2);
consumer2.Received+=(model,args) => { consumer2.Received += (model,args) => {
var msg = Encoding.UTF8.GetString(args.Body.ToArray()); var msg = Encoding.UTF8.GetString(args.Body.ToArray());
Console.WriteLine($"Received message 2 {msg}"); Console.WriteLine($"Received message 2 {msg}");
//Assert.IsTrue(msg==smsg1||msg==smsg2); //Assert.IsTrue(msg==smsg1||msg==smsg2);
@ -71,7 +71,7 @@ namespace RabbitMqTest
Console.WriteLine($"创建的临时队列名1:{queueName1}"); Console.WriteLine($"创建的临时队列名1:{queueName1}");
channel1.QueueBind(queueName1,exchangeName,"",null); channel1.QueueBind(queueName1,exchangeName,"",null);
var consumer1 = new EventingBasicConsumer(channel1); var consumer1 = new EventingBasicConsumer(channel1);
consumer1.Received+=(m,a) => { consumer1.Received += (m,a) => {
var msgs = Encoding.UTF8.GetString(a.Body.ToArray()); var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
Console.WriteLine($"Received message 1 {msgs}"); Console.WriteLine($"Received message 1 {msgs}");
Assert.AreEqual(msgs,smsg); Assert.AreEqual(msgs,smsg);
@ -87,7 +87,7 @@ namespace RabbitMqTest
Console.WriteLine($"创建的临时队列名2:{queueName2}"); Console.WriteLine($"创建的临时队列名2:{queueName2}");
channel2.QueueBind(queueName2,exchangeName,"",null); channel2.QueueBind(queueName2,exchangeName,"",null);
var consumer2 = new EventingBasicConsumer(channel2); var consumer2 = new EventingBasicConsumer(channel2);
consumer2.Received+=(m,a) => { consumer2.Received += (m,a) => {
var msgs = Encoding.UTF8.GetString(a.Body.ToArray()); var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
Console.WriteLine($"Received message 2 {msgs}"); Console.WriteLine($"Received message 2 {msgs}");
Assert.AreEqual(msgs,smsg); Assert.AreEqual(msgs,smsg);
@ -114,7 +114,7 @@ namespace RabbitMqTest
//消费者1 //消费者1
using var channel1 = GetChannel(); using var channel1 = GetChannel();
var consumer1 = new EventingBasicConsumer(channel1); var consumer1 = new EventingBasicConsumer(channel1);
consumer1.Received+=(m,a) => { consumer1.Received += (m,a) => {
var msgs = Encoding.UTF8.GetString(a.Body.ToArray()); var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
Console.WriteLine($"Received message 1 {msgs}"); Console.WriteLine($"Received message 1 {msgs}");
Assert.AreEqual(msgs,smsg); Assert.AreEqual(msgs,smsg);
@ -129,7 +129,7 @@ namespace RabbitMqTest
//消费者2 //消费者2
using var channel2 = GetChannel(); using var channel2 = GetChannel();
var consumer2 = new EventingBasicConsumer(channel2); var consumer2 = new EventingBasicConsumer(channel2);
consumer2.Received+=(m,a) => { consumer2.Received += (m,a) => {
var msgs = Encoding.UTF8.GetString(a.Body.ToArray()); var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
Console.WriteLine($"Received message 2 {msgs}"); Console.WriteLine($"Received message 2 {msgs}");
Assert.AreEqual(msgs,smsg); Assert.AreEqual(msgs,smsg);
@ -138,5 +138,46 @@ namespace RabbitMqTest
} }
/// <summary>
/// 发送到exchange后exchange需要确认.Confirm模式
/// </summary>
[TestMethod("exchangeConfirm")]
public void ExchangeConfirm() {
var exchangeName = "tempExchange";
var channel = GetChannel();
channel.ExchangeDeclare(exchangeName,ExchangeType.Fanout,false,true,null);
channel.ConfirmSelect();
channel.BasicPublish(exchangeName,"",null,RandomMsgBytes());
if(channel.WaitForConfirms()) {
Console.WriteLine("交换机确认成功!");
} }
else {
Console.WriteLine("交换机确认失败!");
Assert.Fail();
}
}
/// <summary>
/// 发送到exchange后exchange需要确认.事务模式
/// </summary>
[TestMethod("exchangeTx")]
public void ExchangeConfirm2() {
var exchangeName = "tempExchange";
using var channel = GetChannel();
channel.ExchangeDeclare(exchangeName,ExchangeType.Fanout,false,true,null);
try {
channel.TxSelect();
channel.BasicPublish(exchangeName,"",null,RandomMsgBytes());
channel.TxCommit();
Console.WriteLine("交换机确认成功!");
}
catch(Exception ex) {
Console.WriteLine("交换机确认失败!");
Assert.Fail();
}
}
}
} }

View File

@ -34,6 +34,16 @@ namespace RabbitMqTest
return sb.ToString(); return sb.ToString();
} }
/// <summary>
/// 生成随机消息字节数组
/// </summary>
/// <param name="len"></param>
/// <returns></returns>
protected virtual Byte[] RandomMsgBytes(int len = 10) {
var str = RandomString(len);
return Encoding.UTF8.GetBytes(str);
}
/// <summary> /// <summary>
/// 创建队列 /// 创建队列
/// </summary> /// </summary>