好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Nacos配置中心集群原理及源码分析

Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢?

下面这个图,表示Nacos集群的部署图。

Nacos集群工作原理

Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP)。

Nacos的数据存储分为两部分

Mysql数据库存储,所有Nacos节点共享同一份数据,数据的副本机制由Mysql本身的主从方案来解决,从而保证数据的可靠性。 每个节点的本地磁盘,会保存一份全量数据,具体路径:/data/program/nacos-1/data/config-data/${GROUP}.

在Nacos的设计中,Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘。

这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。

当配置发生变更时:

Nacos会把变更的配置保存到数据库,然后再写入本地文件。 接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中。

另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件

配置变更同步入口

当配置发生修改、删除、新增操作时,通过发布一个 notifyConfigChange 事件。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

@PostMapping

@Secured (action = ActionTypes.WRITE, parser = ConfigResourceParser. class )

public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,

         @RequestParam (value = "dataId" ) String dataId, @RequestParam (value = "group" ) String group,

         @RequestParam (value = "tenant" , required = false , defaultValue = StringUtils.EMPTY) String tenant,

         @RequestParam (value = "content" ) String content, @RequestParam (value = "tag" , required = false ) String tag,

         @RequestParam (value = "appName" , required = false ) String appName,

         @RequestParam (value = "src_user" , required = false ) String srcUser,

         @RequestParam (value = "config_tags" , required = false ) String configTags,

         @RequestParam (value = "desc" , required = false ) String desc,

         @RequestParam (value = "use" , required = false ) String use,

         @RequestParam (value = "effect" , required = false ) String effect,

         @RequestParam (value = "type" , required = false ) String type,

         @RequestParam (value = "schema" , required = false ) String schema) throws NacosException {

    //省略..

     if (StringUtils.isBlank(betaIps)) {

         if (StringUtils.isBlank(tag)) {

             persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true );

             ConfigChangePublisher

                     .notifyConfigChange( new ConfigDataChangeEvent( false , dataId, group, tenant, time.getTime()));

         } else {

             persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true );

             ConfigChangePublisher.notifyConfigChange(

                     new ConfigDataChangeEvent( false , dataId, group, tenant, tag, time.getTime()));

         }

     } //省略

     return true ;

}

AsyncNotifyService

配置数据变更事件,专门有一个监听器AsyncNotifyService,它会处理数据变更后的同步事件。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

@Autowired

public AsyncNotifyService(ServerMemberManager memberManager) {

     this .memberManager = memberManager;

     // Register ConfigDataChangeEvent to NotifyCenter.

     NotifyCenter.registerToPublisher(ConfigDataChangeEvent. class , NotifyCenter.ringBufferSize);

     // Register A Subscriber to subscribe ConfigDataChangeEvent.

     NotifyCenter.registerSubscriber( new Subscriber() {

         @Override

         public void onEvent(Event event) {

             // Generate ConfigDataChangeEvent concurrently

             if (event instanceof ConfigDataChangeEvent) {

                 ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;

                 long dumpTs = evt.lastModifiedTs;

                 String dataId = evt.dataId;

                 String group = evt.group;

                 String tenant = evt.tenant;

                 String tag = evt.tag;

                 Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表

                 // 构建NotifySingleTask,并添加到队列中。

                 Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();

                 for (Member member : ipList) { //遍历集群中的每个节点

                     queue.add( new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),

                             evt.isBeta));

                 }

                 //异步执行任务 AsyncTask

                 ConfigExecutor.executeAsyncNotify( new AsyncTask(nacosAsyncRestTemplate, queue));

             }

         }

         @Override

         public Class<? extends Event> subscribeType() {

             return ConfigDataChangeEvent. class ;

         }

     });

}

AsyncTask

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

@Override

public void run() {

     executeAsyncInvoke();

}

private void executeAsyncInvoke() {

     while (!queue.isEmpty()) { //遍历队列中的数据,直到数据为空

         NotifySingleTask task = queue.poll(); //获取task

         String targetIp = task.getTargetIP(); //获取目标ip

        

         if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目标ip

             // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify

             //判断目标ip的健康状态

             boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //

             if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行。

                 // target ip is unhealthy, then put it in the notification list

                 ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null ,

                         task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,

                         0 , task.target);

                 // get delay time and set fail count to the task

                 asyncTaskExecute(task);

             } else {

                 //构建header

                 Header header = Header.newInstance();

                 header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));

                 header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());

                 if (task.isBeta) {

                     header.addParam( "isBeta" , "true" );

                 }

                 AuthHeaderUtil.addIdentityToHeader(header);

                 //通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法

                 restTemplate.get(task.url, header, Query.EMPTY, String. class , new AsyncNotifyCallBack(task));

             }

         }

     }

}

目标节点接收请求

数据同步的请求地址为,task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

@GetMapping ( "/dataChange" )

public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam ( "dataId" ) String dataId,

         @RequestParam ( "group" ) String group,

         @RequestParam (value = "tenant" , required = false , defaultValue = StringUtils.EMPTY) String tenant,

         @RequestParam (value = "tag" , required = false ) String tag) {

     dataId = dataId.trim();

     group = group.trim();

     String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);

     long lastModifiedTs = StringUtils.isEmpty(lastModified) ? - 1 : Long.parseLong(lastModified);

     String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);

     String isBetaStr = request.getHeader( "isBeta" );

     if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {

         dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true );

     } else {

         //

         dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);

     }

     return true ;

}

dumpService.dump用来实现配置的更新,代码如下

当前任务会被添加到DumpTaskMgr中管理。

?

1

2

3

4

5

6

7

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,

         boolean isBeta) {

     String groupKey = GroupKey2.getKey(dataId, group, tenant);

     String taskKey = String.join( "+" , dataId, group, tenant, String.valueOf(isBeta), tag);

     dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));

     DUMP_LOG.info( "[dump-task] add task. groupKey={}, taskKey={}" , groupKey, taskKey);

}

TaskManager.addTask, 先调用父类去完成任务添加。

?

1

2

3

4

5

@Override

public void addTask(Object key, AbstractDelayTask newTask) {

     super .addTask(key, newTask);

     MetricsMonitor.getDumpTaskMonitor().set(tasks.size());

}

在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行。

NacosDelayTaskExecuteEngine

TaskManager的父类是NacosDelayTaskExecuteEngine,

这个类中有一个成员属性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,专门来保存延期执行的任务类型AbstractDelayTask.

在这个类的构造方法中,初始化了一个延期执行的任务,其中具体的任务是ProcessRunnable.

?

1

2

3

4

5

6

7

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {

     super (logger);

     tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);

     processingExecutor = ExecutorFactory.newSingleScheduledExecutorService( new NameThreadFactory(name));

     processingExecutor

             .scheduleWithFixedDelay( new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);

}

ProcessRunnable

?

1

2

3

4

5

6

7

8

9

10

private class ProcessRunnable implements Runnable {

     @Override

     public void run() {

         try {

             processTasks();

         } catch (Throwable e) {

             getEngineLog().error(e.toString(), e);

         }

     }

}

processTasks

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

protected void processTasks() {

     //获取所有的任务

     Collection<Object> keys = getAllTaskKeys();

     for (Object taskKey : keys) {

         AbstractDelayTask task = removeTask(taskKey);

         if ( null == task) {

             continue ;

         }

         //获取任务处理器,这里返回的是DumpProcessor

         NacosTaskProcessor processor = getProcessor(taskKey);

         if ( null == processor) {

             getEngineLog().error( "processor not found for task, so discarded. " + task);

             continue ;

         }

         try {

             // ReAdd task if process failed

             //执行具体任务

             if (!processor.process(task)) {

                 retryFailedTask(taskKey, task);

             }

         } catch (Throwable e) {

             getEngineLog().error( "Nacos task execute error : " + e.toString(), e);

             retryFailedTask(taskKey, task);

         }

     }

}

DumpProcessor.process

读取数据库的最新数据,然后更新本地缓存和磁盘

以上就是Nacos配置中心集群原理及源码分析的详细内容,更多关于Nacos配置中心集群原理的资料请关注其它相关文章!

原文链接:https://www.cnblogs.com/mic112/p/16071963.html

查看更多关于Nacos配置中心集群原理及源码分析的详细内容...

  阅读:16次