C#从远程消息队列服务器获取消息的方法
以下是C#从远程消息队列服务器获取消息的完整实现方案,支持MSMQ、RabbitMQ等主流协议:
一、MSMQ远程队列消息接收【RemoteMsmqReceiver.cs】
using System;
using System.Messaging;
public class MsmqRemoteReceiver
{
// 远程队列路径格式:FormatName:DIRECT=TCP:IP\private$\队列名
private const string QUEUE_PATH = "FormatName:DIRECT=TCP:192.168.1.100\\private$\\order_queue";
public static void ReceiveMessages()
{
try
{
using (MessageQueue queue = new MessageQueue(QUEUE_PATH))
{
queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
// 异步接收模式
queue.ReceiveCompleted += (sender, e) =>
{
var msg = e.Message;
Console.WriteLine($"收到消息:{msg.Body}");
queue.BeginReceive(); // 继续监听下一条
};
queue.BeginReceive();
Console.WriteLine("监听中,按任意键退出...");
Console.ReadKey();
}
}
catch (MessageQueueException ex)
{
Console.WriteLine($"MSMQ错误:{ex.Message}");
}
}
}
关键点:需确保远程服务器启用MSMQ服务并开放1801端口,消息格式需与发送端一致。
二、RabbitMQ远程消息消费【RabbitMqConsumer.cs】
RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
public class RabbitMqRemoteConsumer
{
private const string HOST = "10.0.0.5";
private const string QUEUE = "task_queue";
public static void StartConsuming()
{
var factory = new ConnectionFactory() {
HostName = HOST,
UserName = "admin", // 远程服务器认证
Password = "Pass123"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 声明持久化队列
channel.QueueDeclare(QUEUE, durable: true, exclusive: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"处理消息:{message}");
channel.BasicAck(ea.DeliveryTag, false); // 手动确认
};
channel.BasicConsume(QUEUE, autoAck: false, consumer);
Console.WriteLine("等待远程消息...");
Console.ReadLine();
}
}
}
采用事件驱动模式实现高吞吐量消费,需引用RabbitMQ.Client NuGet包。
三、ActiveMQ远程连接示例【ActiveMqReceiver.cs】
Apache.NMS;
using Apache.NMS.ActiveMQ;
public class ActiveMqRemoteListener
{
public static void Listen(string brokerUri = "tcp://remote-server:61616")
{
IConnectionFactory factory = new ConnectionFactory(brokerUri);
using (IConnection connection = factory.CreateConnection())
{
connection.Start();
using (ISession session = connection.CreateSession())
{
IDestination dest = session.GetQueue("SAMPLE.QUEUE");
using (IMessageConsumer consumer = session.CreateConsumer(dest))
{
IMessage msg = consumer.Receive();
if (msg is ITextMessage textMsg)
{
Console.WriteLine($"收到:{textMsg.Text}");
}
}
}
}
}
}
需引用Apache.NMS和Apache.NMS.ActiveMQ程序集,支持故障转移协议。
关键注意事项
?1、认证配置?
RabbitMQ需设置用户名/密码或TLS证书7
MSMQ域环境需配置AD权限4
?2、错误处理?
网络中断时实现重连机制(如RabbitMQ的AutomaticRecoveryEnabled)
消息幂等性处理避免重复消费10
?3、性能优化?
RabbitMQ建议设置prefetchCount限制未确认消息数
MSMQ事务性操作影响吞吐量,非必要场景禁用
以上方案可根据实际消息中间件类型选择实现,生产环境建议增加日志记录和监控。
查看更多关于C#从远程消息队列服务器获取消息的方法的详细内容...