好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

C#从远程消息队列服务器获取消息的方法

C#从远程消息队列服务器获取消息的方法

以下是C#从远程消息队列服务器获取消息的完整实现方案,支持MSMQ、RabbitMQ等主流协议:

一、MSMQ远程队列消息接收【RemoteMsmqReceiver.cs】

using System;

using System.Messaging;

public class MsmqRemoteReceiver

{

    // 远程队列路径格式:FormatName:DIRECT=TCP:IP\private$\队列名

    private const string QUEUE_PATH = "FormatName:DIRECT=TCP:192.168.1.100\\private$\\order_queue";

    public static void ReceiveMessages()

    {

        try

        {

            using (MessageQueue queue = new MessageQueue(QUEUE_PATH))

            {

                queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });

                

                // 异步接收模式

                queue.ReceiveCompleted += (sender, e) => 

                {

                    var msg = e.Message;

                    Console.WriteLine($"收到消息:{msg.Body}");

                    queue.BeginReceive(); // 继续监听下一条

                };

                queue.BeginReceive();

                Console.WriteLine("监听中,按任意键退出...");

                Console.ReadKey();

            }

        }

        catch (MessageQueueException ex)

        {

            Console.WriteLine($"MSMQ错误:{ex.Message}");

        }

    }

}

关键点:需确保远程服务器启用MSMQ服务并开放1801端口,消息格式需与发送端一致。

二、RabbitMQ远程消息消费【RabbitMqConsumer.cs】

 RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Text;

public class RabbitMqRemoteConsumer

{

    private const string HOST = "10.0.0.5";

    private const string QUEUE = "task_queue";

    public static void StartConsuming()

    {

        var factory = new ConnectionFactory() { 

            HostName = HOST,

            UserName = "admin", // 远程服务器认证

            Password = "Pass123"

        };

        using (var connection = factory.CreateConnection())

        using (var channel = connection.CreateModel())

        {

            // 声明持久化队列

            channel.QueueDeclare(QUEUE, durable: true, exclusive: false);

            

            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) => 

            {

                var body = ea.Body.ToArray();

                var message = Encoding.UTF8.GetString(body);

                Console.WriteLine($"处理消息:{message}");

                channel.BasicAck(ea.DeliveryTag, false); // 手动确认

            };

            channel.BasicConsume(QUEUE, autoAck: false, consumer);

            Console.WriteLine("等待远程消息...");

            Console.ReadLine();

        }

    }

}

采用事件驱动模式实现高吞吐量消费,需引用RabbitMQ.Client NuGet包。

三、ActiveMQ远程连接示例【ActiveMqReceiver.cs】

 Apache.NMS;

using Apache.NMS.ActiveMQ;

public class ActiveMqRemoteListener

{

    public static void Listen(string brokerUri = "tcp://remote-server:61616")

    {

        IConnectionFactory factory = new ConnectionFactory(brokerUri);

        using (IConnection connection = factory.CreateConnection())

        {

            connection.Start();

            using (ISession session = connection.CreateSession())

            {

                IDestination dest = session.GetQueue("SAMPLE.QUEUE");

                using (IMessageConsumer consumer = session.CreateConsumer(dest))

                {

                    IMessage msg = consumer.Receive();

                    if (msg is ITextMessage textMsg)

                    {

                        Console.WriteLine($"收到:{textMsg.Text}");

                    }

                }

            }

        }

    }

}

需引用Apache.NMS和Apache.NMS.ActiveMQ程序集,支持故障转移协议。

关键注意事项

?1、认证配置?

RabbitMQ需设置用户名/密码或TLS证书7

MSMQ域环境需配置AD权限4

?2、错误处理?

网络中断时实现重连机制(如RabbitMQ的AutomaticRecoveryEnabled)

消息幂等性处理避免重复消费10

?3、性能优化?

RabbitMQ建议设置prefetchCount限制未确认消息数

MSMQ事务性操作影响吞吐量,非必要场景禁用

以上方案可根据实际消息中间件类型选择实现,生产环境建议增加日志记录和监控。


查看更多关于C#从远程消息队列服务器获取消息的方法的详细内容...

  阅读:17次