Falcon.SugarApi/RabbitMqTest/OriginalTest.cs

184 lines
7.3 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
namespace RabbitMqTest
{
[TestClass]
public class OriginalTest:RabbitmqBase
{
/// <summary>
/// Round-robin (work queue) delivery: one producer publishes three messages to the
/// "hello" queue and two competing consumers, each on its own channel, share them.
/// The test passes once every published message has been consumed and acknowledged.
/// </summary>
[TestMethod("轮训消息")]
public void BaseTest() {
    var smsg1 = base.RandomString();
    var smsg2 = base.RandomString();
    var smsg3 = base.RandomString();

    // Published messages that no consumer has reported yet. Access is locked because
    // the Received handlers are invoked by the RabbitMQ client, not by the test thread.
    var pending = new System.Collections.Generic.HashSet<string> { smsg1, smsg2, smsg3 };
    using var allReceived = new System.Threading.ManualResetEventSlim(false);

    // Records one consumed message and signals when nothing is left to wait for.
    // Messages that were not published by this run (leftovers in "hello") are ignored.
    void MarkReceived(string msg) {
        lock(pending) {
            if(pending.Remove(msg) && pending.Count == 0) {
                allReceived.Set();
            }
        }
    }

    // Consumer 1
    using var channel = GetChannel();
    channel.QueueDeclare("hello", false, false, false, null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, args) => {
        var msg = Encoding.UTF8.GetString(args.Body.ToArray());
        Console.WriteLine($"Received message 1 {msg}");
        // autoAck is disabled in BasicConsume below, so acknowledge manually.
        channel.BasicAck(args.DeliveryTag, false);
        MarkReceived(msg);
    };
    channel.BasicConsume("hello", false, consumer);

    // Consumer 2: registered and acknowledged on its own channel. Delivery tags are
    // scoped to the channel that delivered the message, so the ack must use channel2.
    using var channel2 = GetChannel();
    var consumer2 = new EventingBasicConsumer(channel2);
    consumer2.Received += (model, args) => {
        var msg = Encoding.UTF8.GetString(args.Body.ToArray());
        Console.WriteLine($"Received message 2 {msg}");
        // autoAck is disabled in BasicConsume below, so acknowledge manually.
        channel2.BasicAck(args.DeliveryTag, false);
        MarkReceived(msg);
    };
    channel2.BasicConsume("hello", false, consumer2);

    // Producer: publishes on its own channel so no channel is shared between the
    // test thread and a consumer callback.
    using var channel1 = GetChannel();
    channel1.QueueDeclare("hello", false, false, false, null);
    channel1.BasicPublish("", "hello", null, Encoding.UTF8.GetBytes(smsg1));
    Console.WriteLine($"Send message 1 {smsg1}");
    channel1.BasicPublish("", "hello", null, Encoding.UTF8.GetBytes(smsg2));
    Console.WriteLine($"Send message 2 {smsg2}");
    channel1.BasicPublish("", "hello", null, Encoding.UTF8.GetBytes(smsg3));
    Console.WriteLine($"Send message 3 {smsg3}");

    // Delivery is asynchronous; without this wait the channels would be disposed
    // before the consumers had a chance to receive anything.
    Assert.IsTrue(
        allReceived.Wait(TimeSpan.FromSeconds(5)),
        "Not all published messages were consumed within 5 seconds.");
}
/// <summary>
/// Publish/subscribe through a fanout exchange: one publisher, two subscribers, each
/// bound with its own server-named queue. Both subscribers must receive the message.
/// </summary>
[TestMethod("订阅发布模式")]
public void BaseTest2() {
    var exchangeName = RandomString();
    var smsg = RandomString();

    // Bodies captured by the consumer callbacks. They are asserted on the test thread,
    // because an assertion failure thrown inside a callback does not fail the test.
    var received1 = string.Empty;
    var received2 = string.Empty;
    using var bothReceived = new System.Threading.CountdownEvent(2);

    // Consumer 1
    using var channel1 = GetChannel();
    // Declare the exchange. It is auto-delete (durable: false, autoDelete: true) so the
    // randomly named exchange does not stay on the broker after its queues are gone.
    channel1.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    // Declare a server-named temporary queue and bind it to the exchange.
    var queueName1 = channel1.QueueDeclare().QueueName;
    Console.WriteLine($"创建的临时队列名1:{queueName1}");
    channel1.QueueBind(queueName1, exchangeName, "", null);
    var consumer1 = new EventingBasicConsumer(channel1);
    consumer1.Received += (m, a) => {
        received1 = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 1 {received1}");
        bothReceived.Signal();
    };
    channel1.BasicConsume(queueName1, true, consumer1);

    // Consumer 2
    using var channel2 = GetChannel();
    // Re-declaring with identical arguments is a no-op on the broker.
    channel2.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    // Declare a server-named temporary queue and bind it to the exchange.
    var queueName2 = channel2.QueueDeclare().QueueName;
    Console.WriteLine($"创建的临时队列名2:{queueName2}");
    channel2.QueueBind(queueName2, exchangeName, "", null);
    var consumer2 = new EventingBasicConsumer(channel2);
    consumer2.Received += (m, a) => {
        received2 = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 2 {received2}");
        bothReceived.Signal();
    };
    channel2.BasicConsume(queueName2, true, consumer2);

    // Producer
    using var channel3 = GetChannel();
    channel3.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    channel3.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(smsg));

    // Delivery is asynchronous: wait for both subscribers before checking the payloads.
    Assert.IsTrue(
        bothReceived.Wait(TimeSpan.FromSeconds(5)),
        "Both subscribers should receive the message within 5 seconds.");
    Assert.AreEqual(smsg, received1);
    Assert.AreEqual(smsg, received2);
}
/// <summary>
/// Producer-before-consumer: consumer 1 is attached before the message is published,
/// consumer 2 only afterwards. Both must still receive the published message.
/// </summary>
[TestMethod("先生产者后消费者")]
public void BaseTest3() {
    var smsg = RandomString();
    // The exchange, the queues and their bindings below must already be configured on
    // the server; otherwise declare them in code before running this test.
    var exchangeName = "logs";
    var queueName1 = "queueu1";
    var queueName2 = "queueu2";

    // Set by the consumer callbacks once this run's message shows up. The check is done
    // on the test thread, because an assertion thrown inside a callback does not fail the test.
    using var received1 = new System.Threading.ManualResetEventSlim(false);
    using var received2 = new System.Threading.ManualResetEventSlim(false);

    // Consumer 1
    using var channel1 = GetChannel();
    var consumer1 = new EventingBasicConsumer(channel1);
    consumer1.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 1 {msgs}");
        // The queue outlives the test, so messages left by earlier runs are printed but ignored.
        if(msgs == smsg) {
            received1.Set();
        }
    };
    channel1.BasicConsume(queueName1, true, consumer1);

    // Producer
    using var produce = GetChannel();
    produce.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(smsg));
    Console.WriteLine($"Send message {smsg}");

    // Consumer 2: attached only after the message has been published.
    using var channel2 = GetChannel();
    var consumer2 = new EventingBasicConsumer(channel2);
    consumer2.Received += (m, a) => {
        var msgs = Encoding.UTF8.GetString(a.Body.ToArray());
        Console.WriteLine($"Received message 2 {msgs}");
        // The queue outlives the test, so messages left by earlier runs are printed but ignored.
        if(msgs == smsg) {
            received2.Set();
        }
    };
    channel2.BasicConsume(queueName2, true, consumer2);

    // Delivery is asynchronous: wait for each consumer before the channels are disposed.
    var timeout = TimeSpan.FromSeconds(5);
    Assert.IsTrue(received1.Wait(timeout), "Consumer 1 did not receive the published message within 5 seconds.");
    Assert.IsTrue(received2.Wait(timeout), "Consumer 2 did not receive the published message within 5 seconds.");
}
/// <summary>
/// Publisher confirms: after publishing to the exchange, the broker must confirm
/// the message (confirm mode).
/// </summary>
[TestMethod("exchangeConfirm")]
public void ExchangeConfirm() {
    var exchangeName = "tempExchange";
    // Dispose the channel when the test ends, as the transactional variant of this test does.
    using var channel = GetChannel();
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    // Put the channel into confirm mode before publishing.
    channel.ConfirmSelect();
    channel.BasicPublish(exchangeName, "", null, RandomMsgBytes());
    if(channel.WaitForConfirms()) {
        Console.WriteLine("交换机确认成功!");
        return;
    }
    Console.WriteLine("交换机确认失败!");
    Assert.Fail("The broker did not confirm the published message.");
}
/// <summary>
/// Transactional publish: after publishing to the exchange, the publish is confirmed
/// by committing an AMQP transaction (transaction mode).
/// </summary>
[TestMethod("exchangeTx")]
public void ExchangeConfirm2() {
    var exchangeName = "tempExchange";
    using var channel = GetChannel();
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, true, null);
    try {
        channel.TxSelect();
        channel.BasicPublish(exchangeName, "", null, RandomMsgBytes());
        channel.TxCommit();
        Console.WriteLine("交换机确认成功!");
    }
    catch(Exception ex) {
        Console.WriteLine("交换机确认失败!");
        // Surface the cause in the test result instead of discarding the exception.
        Assert.Fail($"Transactional publish failed: {ex}");
    }
}
}
}