好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Service Bus Brokered Messaging(Queues和Topic/Subscr

Service Bus Brokered Messaging(Queues和Topic/Subscription)

Windows Azure: 在Service Bus Brokered Messaging服务中使用Session

Service Bus Brokered Messaging(Queues和Topic/Subscription)中提供了对Session的支持。当我们需要接收方对消息集合进行处理时,或者是按照特定的顺序,或者是一个大消息体分拆成多个BrokeredMessage,或者是存在多个接收方的情况下,需要一系列消息完全由同一接收方实例处理,在这些情况下,Session机制为我们提供了很大的便利。这篇文章将以Queue为例子,介绍一下Session的使用以及如何使用Queue与Session实现Request/Response模式。。

如何使用Session

需要使用Session,首先在创建Queue的时候需要将QueueDescription的RequiresSession设置为true,然后在发送消息时将同一集合内的消息的SessionId设置成统一值。接收方不再通过QueueClient接收消息,而是通过QueueClient的AcceptMessageSession方法接受一次会话(MessageSession),再使用MessageSession接收这次会话包含的所有消息。

需要注意的是,如果Queue的RequiresSession设置为true,则接收方必须通过会话接收消息,否则会抛出InvalidOperationException(It is not possible for an entity that requires sessions to create a non-sessionful message receiver)。并且如果使用会话接收了一条消息,则这个会话内所有消息都由当前接收方接收,其他接收方无法接收这部分消息。所以谁先使用会话接收了第一条消息,则会话内所有消息就属于谁。

发送消息

创建队列

  1               string  queueName =  "  MyQueue  "  ;
   2              NamespaceManager namespaceClient =  NamespaceManager.Create();
   3               if   (namespaceClient.QueueExists(queueName))
   4               {
   5                   namespaceClient.DeleteQueue(queueName);
   6               }
   7              QueueDescription queueDescription =  new   QueueDescription(queueName)
   8               {
   9                  RequiresSession =  true 
 10               };
  11              namespaceClient.CreateQueue(queueDescription);        

创建客户端并发送消息

  1              MessagingFactory factory =  MessagingFactory.Create();
   2              QueueClient queueClient =  factory.CreateQueueClient(queueName);
   3  
  4               string  sessionId =  Guid.NewGuid().ToString();
   5              CreateAndSendOrderMessage(Guid.NewGuid().ToString(),  "  Beijing  "  , queueClient, sessionId);
   6              CreateAndSendOrderMessage(Guid.NewGuid().ToString(),  "  Dalian  "  , queueClient, sessionId);
   7              CreateAndSendOrderMessage(Guid.NewGuid().ToString(),  "  Guangzhou  "  , queueClient, sessionId);
   8  
  9               var  message =  new   BrokeredMessage();
  10              message.Properties.Add( "  OrderId  "  , Guid.NewGuid().ToString());
  11              message.Properties.Add( "  OrderRegion  " ,  "  Beijing  "  );
  12              Console.WriteLine( "  Sending message of Order Id:{0}, Session Id:{1}.  " , message.Properties[ "  OrderId  "  ], message.SessionId);
  13  
 14               Console.WriteLine();
  15              Console.WriteLine( "  Press [Enter] to delete queue and exit.  "  );
  16               Console.ReadLine();
  17               namespaceClient.DeleteQueue(queueName);
  18               factory.Close();
  19              queueClient.Close();

 1           private   static   void  CreateAndSendOrderMessage( string  orderId,  string  orderRegion, QueueClient sender,  string   sessionId)
  2           {
  3               var  message =  new   BrokeredMessage();
  4              message.SessionId =  sessionId;
  5              message.Properties.Add( "  OrderId  "  , orderId);
  6              message.Properties.Add( "  OrderRegion  "  , orderRegion);
  7              Console.WriteLine( "  Sending message of Order Id:{0}, Session Id:{1}.  "  , orderId, sessionId);
  8               sender.Send(message);
  9          }

使用Session接收消息

  1               string  queueName =  "  MyQueue  "  ;
   2              MessagingFactory factory =  MessagingFactory.Create();
   3              QueueClient queueClient =  factory.CreateQueueClient(queueName, ReceiveMode.PeekLock);
   4              Console.WriteLine( "  Reading messages from queue...  "  );
   5  
  6               try 
  7               {
   8                  MessageSession session = queueClient.AcceptMessageSession(TimeSpan.FromSeconds( 1  ));
   9                  BrokeredMessage message =  null  ;
  10                   int  i =  0  ;
  11                   while  ((message = session.Receive(TimeSpan.FromSeconds( 1 ))) !=  null  )
  12                   {
  13                      Console.WriteLine( "  Receiving message of Order Id:{0}, Session Id:{1}.  " , message.Properties[ "  OrderId  "  ], message.SessionId);
  14                       message.Complete();
  15                      i++ ;
  16                   }
  17               }
  18               catch 
 19               { 
  20                  
 21              }

使用Session实现Request/Response模式

我们知道,Queue的消息传送完全是异步通信,发送方只负责发送消息,接收方只负责接收,当发送方发出消息以后,接收方不需要给发送方响应,同时也不可能提供响应。而面向服务的设计中,大部分需求都是调用服务后返回调用方需要的资源,即Request/Response通信模式。那么,如果需要用Service Bus Queue来实现这种Request/Response模式,该如何做呢?接下来,我们通过一个例子来看看实现过程。

在这个例子中,包含两个应用程序,SalesApplication与ProductCatalogApplication。其中SalesApplication将产品ID发送给ProductCatalogApplication,ProductCatalogApplication根据产品ID返回产品的详细信息给SalesApplication。整个实现的难点在于对于SalesApplication,如何将响应的产品信息与请求的信息一一对应,即收到的产品信息如何唯一匹配发送的请求(也许先后请求了一样的产品ID,唯一匹配则需要知道返回的响应到底对应于哪一次发送的包含产品ID的请求),这里通过SessionID实现唯一匹配,架构如下图所示,包含两个队列,一个用于传输请求消息(RequiresSession为false),一个用于传输响应消息(RequiresSession为true)。

sales application创建了一个用于请求的包含产品ID的消息,并将ReplyToSessionId设置成唯一的值(例如Guid); 用于请求的消息发送至请求队列中; sales application调用AcceptMessageSession等待响应消息会话,接受响应会话成功以后则可以通过会话接收消息; 服务端(Product catalog application)接收请求消息; 服务端根据产品ID获取产品信息,并创建响应消息,响应消息的SessionId设置为请求消息的ReplyToSessionId; 服务端发送响应消息; sales application接收响应消息,从消息内容中获取产品详细信息,并根据消息的SessionId匹配请求,从而对消息进行处理。

SalesApplication代码:

  1               string  requestQueueName =  "  ProductRequestQueue  "  ;
   2               string  responseQueueName =  "  ProductResponseQueue  "  ;
   3              
  4  
  5              MessagingFactory factory =  MessagingFactory.Create();
   6              QueueClient requestQueueClient =  factory.CreateQueueClient(requestQueueName);
   7              QueueClient responseQueueClient =  factory.CreateQueueClient(responseQueueName);
   8  
  9               string  sessionId =  Guid.NewGuid().ToString();
  10  
 11              BrokeredMessage requestMessage =  new   BrokeredMessage();
  12              requestMessage.Properties[ "  ProductId  " ] =  Guid.NewGuid().ToString();
  13              requestMessage.ReplyToSessionId =  sessionId;
  14               requestQueueClient.Send(requestMessage);
  15              Console.WriteLine( "  Request Message sent. Session Id:{0}  "  , requestMessage.ReplyToSessionId);
  16               requestMessage.Dispose();
  17  
 18               try 
 19               {
  20                  MessageSession session =  responseQueueClient.AcceptMessageSession(sessionId);
  21                  BrokeredMessage responseMessage =  session.Receive();
  22                   if  (responseMessage !=  null  )
  23                   {
  24                       object  productInfo = responseMessage.GetBody< object > ();
  25                      Console.WriteLine( "  Response Message received. Session Id:{0}  "  , responseMessage.SessionId);
  26                   }
  27               }
  28               catch 
 29               { 
  30                  
 31               }
  32  
 33               Console.WriteLine();
  34              Console.WriteLine( "  Press [Enter] to exit.  "  );
  35               Console.ReadLine();
  36              
 37               factory.Close();
  38               requestQueueClient.Close();
  39              responseQueueClient.Close();

Product Catalog Application代码:

  1               string  requestQueueName =  "  ProductRequestQueue  "  ;
   2               string  responseQueueName =  "  ProductResponseQueue  "  ;
   3              NamespaceManager namespaceClient =  NamespaceManager.Create();
   4               if   (namespaceClient.QueueExists(requestQueueName))
   5               {
   6                   namespaceClient.DeleteQueue(requestQueueName);
   7               }
   8               if   (namespaceClient.QueueExists(responseQueueName))
   9               {
  10                   namespaceClient.DeleteQueue(responseQueueName);
  11               }
  12              QueueDescription requestQueueDescription =  new   QueueDescription(requestQueueName)
  13               {
  14                  RequiresSession =  false 
 15               };
  16               namespaceClient.CreateQueue(requestQueueDescription);
  17              QueueDescription responseQueueDescription =  new   QueueDescription(responseQueueName)
  18               {
  19                  RequiresSession =  true 
 20               };
  21               namespaceClient.CreateQueue(responseQueueDescription);
  22  
 23  
 24              MessagingFactory factory =  MessagingFactory.Create();
  25              QueueClient requestQueueClient =  factory.CreateQueueClient(requestQueueName);
  26              QueueClient responseQueueClient =  factory.CreateQueueClient(responseQueueName);
  27  
 28               while  ( true  )
  29               {
  30                  BrokeredMessage requestMessage =  null  ;
  31                   while  ((requestMessage = requestQueueClient.Receive()) !=  null  )
  32                   {
  33                      Console.WriteLine( "  Request Message received. Session Id:{0}  "  , requestMessage.ReplyToSessionId);
  34                       string  sessionId =  requestMessage.ReplyToSessionId;
  35                       object  productInfo = GetProductInfo(requestMessage.Properties[ "  ProductId  "  ].ToString());
  36                      BrokeredMessage responseMessage =  new   BrokeredMessage(productInfo);
  37                      responseMessage.SessionId =  sessionId;
  38                       responseQueueClient.Send(responseMessage);
  39                      Console.WriteLine( "  Response Message sent. Session Id:{0}  "  , responseMessage.SessionId);
  40  
 41                       requestMessage.Complete();
  42                       requestMessage.Dispose();
  43                       responseMessage.Dispose();
  44                   }
  45  
 46                   //  Thread.Sleep(5 * 1000); 
 47              }

首先运行 Product Catalog Application,因为需要先创建Queue,然后运行SalesApplication,测试结果如图所示:

请求端:

响应端:

点击  这里  下载源码。

 

 

分类:  Windows Azure

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于Service Bus Brokered Messaging(Queues和Topic/Subscr的详细内容...

  阅读:39次