好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

CQRS架构中同步服务的一种实现方式

CQRS架构中同步服务的一种实现方式

概述

同步服务(Synchronization Service)在CQRS架构中有着重要的作用。它通过访问事件总线来读取事件数据,并对事件进行派发。应用程序会向同步服务注册事件处理器,以便同步服务在派发事件的过程中,能够通过事件处理器对事件进行处理。在此,我将针对“查询数据库的同步”这一基本的CQRS应用场景,来给出一种最简单的同步服务实现方式。

回顾一下CQRS架构,在《 EntityFramework之领域驱动设计实践【扩展阅读】:CQRS体系结构模式 》一文中,我给出了一个简单的CQRS架构模型图,在该图的“事件总线(Event Bus)”与“查询数据库(Query Database)”之间,有一个Denormalizers/Synchronizers的组件,它负责侦听事件总线,并将事件数据同步到查询数据库中。在具体实现上,Denormalizer/Synchronizer通常会以服务(Service)的方式存在,也就是这里所说的“同步服务”。同步服务的实现是多样化的,基本上也都是按照项目和应用程序的具体情况进行设计,不过大体上离不开两种方式,即主动方式和被动方式。主动方式就是同步服务主动监视事件总线,发现有事件到达后便读取事件数据然后更新查询数据库;被动方式则是由事件发起方(通常是领域仓储,或者是基础结构设施所支持的通知服务等)负责通知同步服务,服务接到通知后,再到事件总线获取数据。其实两种方式都各有利弊,主动方式需要定期对事件总线做查询,这个“定期”的度就不太好把握,时间间隔太短会影响性能,间隔太长又会影响实时性;被动方式避免了定期查询带来的系统开销,但同时又加大了Command部分与Query部分之间的耦合,它需要依靠一些技术手段(比如WCF)来实现进程间通信,又或者,还需要利用一些位于基础结构层的系统组件(比如MSMQ Trigger)。我的一个想法是,在实际项目中根据情况对事件进行路由,分别结合两种方式实现事件派发与数据同步,当然,这只是我的一个设想,并没有真正实践过。

之前,我发布了Apworks开发框架的Alpha版本(地址: http://apworks.codeplex.com ),同时也针对这个版本发布了一个基于CQRS架构的演示案例:TinyLibrary CQRS(地址: http://tlibcqrs.codeplex.com )。在Alpha版本中,Apworks仅提供了基于内存对象的“直接事件总线(Direct Event Bus)”,它在获得来自领域仓储的事件后,会直接调用派发器实现事件派发,于是查询数据库也将被同步更新。Direct Event Bus的最大弊病就是要求Event Bus与Command部分在物理上被部署在同一台机器上(因为是直接内存对象实现的),而且其它任何外部系统都无法访问Event Bus,这在系统整合方面就造成了很大的困难。现在,Apworks已经能够支持基于MSMQ的总线机制了,无论是Command Bus还是Event Bus,都可以基于MSMQ来实现。通过使用MSMQ,基于CQRS架构的应用程序在系统整合的方案选取上获得了巨大的发挥空间,比如我们可以使用Biztalk Server的MSMQ Adapter来访问MSMQ。有关Biztalk Server与CQRS架构的整合,我会在另外的文章中讨论,这里不作太多介绍。

为了使得TinyLibrary CQRS演示案例能够支持当前版本的Apworks,并希望在演示中使用MSMQ替代原有的Direct Event Bus作为事件总线,就需要实现一个具有完整功能的同步服务。对于这个同步服务的实现,我对上述主动与被动两种方式进行了分析,最后决定还是采用主动方式(即定期查询MSMQ)。如果是采用被动方式,那么又有如下三个选项:

使用WCF,在仓储完成Event Store与Event Bus的两次提交(2PC)之后,以WCF客户端的角色,调用同步服务(同时也是WCF服务端的角色)中的方法,并在该方法中完成MSMQ的读取与数据库的同步 使用MSMQ Trigger,但这种方式需要实现并注册COM组件,实现起来不方便 通过Query端的查询请求来通知同步服务完成同步,也就是说仓储不需要对同步服务进行通知,同步服务本身也不去定期地查询MSMQ,而是在出现Query端的查询请求时,触发通知并完成同步任务

由于Command部分的仓储操作和Query部分的操作是非常频繁的,因此事实上第一个选项和第三个选项会频繁地通知同步服务,造成同步服务不断地读取MSMQ并处理事件同步任务,这又加重了同步服务的负担,降低了系统性能。而基于MSMQ Trigger的方式,实现则相对更为复杂。权衡一下,针对TinyLibrary CQRS这个演示案例,我还是打算采取主动方式实现同步服务。

TinyLibrary CQRS中基于MSMQ的同步服务的实现方式

总体上讲,TinyLibrary CQRS演示案例的同步服务的设计,主体上有以下几个方面:

结合Windows Service和控制台应用的实现方式 MSMQ的定期查询 事件数据读取

结合Windows Service和控制台应用的实现方式

在做服务程序调试的过程中,与读取日志相比,我们更希望能够看到一些实时的结果;而在生产环境中,服务通常以后台的形式运行,并会将一些结果、错误信息写到日志中。TinyLibrary CQRS同步服务结合了这两种方式,在开发的时候可以以控制台方式运行,后台则以Windows Service的形式运行。实现这样的效果其实很简单,首先创建一个控制台应用程序,然后向其中添加一个继承于System.ServiceProcess.ServiceBase的类,并在该类中重写OnStart、OnStop等方法以实现服务运行逻辑。控制台应用程序通常会有一个Main的静态函数作为其执行入口,那么我们只需要在这个Main静态函数中以new关键字创建刚刚新建的类的实例,即可启动服务。大致代码如下:

01 public   sealed   class   SynchronizationServiceProc : ServiceBase

02 {

03 #if !CONSOLE

04    static   void   Main()

05    {

06        ServiceBase.Run( new   SynchronizationServiceProc());

07    }

08 #endif

09       

10    public   void   StartProc()

11    {

12      // 处理启动逻辑

13    }

14     

15    public   void   StopProc()

16    {

17      // 处理停止逻辑

18    }

19     

20    protected   override   void   OnStart( string [] args)

21    {

22      this .StartProc();

23    }

24     

25    protected   override   void   OnStop()

26    {

27      this .StopProc();

28    }

29 }

30  

31 #if CONSOLE

32 class   Program

33 {

34    static   void   Main( string [] args)

35    {

36      using   (SynchronizationServiceProc proc =  new   SynchronizationServiceProc())

37      {

38        proc.StartProc();

39        Console.ReadLine();

40        proc.StopProc();

41      }

42    }

43 }

44 #endif

你会发现在上面的代码中有两个Main的静态函数,如果让它们同时存在的话,是无法编译通过的。因此,我在这个控制台程序的Build选项中,向Conditional compilation symbols添加了CONSOLE宏,并在上面的代码中加入了#if/#endif的宏判断以支持两种不同的编译方式。另外,如需通过installutil.exe命令行安装Windows Service的话,还需向这个控制台程序添加Installer Class。在此就不详述这个过程了。

MSMQ的定期查询

TinyLibrary CQRS的同步服务中,使用System.Timers.Timer类,实现对MSMQ的定期查询。事实上,TinyLibrary CQRS的同步服务并不是真正在Timer的Elapsed事件被触发的时候进行同步操作的。同步操作会被BackgroundWorker分派到另一个线程中执行,这个待会我会介绍。Timer的Elapsed事件只对MSMQ中是否有消息进行判断,首先,确定BackgroundWorker是空闲的,然后读取MSMQ并判断其中是否有消息,若有,则启动BackgroundWorker进行同步操作,否则直接返回。当下一次间隔时间到来时,如果BackgroundWorker正在处理上一次触发的任务,那么Elapsed处理函数会直接返回,于是就达到了既能持续监听MSMQ,又能有效地处理同步任务的目的。Timer的Elapsed代码如下:

01 private   void   timer_Elapsed( object   sender, ElapsedEventArgs e)

02 {

03      // 如果BackgroundWorker为空闲状态,则

04      // 查询MSMQ以确定是否有消息

05      if   (!worker.IsBusy)

06      {

07          int   messageCount = 0;

08          List< string > messageIds =  new   List< string >();

09          using   (MessageQueue messageQueue =

10              new   MessageQueue( this .EventMessageQueue))

11          {

12              var messages = messageQueue.GetAllMessages();

13              messageCount = messages.Length;

14              messageIds = messages.Select(p => p.Id).ToList();

15              messageQueue.Close();

16          }

17          // 如果MSMQ中有消息,则启动BackgroundWorker

18          // 并将所有消息的ID作为参数传给BackgroundWorker

19          if   (messageCount > 0)

20          {

21              worker.RunWorkerAsync(messageIds);

22          }

23      }

24 }

事件数据读取

这个功能是在一个单独的线程中完成的。Tiny Library CQRS的同步服务采用Background Worker实现这一机制。在Background Worker的DoWork事件处理函数中,首先读取由Timer传入的消息ID列表,然后使用MSMQ的PeekById方法根据ID读取消息内容,同时对读入的消息进行组织(比如判断消息的正确性、获取消息的二进制代码、将二进制代码反序列化为XML字符串、从XML字符串解析出领域事件的类型以及事件触发时间等信息)。最后,通过这些已组织好的数据信息构建出领域事件的实体,并使用消息派发器(Message Dispatcher)将事件派发出去。

在这里有两个需要认真思考的问题:

如果事件处理失败怎么办? - 所以我们用的是PeekById,而不是 ReceiveById。PeekById只会根据ID从MSMQ读取出消息,而不会将其移除;ReceiveById则会将消息移除 Peek、PeekById、Receive、ReceiveById都是阻塞式调用,如果读取消息不成功怎么办? - 有网上资料提议使用异步的方式,比如使用BeginReceive等,但这种方式在异步完成处理时仍需要另一个BeginReceive请求来完成下一个消息的读取操作,从实现上看无非就是多出了几个处理线程,并没有对系统性能带来太大好处,而且增加了实现的复杂度

Background Worker的DoWork事件处理函数大致如下:

01 private   void   worker_DoWork( object   sender, DoWorkEventArgs e)

02 {

03      BackgroundWorker localWorker = sender  as   BackgroundWorker;

04      if   (localWorker.CancellationPending)

05      {

06          e.Cancel =  true ;

07          return ;

08      }

09      List< string > allMessageIds = e.Argument  as   List< string >;

10      var messageCount = allMessageIds.Count;

11      List<DomainEventMessageContent> messageContents =

12          new   List<DomainEventMessageContent>();

13      using   (MessageQueue messageQueue =  new   MessageQueue( this .EventMessageQueue))

14      {

15          messageQueue.MessageReadPropertyFilter.SentTime =  true ;

16          for   ( int   i = 0; i < messageCount; i++)

17          {

18              Message message = messageQueue.PeekById(allMessageIds[i],

19                  this .EventMessageReceiveTimeout);

20              var messageContent =  new   DomainEventMessageContent(message);

21              messageContents.Add(messageContent);

22          }

23          messageQueue.Close();

24      }

25      var sortedMessageContents = messageContents.OrderBy(mc => mc.SentTime);

26      foreach   (var mc  in   sortedMessageContents)

27      {

28          bool   canRemove =  true ;

29          try

30          {

31              if   (!mc.IsValidMessage)

32                  throw   new   Exception( "Invalid Message Content." );

33              OnProcessing(mc);

34              Type eventType = Type.GetType(mc.Type);

35              if   (eventType !=  null )

36              {

37                  DomainEventXmlSerializer xmlSerializer =

38                      new   DomainEventXmlSerializer();

39                  var domainEvent = xmlSerializer.Deserialize(eventType, mc.Bytes);

40                  messageDispatcher.DispatchMessage(domainEvent);

41              }

42              else

43                  canRemove =  false ;

44          }

45          catch   (Exception ex)

46          {

47              OnProcessFailed(mc.MessageId, mc, ex);

48              canRemove =  false ;

49          }

50          finally

51          {

52              if   (canRemove)

53              {

54                  using   (MessageQueue messageQueue =

55                      new   MessageQueue( this .EventMessageQueue))

56                  {

57                      try

58                      {

59                          messageQueue.ReceiveById(mc.MessageId,

60                              this .EventMessageReceiveTimeout);

61                      }

62                      finally

63                      {

64                          messageQueue.Close();

65                      }

66                  }

67              }

68          }

69      }

70 }

 

从上面的代码可以看到,在处理和派发消息时,如果失败,则会引发ProcessFailed事件,同时会将canRemove设置为false,以防止未成功处理的消息从消息队列中移除,造成数据丢失。在finally代码块中,会对已成功处理的消息进行移除操作。

此外,在处理所有获得的消息之前,程序会首先根据消息的发送事件对消息进行排序。这样做的目的是确保消息是按照其发布的顺序进行处理的。比如修改客户信息的消息一定是在创建客户信息之后被处理的。貌似MSMQ并不能够100%确保其Send、Receive的操作是FIFO(First In First Out)的,好像是与队列是否为事务性队列有关系,这部分内容还值得继续研究。不管怎样,对消息排序总归还是行得通的。

运行效果 启动同步服务 向MSMQ中随意发送一条文本消息,同步服务会读取这个消息并试图处理。由于在处理时发现消息格式不正确,同步服务会显示出错误信息,并在MSQM中保留这个消息,以便在下一时间到来时试图再次处理该消息


创建一个UserAccountCreated的领域事件,以表示有一个用户账号被创建。通过发起RegisterUserAccount命令,Command Handler会向领域仓储保存新创建的UserAccount实体。领域仓储在保存实体(确切地说是实体的领域事件序列)时,同时会将领域事件发送到MSMQ事件总线。以下是发起这个RegisterUserAccount命令的测试代码:

01 [TestMethod]

02 public   void   CommandBus_HandleRegisterUserAccountCommandTest()

03 {

04      RegisterUserAccountCommand registerUserAccountCommand =

05          new   RegisterUserAccountCommand

06      {

07          UserName =  "daxnet" ,

08          Password= "password" ,

09          DisplayName= "Sunny Chen" ,

10          Email =  "daxnet@live.com" ,

11          ContactPhone =  "1234567" ,

12          ContactAddressZip= "201203" ,

13          ContactAddressCity= "Shanghai" ,

14          ContactAddressState= "Shanghai" ,

15          ContactAddressCountry= "China" ,

16          ContactAddressStreet= "Zuchongzhi Rd." ,

17      };

18  

19      using   (ICommandBus commandBus = appIniter

20          .Application

21          .ObjectContainer

22          .GetService<ICommandBus>())

23      {

24          commandBus.Publish(registerUserAccountCommand);

25          commandBus.Commit();

26      }

27      long   msgCnt = TestEnvironment.GetMessageCount();

28      int   recordCnt = TestEnvironment.GetDomainEventsTableRecordCount();

29      Assert.AreEqual(1, recordCnt);

30 }

同步服务在获得了来自Command部分的领域事件消息后,便对消息进行信息提取,然后使用事件派发器派发到相应的事件处理器(Event Handler),我们可以通过同步服务的输出结果看到消息已经被处理:


事件处理器(Event Handler)在获得了来自消息派发器(Event Dispatcher)的事件之后,直接使用SQL语句更新查询数据库。Event Handler代码如下:

01 public   class   TinyLibraryCQRSEventHandler : IEventHandler<UserAccountCreatedEvent>

02 {

03      private   string   queryDBConnectionString =  null ;

04       

05      private   string   QueryDBConnectionString

06      {

07          get

08          {

09              if   (queryDBConnectionString ==  null )

10                  queryDBConnectionString = ConfigurationManager

11                      .ConnectionStrings[ "QueryDBConnectionString" ].ConnectionString;

12              return   queryDBConnectionString;

13          }

14      }

15  

16      #region IHandler<UserAccountCreatedEvent> Members

17  

18      public   bool   Handle(UserAccountCreatedEvent message)

19      {

20          string   insertUserAccoutSql =  @"INSERT INTO [UserAccounts]

21 ([UserName], [Password], [DisplayName], [Email], [ContactPhone], [Address_Country],

22   [Address_State], [Address_Street], [Address_City], [Address_Zip])

23 VALUES

24 (@userName, @password, @displayName, @email,

25 @contactPhone, @country, @state, @street, @city, @zip)" ;

26  

27          var rowsAffected = SqlHelper.ExecuteNonQuery(QueryDBConnectionString,

28              CommandType.Text, insertUserAccoutSql,

29              new   SqlParameter( "@userName" , message.UserName),

30              new   SqlParameter( "@password" , message.Password),

31              new   SqlParameter( "@displayName" , message.DisplayName),

32              new   SqlParameter( "@email" , message.Email),

33              new   SqlParameter( "@contactPhone" , message.ContactPhone),

34              new   SqlParameter( "@country" , message.ContactAddressCountry),

35              new   SqlParameter( "@state" , message.ContactAddressState),

36              new   SqlParameter( "@street" , message.ContactAddressStreet),

37              new   SqlParameter( "@city" , message.ContactAddressCity),

38              new   SqlParameter( "@zip" , message.ContactAddressZip));

39  

40          return   rowsAffected > 0;

41      }

42  

43      #endregion

44 }

最后,检查查询数据库,我们发现UserAccounts数据表中已经产生了所需的记录:

总结

通过这篇文章的介绍,我们不仅了解了Tiny Library CQRS演示案例中同步服务的实现方式,我们还了解了CQRS架构中同步服务的主要任务和大致上的操作过程。当然,本文给出的这种实现方式也不是100%的能够确保所有的消息都能够被准确、正确地处理,或许有可能还是会造成数据丢失,但这至少是一种解决方案,而且还是具有相当的改进余地。针对这种方案,我们会有两个疑惑:1、MSMQ查询频率应该是多少?我在案例中使用的是5秒,太频繁会导致服务器严重过载,但太不频繁又会导致数据的不实时性。对于这个不实时性的处理,我提个方案,就是对领域事件的优先级进行规划,并根据优先级对领域事件进行路由,采用不同的同步服务进行处理。2、对于某些需要多个领域事件进行确认的业务逻辑,很抱歉,本文提供的演示案例暂不支持Saga,Apworks目前的版本也不支持Saga,这个问题我会在后续版本的Apworks框架中逐步解决。

部分代码示例

请 单击此处 下载与本文相关的部分代码示例。整个Tiny Library CQRS项目的最新版目前正在进行中,因此在codeplex上并无任何与此版本相关的签入代码。敬请谅解。

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于CQRS架构中同步服务的一种实现方式的详细内容...

  阅读:57次