2023-03-06 11:24:45 +08:00
|
|
|
|
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
|
|
|
|
using RabbitMQ.Client;
|
|
|
|
|
using RabbitMQ.Client.Events;
|
|
|
|
|
using System.Text;
|
|
|
|
|
|
|
|
|
|
namespace RabbitMqTest
|
|
|
|
|
{
|
|
|
|
|
[TestClass]
|
|
|
|
|
public class OriginalTest:RabbitmqBase
|
|
|
|
|
{
|
|
|
|
|
/// <summary>
/// Round-robin (work-queue) delivery: one producer publishes three messages to the
/// "hello" queue and two consumers, each on its own channel, share them.
/// The test passes once every published message has been received by either consumer.
/// </summary>
[TestMethod("轮训消息")]
public void BaseTest() {
    const string queueName = "hello";
    var timeout = TimeSpan.FromSeconds(10);

    var smsg1 = base.RandomString();
    var smsg2 = base.RandomString();
    var smsg3 = base.RandomString();

    // Hands payloads from the client's dispatch thread over to the test thread.
    // Declared before the channels so it is disposed after them (using declarations
    // are disposed in reverse order), which keeps late deliveries away from a
    // disposed collection as far as possible.
    using var inbox = new System.Collections.Concurrent.BlockingCollection<string>();

    // Consumer 1
    using var channel = GetChannel();
    channel.QueueDeclare(queueName, false, false, false, null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, args) => {
        var msg = Encoding.UTF8.GetString(args.Body.ToArray());
        Console.WriteLine($"Received message 1 {msg}");

        // autoAck is off, so acknowledge manually on the channel that delivered the message.
        channel.BasicAck(args.DeliveryTag, false);
        inbox.Add(msg);
    };
    channel.BasicConsume(queueName, false, consumer);

    // Consumer 2 - subscribes and acknowledges on its own channel. The original
    // registered it on consumer 1's channel, leaving channel2 unused.
    using var channel2 = GetChannel();
    var consumer2 = new EventingBasicConsumer(channel2);
    consumer2.Received += (model, args) => {
        var msg = Encoding.UTF8.GetString(args.Body.ToArray());
        Console.WriteLine($"Received message 2 {msg}");

        // Delivery tags are scoped to the delivering channel, so ack on channel2.
        channel2.BasicAck(args.DeliveryTag, false);
        inbox.Add(msg);
    };
    channel2.BasicConsume(queueName, false, consumer2);

    // Producer - publishes on its own channel rather than on consumer 1's.
    using var channel1 = GetChannel();
    channel1.QueueDeclare(queueName, false, false, false, null);
    channel1.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(smsg1));
    Console.WriteLine($"Send message 1 {smsg1}");
    channel1.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(smsg2));
    Console.WriteLine($"Send message 2 {smsg2}");
    channel1.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(smsg3));
    Console.WriteLine($"Send message 3 {smsg3}");

    // Wait on the test thread until every published payload has come back.
    // "hello" is a shared, named queue, so payloads that were not sent by this
    // run are simply ignored instead of failing the test.
    var pending = new System.Collections.Generic.HashSet<string> { smsg1, smsg2, smsg3 };
    var deadline = DateTime.UtcNow + timeout;
    while (pending.Count > 0) {
        var remaining = deadline - DateTime.UtcNow;
        if (remaining <= TimeSpan.Zero || !inbox.TryTake(out var delivered, remaining)) {
            break;
        }
        pending.Remove(delivered);
    }

    Assert.AreEqual(0, pending.Count, $"Messages not received within {timeout}: {string.Join(", ", pending)}");
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Publish/subscribe: one publisher, several subscribers. Each consumer binds its own
/// server-named queue to a fanout exchange, so every consumer receives the message.
/// </summary>
[TestMethod("订阅发布模式")]
public void BaseTest2() {
    var timeout = TimeSpan.FromSeconds(10);
    var exchangeName = RandomString();
    var smsg = RandomString();

    // One inbox per consumer; payloads are handed to the test thread so assertion
    // failures are raised where the test framework observes them. Declared before
    // the channels so they are disposed after them (reverse disposal order).
    using var inbox1 = new System.Collections.Concurrent.BlockingCollection<string>();
    using var inbox2 = new System.Collections.Concurrent.BlockingCollection<string>();

    // Consumer 1
    using var channel1 = GetChannel();
    // Create the exchange. The name is random per run, so it is declared
    // non-durable and auto-delete to avoid leaving one exchange behind per run.
    // Every declaration below must use the same arguments.
    channel1.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    // Create a server-named queue.
    var queueName1 = channel1.QueueDeclare().QueueName;
    Console.WriteLine($"创建的临时队列名1:{queueName1}");
    channel1.QueueBind(queueName1, exchangeName, "", null);
    var consumer1 = new EventingBasicConsumer(channel1);
    consumer1.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 1 {msgs}");
        inbox1.Add(msgs);
    };
    channel1.BasicConsume(queueName1, true, consumer1);

    // Consumer 2
    using var channel2 = GetChannel();
    // Create the exchange (same arguments as above).
    channel2.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    // Create a server-named queue.
    var queueName2 = channel2.QueueDeclare().QueueName;
    Console.WriteLine($"创建的临时队列名2:{queueName2}");
    channel2.QueueBind(queueName2, exchangeName, "", null);
    var consumer2 = new EventingBasicConsumer(channel2);
    consumer2.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 2 {msgs}");
        inbox2.Add(msgs);
    };
    channel2.BasicConsume(queueName2, true, consumer2);

    // Producer
    using var channel3 = GetChannel();
    channel3.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    channel3.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(smsg));

    // Both consumers must receive the published payload before the timeout.
    Assert.IsTrue(inbox1.TryTake(out var received1, timeout), $"Consumer 1 received nothing within {timeout}.");
    Assert.AreEqual(smsg, received1);
    Assert.IsTrue(inbox2.TryTake(out var received2, timeout), $"Consumer 2 received nothing within {timeout}.");
    Assert.AreEqual(smsg, received2);
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Verifies that a message published before a consumer connects is still delivered:
/// consumer 1 subscribes first, the producer publishes, then consumer 2 subscribes.
/// </summary>
[TestMethod("先生产者后消费者")]
public void BaseTest3() {
    var timeout = TimeSpan.FromSeconds(10);
    var smsg = RandomString();

    // The exchange, the queues and their bindings below must already be configured
    // on the server; this test does not create them.
    var exchangeName = "logs";
    var queueName1 = "queueu1";
    var queueName2 = "queueu2";

    // One inbox per consumer, declared before the channels so they are disposed
    // after them (using declarations are disposed in reverse order).
    using var inbox1 = new System.Collections.Concurrent.BlockingCollection<string>();
    using var inbox2 = new System.Collections.Concurrent.BlockingCollection<string>();

    // Waits until the expected payload shows up in the given inbox. The queues are
    // pre-provisioned and may still hold payloads from earlier runs; those are
    // logged by the handlers and skipped here.
    bool WaitForMessage(System.Collections.Concurrent.BlockingCollection<string> inbox) {
        var deadline = DateTime.UtcNow + timeout;
        while (true) {
            var remaining = deadline - DateTime.UtcNow;
            if (remaining <= TimeSpan.Zero || !inbox.TryTake(out var payload, remaining)) {
                return false;
            }
            if (payload == smsg) {
                return true;
            }
        }
    }

    // Consumer 1
    using var channel1 = GetChannel();
    var consumer1 = new EventingBasicConsumer(channel1);
    consumer1.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 1 {msgs}");
        inbox1.Add(msgs);
    };
    channel1.BasicConsume(queueName1, true, consumer1);

    // Producer - disposed with the method scope (the original never released it).
    using var produce = GetChannel();
    produce.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(smsg));
    Console.WriteLine($"Send message {smsg}");

    // Consumer 2 - connects only after the message has been published.
    using var channel2 = GetChannel();
    var consumer2 = new EventingBasicConsumer(channel2);
    consumer2.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 2 {msgs}");
        inbox2.Add(msgs);
    };
    channel2.BasicConsume(queueName2, true, consumer2);

    // Assert on the test thread so a missing delivery actually fails the test.
    Assert.IsTrue(WaitForMessage(inbox1), $"Consumer 1 did not receive '{smsg}' within {timeout}.");
    Assert.IsTrue(WaitForMessage(inbox2), $"Consumer 2 did not receive '{smsg}' within {timeout}.");
}
|
|
|
|
|
|
2023-03-10 15:51:15 +08:00
|
|
|
|
/// <summary>
/// Publishes to an exchange and requires the broker to acknowledge the publish
/// (publisher-confirm mode).
/// </summary>
[TestMethod("exchangeConfirm")]
public void ExchangeConfirm() {
    var exchangeName = "tempExchange";

    // Release the channel when the test ends; the original never disposed it.
    using var channel = GetChannel();
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);

    // Confirms must be enabled on the channel before publishing.
    channel.ConfirmSelect();
    channel.BasicPublish(exchangeName, "", null, RandomMsgBytes());

    var confirmed = channel.WaitForConfirms();
    Console.WriteLine(confirmed ? "交换机确认成功!" : "交换机确认失败!");
    Assert.IsTrue(confirmed, "The broker did not confirm the published message.");
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Publishes to an exchange and requires the broker to acknowledge the publish
/// (transaction mode: select, publish, commit).
/// </summary>
[TestMethod("exchangeTx")]
public void ExchangeConfirm2() {
    var exchangeName = "tempExchange";

    using var channel = GetChannel();
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);

    try {
        channel.TxSelect();
        channel.BasicPublish(exchangeName, "", null, RandomMsgBytes());
        channel.TxCommit();
        Console.WriteLine("交换机确认成功!");
    }
    catch (Exception ex) {
        Console.WriteLine("交换机确认失败!");
        // Surface the underlying error instead of discarding it, so the failure is diagnosable.
        Assert.Fail($"Transactional publish failed: {ex}");
    }
}
|
2023-03-06 11:24:45 +08:00
|
|
|
|
}
|
2023-03-10 15:51:15 +08:00
|
|
|
|
|
2023-03-06 11:24:45 +08:00
|
|
|
|
}
|