
An Analysis of the HBase Block Cache Implementation

This article analyzes the Block Cache implementation mechanism of HBase against the HBase 0.94.1 source code, and summarizes the core ideas behind its cache design.

1. Overview

The memory of an HBase RegionServer is divided into two parts: one part serves as the Memstore, used mainly for writes; the other serves as the BlockCache, used mainly for reads.

Write requests are first written to a Memstore; the RegionServer keeps one Memstore per Store (that is, per column family of each region). When a Memstore fills up to 64 MB (the flush threshold, hbase.hregion.memstore.flush.size), it is flushed to disk. When the total size of all Memstores exceeds the global limit (heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9), a flush is forced, starting from the largest Memstore, until the total drops back below the limit.

Read requests look in the Memstore first; on a miss they check the BlockCache, and on a further miss they read from disk, placing the blocks just read into the BlockCache. Because the BlockCache uses an LRU policy, once it reaches its upper bound (heapsize * hfile.block.cache.size * 0.85) the eviction mechanism starts and discards a batch of the least recently used blocks.

A RegionServer has one BlockCache and N Memstores; the sum of their configured sizes cannot reach or exceed heapsize * 0.8, otherwise HBase will refuse to start.

Under the default configuration, the BlockCache fraction is 0.2 and the Memstore fraction is 0.4. For workloads where read latency matters, the BlockCache can be made larger and the Memstore smaller to raise the cache hit rate.
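
To make the budgeting concrete, here is a minimal standalone sketch (not HBase code; the 8 GB heap and the fraction values are illustrative assumptions) that computes the sizes and thresholds described above:

   public class MemoryBudget {
     public static void main(String[] args) {
       long heapBytes = 8L * 1024 * 1024 * 1024; // assumed 8 GB RegionServer heap
       double blockCacheFraction = 0.2;          // hfile.block.cache.size (default)
       double memstoreFraction = 0.4;            // hbase.regionserver.global.memstore.upperLimit (default)

       // The startup sanity check described above: the two fractions together
       // must stay below 0.8 of the heap, or HBase refuses to start
       if (blockCacheFraction + memstoreFraction >= 0.8) {
         throw new IllegalStateException("BlockCache + Memstore must be < 0.8 of heap");
       }

       long blockCacheBytes = (long) (heapBytes * blockCacheFraction);
       long memstoreBytes = (long) (heapBytes * memstoreFraction);
       // Eviction kicks in once the cache grows past its acceptable size (0.85 factor)
       long evictionThreshold = (long) (blockCacheBytes * 0.85);

       System.out.printf("BlockCache=%d, Memstores=%d, evict above %d bytes%n",
           blockCacheBytes, memstoreBytes, evictionThreshold);
     }
   }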

An HBase RegionServer maintains three levels of Block priority queues:

Single: a Block accessed for the first time is placed in this queue.
Multi: a Block accessed more than once is promoted from the Single queue to the Multi queue.
InMemory: a Block belonging to an in-memory column family is placed in this queue.
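
These three levels correspond to the BlockPriority value carried by every CachedBlock. Paraphrased from the 0.94 source (with comments added here), the enum is simply:

   public enum BlockPriority {
     SINGLE, // accessed exactly once so far
     MULTI,  // promoted after a repeat access (see CachedBlock.access below)
     MEMORY  // block belongs to an in-memory column family
   }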

The benefits of tiering the cache this way are:

First, the InMemory cache type makes it possible to selectively keep in-memory column families resident in RegionServer memory, for example the Meta table's metadata. Second, distinguishing Single from Multi blocks prevents the cache thrashing that Scan operations would otherwise cause: blocks seen only once stay in the Single queue, so the least-used blocks are the first fed to the eviction algorithm.

Under the default configuration, the BlockCache memory is in turn divided among the Single, Multi, and InMemory queues in the proportions 0.25, 0.50, and 0.25. For example, a 1 GB BlockCache reserves roughly 256 MB for Single blocks, 512 MB for Multi blocks, and 256 MB for InMemory blocks.

Note that the InMemory queue is also where the HBase Meta table's metadata is kept, so marking a very large user table as in-memory may evict the Meta table from the cache and hurt the performance of the whole cluster. A sketch of how a column family is marked in-memory follows.
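
For reference, a column family is marked in-memory through its column descriptor. Below is a minimal sketch against the 0.94-era client API; the table and family names are hypothetical:

   import org.apache.hadoop.hbase.HColumnDescriptor;
   import org.apache.hadoop.hbase.HTableDescriptor;

   public class InMemoryFamilyExample {
     public static void main(String[] args) {
       HTableDescriptor table = new HTableDescriptor("my_table");
       HColumnDescriptor family = new HColumnDescriptor("hot_cf");
       family.setInMemory(true); // blocks of this family go to the InMemory queue
       table.addFamily(family);
       // The descriptor would then be passed to HBaseAdmin.createTable(table)
     }
   }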

2. Source Code Analysis

The analysis below walks through the relevant HBase 0.94.1 source code (org.apache.hadoop.hbase.io.hfile.LruBlockCache).

2.1 Adding Blocks to the Cache

  /** Concurrent map (the cache) */
  private final ConcurrentHashMap<BlockCacheKey, CachedBlock> map;

  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * It is assumed this will NEVER be called on an already cached block.  If
   * that is done, an exception will be thrown.
   * @param cacheKey block's cache key
   * @param buf block buffer
   * @param inMemory if block is in-memory
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    CachedBlock cb = map.get(cacheKey);
    if (cb != null) {
      throw new RuntimeException("Cached an already cached block");
    }
    cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
    long newSize = updateSizeMetrics(cb, false);
    map.put(cacheKey, cb);
    elements.incrementAndGet();
    if (newSize > acceptableSize() && !evictionInProgress) {
      runEviction();
    }
  }

  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * It is assumed this will NEVER be called on an already cached block.  If
   * that is done, it is assumed that you are reinserting the same exact
   * block due to a race condition and will update the buffer but not modify
   * the size of the cache.
   * @param cacheKey block's cache key
   * @param buf block buffer
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

1) The method assumes that the same BlockCacheKey is never cached twice; re-inserting an already cached block throws a RuntimeException.

2) A CachedBlock is created whose priority depends on the inMemory flag: BlockPriority.MEMORY if inMemory is true, otherwise BlockPriority.SINGLE. Note that only these two priorities are assigned at insertion time; BlockPriority.MULTI is assigned only when a cached block is accessed again, as CachedBlock's access method shows:

  /**
   * Block has been accessed.  Update its local access time.
   */
  public void access(long accessTime) {
    this.accessTime = accessTime;
    if (this.priority == BlockPriority.SINGLE) {
      this.priority = BlockPriority.MULTI;
    }
  }

3) The BlockCacheKey and the newly created CachedBlock are put into the global ConcurrentHashMap map, and the size and element counters are updated;

4) Finally, if the cache size after the insertion exceeds the acceptable threshold (acceptableSize()) and no eviction is currently in progress, runEviction() is called to start the LRU eviction process:

  /** Eviction thread */
  private final EvictionThread evictionThread;

  /**
   * Multi-threaded call to run the eviction process.
   */
  private void runEviction() {
    if (evictionThread == null) {
      evict();
    } else {
      evictionThread.evict();
    }
  }

The EvictionThread is the thread that actually implements the LRU eviction; it is analyzed in detail below.

2.2 Evicting Blocks from the Cache

The EvictionThread exists mainly to synchronize with the main thread, so that the LRU eviction of the Block Cache is carried out off the main path.

  /*
   * Eviction thread.  Sits in waiting state until an eviction is triggered
   * when the cache size grows above the acceptable level.<p>
   *
   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
   */
  private static class EvictionThread extends HasThread {
    private WeakReference<LruBlockCache> cache;
    private boolean go = true;

    public EvictionThread(LruBlockCache cache) {
      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
      setDaemon(true);
      this.cache = new WeakReference<LruBlockCache>(cache);
    }

    @Override
    public void run() {
      while (this.go) {
        synchronized (this) {
          try {
            this.wait();
          } catch (InterruptedException e) {}
        }
        LruBlockCache cache = this.cache.get();
        if (cache == null) break;
        cache.evict();
      }
    }

    public void evict() {
      synchronized (this) {
        this.notify(); // FindBugs NN_NAKED_NOTIFY
      }
    }

    void shutdown() {
      this.go = false;
      interrupt();
    }
  }

Once started, the EvictionThread blocks in wait() until the main thread calls the thread's evict method (triggered through runEviction, as analyzed above), whose notify() wakes it up. The thread then calls LruBlockCache's evict method, where the actual eviction takes place:

  /**
   * Eviction method.
   */
  void evict() {

    // Ensure only one eviction at a time
    if (!evictionLock.tryLock()) return;

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isDebugEnabled()) {
        LOG.debug("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }

      if (bytesToFree <= 0) return;

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
          singleSize());
      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
          multiSize());
      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
          memorySize());

      // Scan entire map putting into appropriate buckets
      for (CachedBlock cachedBlock : map.values()) {
        switch (cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      PriorityQueue<BlockBucket> bucketQueue =
        new PriorityQueue<BlockBucket>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = 3;
      long bytesFreed = 0;

      BlockBucket bucket;
      while ((bucket = bucketQueue.poll()) != null) {
        long overflow = bucket.overflow();
        if (overflow > 0) {
          long bucketBytesToFree = Math.min(overflow,
            (bytesToFree - bytesFreed) / remainingBuckets);
          bytesFreed += bucket.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.debug("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }

1) First the eviction lock is acquired with tryLock, guaranteeing that only one eviction runs at a time;

2) The current total cache size currentSize and the number of bytes to free, bytesToFree = currentSize - minSize(), are computed; if bytesToFree is zero or negative, the method returns without doing anything;

3) Three BlockBucket containers are created, holding the Single, Multi, and InMemory blocks respectively. Each BlockBucket maintains a CachedBlockQueue that orders its CachedBlock objects according to the LRU eviction policy;

4) The global ConcurrentHashMap recording all cached blocks is scanned, and every CachedBlock is added to the bucket that matches its priority;

5) The three BlockBuckets are then added to a priority queue, ordered by how far each bucket's total size overflows its allotted share (bucketSize); see the BlockBucket compareTo logic sketched below;
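
For reference, a BlockBucket compares itself to other buckets by its overflow, i.e. how far its total size exceeds its allotted share. Paraphrased from the 0.94 source (comments added here):

   // Inside LruBlockCache.BlockBucket
   public long overflow() {
     return totalSize() - bucketSize;
   }

   // Buckets are polled in ascending overflow order, so a bucket that barely
   // exceeds its share frees only its small overflow, leaving the rest of the
   // budget to the most overflowed bucket
   @Override
   public int compareTo(BlockBucket that) {
     if (this.overflow() == that.overflow()) return 0;
     return this.overflow() > that.overflow() ? 1 : -1;
   }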

6) The priority queue is drained, and for each BlockBucket the space it must give up is computed as Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); this spreads the space to be freed as evenly as possible across the three buckets (a worked numeric trace follows the code below). The freeing itself is done by BlockBucket's free method, which pops the CachedBlock objects to be evicted from its CachedBlockQueue:

    public long free(long toFree) {
      CachedBlock cb;
      long freedBytes = 0;
      while ((cb = queue.pollLast()) != null) {
        freedBytes += evictBlock(cb);
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      return freedBytes;
    }
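
To see how the per-bucket quota plays out, take a hypothetical eviction with bytesToFree = 300 MB and bucket overflows of 50 MB (Single), 200 MB (Multi), and 400 MB (InMemory). Polled in ascending overflow order, the first bucket frees min(50, 300/3) = 50 MB, the second frees min(200, 250/2) = 125 MB, and the last frees min(400, 125/1) = 125 MB, exactly the requested 300 MB in total. (In practice free may slightly overshoot its quota, since it evicts whole blocks.)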

7) free in turn calls LruBlockCache's evictBlock method, which removes the CachedBlock from the global ConcurrentHashMap and updates the related counters:

  protected long evictBlock(CachedBlock block) {
    map.remove(block.getCacheKey());
    updateSizeMetrics(block, true);
    elements.decrementAndGet();
    stats.evicted();
    return block.heapSize();
  }

8) The lock is released, and the bookkeeping in the finally block wraps up the eviction.

3. Summary

The core idea behind this Block Cache implementation is to tier the cache, which keeps the different classes of cached blocks from interfering with one another; for HBase in particular, high-priority caches such as the one holding the Meta table must be protected.

 

Author: Leo_wl

Source: http://www.cnblogs.com/Leo_wl/
