184 lines
7.3 KiB
C#
184 lines
7.3 KiB
C#
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||
using RabbitMQ.Client;
|
||
using RabbitMQ.Client.Events;
|
||
using System.Text;
|
||
|
||
namespace RabbitMqTest
|
||
{
|
||
[TestClass]
|
||
public class OriginalTest:RabbitmqBase
|
||
{
|
||
/// <summary>
/// Round-robin (work queue) delivery: one producer publishes three messages to the
/// "hello" queue and two competing consumers, each on its own channel, share them.
/// The test passes once all three published messages have been consumed and acked.
/// </summary>
[TestMethod("轮训消息")]
public void BaseTest() {
    const string queueName = "hello";
    var sent = new[] { base.RandomString(), base.RandomString(), base.RandomString() };

    // Deliveries are handled by the client library outside the test thread, so the test
    // must block until every published message was seen; otherwise the channels would be
    // disposed before anything is consumed. Declared first so it is disposed last.
    var remaining = sent.Length;
    using var allReceived = new System.Threading.ManualResetEventSlim(false);
    void Track(string msg) {
        // Only count payloads published by this run; the shared queue may hold others.
        if (Array.IndexOf(sent, msg) >= 0
            && System.Threading.Interlocked.Decrement(ref remaining) == 0) {
            allReceived.Set();
        }
    }

    // Consumer 1
    using var channel = GetChannel();
    channel.QueueDeclare(queueName, false, false, false, null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, args) => {
        var msg = Encoding.UTF8.GetString(args.Body.ToArray());
        Console.WriteLine($"Received message 1 {msg}");

        // autoAck is off, so acknowledge manually on the channel that delivered it.
        channel.BasicAck(args.DeliveryTag, false);
        Track(msg);
    };
    channel.BasicConsume(queueName, false, consumer);

    // Consumer 2: consumes and acks on its own channel (delivery tags are per channel).
    using var channel2 = GetChannel();
    var consumer2 = new EventingBasicConsumer(channel2);
    consumer2.Received += (model, args) => {
        var msg = Encoding.UTF8.GetString(args.Body.ToArray());
        Console.WriteLine($"Received message 2 {msg}");

        channel2.BasicAck(args.DeliveryTag, false);
        Track(msg);
    };
    channel2.BasicConsume(queueName, false, consumer2);

    // Producer: publishes through its own channel via the default exchange.
    using var channel1 = GetChannel();
    channel1.QueueDeclare(queueName, false, false, false, null);
    for (var i = 0; i < sent.Length; i++) {
        channel1.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(sent[i]));
        Console.WriteLine($"Send message {i + 1} {sent[i]}");
    }

    Assert.IsTrue(
        allReceived.Wait(TimeSpan.FromSeconds(10)),
        "Not every published message was consumed within the timeout.");
}
|
||
|
||
/// <summary>
/// Publish/subscribe: one publisher sends a single message to a fanout exchange and two
/// subscribers, each bound through its own server-named queue, must both receive it.
/// </summary>
[TestMethod("订阅发布模式")]
public void BaseTest2() {
    var exchangeName = RandomString();
    var smsg = RandomString();

    // Payloads are collected in the callbacks and verified on the test thread: an
    // assertion raised inside a consumer callback would not fail the test.
    var received = new System.Collections.Concurrent.ConcurrentBag<string>();
    using var bothDelivered = new System.Threading.CountdownEvent(2);

    // Subscriber 1
    using var channel1 = GetChannel();
    channel1.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
    // Parameterless QueueDeclare lets the server pick the queue name.
    var queueName1 = channel1.QueueDeclare().QueueName;
    Console.WriteLine($"创建的临时队列名1:{queueName1}");
    channel1.QueueBind(queueName1, exchangeName, "", null);
    var consumer1 = new EventingBasicConsumer(channel1);
    consumer1.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 1 {msgs}");
        received.Add(msgs);
        bothDelivered.Signal();
    };
    channel1.BasicConsume(queueName1, true, consumer1);

    // Subscriber 2
    using var channel2 = GetChannel();
    channel2.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
    var queueName2 = channel2.QueueDeclare().QueueName;
    Console.WriteLine($"创建的临时队列名2:{queueName2}");
    channel2.QueueBind(queueName2, exchangeName, "", null);
    var consumer2 = new EventingBasicConsumer(channel2);
    consumer2.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 2 {msgs}");
        received.Add(msgs);
        bothDelivered.Signal();
    };
    channel2.BasicConsume(queueName2, true, consumer2);

    // Publisher
    using var channel3 = GetChannel();
    channel3.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
    try {
        channel3.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(smsg));

        Assert.IsTrue(
            bothDelivered.Wait(TimeSpan.FromSeconds(10)),
            "Both subscribers should receive the message within the timeout.");
        foreach (var body in received) {
            Assert.AreEqual(smsg, body);
        }
    }
    finally {
        // The exchange name is random per run; remove it so runs do not pile up exchanges.
        channel3.ExchangeDelete(exchangeName, false);
    }
}
|
||
|
||
/// <summary>
/// Producer-before-consumer: consumer 1 subscribes before the message is published and
/// consumer 2 subscribes afterwards; both are expected to receive the message.
/// </summary>
[TestMethod("先生产者后消费者")]
public void BaseTest3() {
    var smsg = RandomString();
    // The exchange, both queues and their bindings must already be configured on the
    // server; this test does not declare them.
    var exchangeName = "logs";
    var queueName1 = "queueu1";
    var queueName2 = "queueu2";

    // The callbacks only signal; the assertions run on the test thread, where a failure
    // actually fails the test. The queues are pre-existing and may still hold messages
    // from earlier runs, so each consumer looks for this run's payload specifically.
    using var seenByConsumer1 = new System.Threading.ManualResetEventSlim(false);
    using var seenByConsumer2 = new System.Threading.ManualResetEventSlim(false);

    // Consumer 1 (connected before publishing)
    using var channel1 = GetChannel();
    var consumer1 = new EventingBasicConsumer(channel1);
    consumer1.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 1 {msgs}");
        if (msgs == smsg) {
            seenByConsumer1.Set();
        }
    };
    channel1.BasicConsume(queueName1, true, consumer1);

    // Producer: disposed with the method instead of being left open.
    using var produce = GetChannel();
    produce.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(smsg));
    Console.WriteLine($"Send message {smsg}");

    // Consumer 2 (connected after publishing)
    using var channel2 = GetChannel();
    var consumer2 = new EventingBasicConsumer(channel2);
    consumer2.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 2 {msgs}");
        if (msgs == smsg) {
            seenByConsumer2.Set();
        }
    };
    channel2.BasicConsume(queueName2, true, consumer2);

    var timeout = TimeSpan.FromSeconds(10);
    Assert.IsTrue(seenByConsumer1.Wait(timeout), $"Consumer 1 did not receive '{smsg}' from {queueName1}.");
    Assert.IsTrue(seenByConsumer2.Wait(timeout), $"Consumer 2 did not receive '{smsg}' from {queueName2}.");
}
|
||
|
||
/// <summary>
/// Publisher confirms: puts the channel into confirm mode, publishes one message to a
/// fanout exchange and fails the test unless the broker confirms it.
/// </summary>
[TestMethod("exchangeConfirm")]
public void ExchangeConfirm() {
    var exchangeName = "tempExchange";

    // Dispose the channel when the test ends instead of leaving it open.
    using var channel = GetChannel();
    // Positional arguments: durable = false, autoDelete = true, no extra arguments.
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    channel.ConfirmSelect();
    channel.BasicPublish(exchangeName, "", null, RandomMsgBytes());

    if (channel.WaitForConfirms()) {
        Console.WriteLine("交换机确认成功!");
    }
    else {
        Console.WriteLine("交换机确认失败!");
        Assert.Fail($"The broker did not confirm the message published to '{exchangeName}'.");
    }
}
|
||
|
||
/// <summary>
/// Transactional publish: publishes one message to a fanout exchange inside an AMQP
/// transaction and fails the test if selecting, publishing or committing throws.
/// </summary>
[TestMethod("exchangeTx")]
public void ExchangeConfirm2() {
    var exchangeName = "tempExchange";

    using var channel = GetChannel();
    // Positional arguments: durable = false, autoDelete = true, no extra arguments.
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    try {
        channel.TxSelect();
        channel.BasicPublish(exchangeName, "", null, RandomMsgBytes());
        channel.TxCommit();
        Console.WriteLine("交换机确认成功!");
    }
    catch (Exception ex) {
        Console.WriteLine("交换机确认失败!");
        // Surface the underlying error so a failing run shows why the transaction failed.
        Assert.Fail($"Transactional publish to '{exchangeName}' failed: {ex}");
    }
}
|
||
}
|
||
|
||
}
|