好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

TimeCacheMap

TimeCacheMap

Storm中使用一种叫做TimeCacheMap的数据结构,用于在内存中保存近期活跃的对象,它的实现非常地高效,而且可以自动删除过期不再活跃的对象。

TimeCacheMap使用多个桶buckets来缩小锁的粒度,以此换取高并发读写性能。下面我们来看看 TimeCacheMap 内部是如何实现的。

1.  实现原理

桶链表:链表中每个元素是一个HashMap,用于保存key,value格式的数据。

     private  LinkedList<HashMap<K, V>> _buckets;

锁对象:用于对TimeCacheMap进行get/put等操作时上锁保证原子性。

     private   final  Object _lock =  new  Object();

后台清理线程:负责超时后清理数据。

     private  Thread _cleaner;

超时回调接口:用于超时后进行函数回调,做一些其他处理。

     public   static   interface  ExpiredCallback<K, V>  {
          public   void   expire(K key, V val);
    }
      private  ExpiredCallback _callback;

有了以上数据结构,下面来看看构造函数的具体实现:

1、 首先,初始化指定个数的bucket,以链式链表形式存储,每个bucket中放入空的HashMap;

2、 然后,设置清理线程,处理流程为:

  a)   休眠expirationMillis / (numBuckets-1)毫秒时间(即:expirationSecs / (numBuckets-1)秒);

  b)   对_lock对象上锁,然后从buckets链表中移除最后一个元素;

  c)   向buckets链表头部新加入一个空的HashMap桶,解除_lock对象锁;

  d)   如果设置了callback函数,则进行回调。

     public  TimeCacheMap( int  expirationSecs,  int  numBuckets, ExpiredCallback<K, V>  callback) {
          if (numBuckets<2 ) {
              throw   new  IllegalArgumentException("numBuckets must be >= 2" );
        }
        _buckets  =  new  LinkedList<HashMap<K, V>> ();
          for ( int  i=0; i<numBuckets; i++ ) {
            _buckets.add(  new  HashMap<K, V> ());
        }


        _callback  =  callback;
          final   long  expirationMillis = expirationSecs * 1000L ;
          final   long  sleepTime = expirationMillis / (numBuckets-1 );
        _cleaner  =  new  Thread( new   Runnable() {
              public   void   run() {
                  try   {
                      while ( true  ) {
                        Map <K, V> dead =  null  ;
                        Time.sleep(sleepTime);
                          synchronized  (_lock) {
                            dead  =  _buckets.removeLast();
                            _buckets.addFirst(  new  HashMap<K, V> ());
                        }
                          if (_callback!= null  ) {
                              for (Entry<K, V>  entry: dead.entrySet()) {
                                _callback.expire(entry.getKey(), entry.getValue());
                            }
                        }
                    }
                }   catch   (InterruptedException ex) {

                }
            }
        });
        _cleaner.setDaemon(  true  );
        _cleaner.start();
    } 

构造函数需要传递三个参数:expirationSecs:超时的时间,单位为秒;numBuckets:桶的个数;callback:超时回调函数。

为了方便使用,还提供了以下三种形式的构造函数,使用时可以根据需要选择:

     //  this default ensures things expire at most 50% past the expiration time 
     private   static   final   int  DEFAULT_NUM_BUCKETS = 3 ;
      public  TimeCacheMap( int  expirationSecs, ExpiredCallback<K, V>  callback) {
          this  (expirationSecs, DEFAULT_NUM_BUCKETS, callback);
    }
      public  TimeCacheMap( int  expirationSecs,  int   numBuckets) {
          this (expirationSecs, numBuckets,  null  );
    }
      public  TimeCacheMap( int   expirationSecs) {
          this  (expirationSecs, DEFAULT_NUM_BUCKETS);
    } 

2.  性能分析

get操作:遍历各个bucket,如果存在指定的key则返回,时间复杂度为O(numBuckets)

     public   V get(K key) {
          synchronized  (_lock) {
              for (HashMap<K, V>  bucket: _buckets) {
                  if  (bucket.containsKey(key)) {
                      return   bucket.get(key);
                }
            }
              return   null  ;
        }
    } 

put操作:将key,value放到_buckets的第一个桶中,然后遍历其他numBuckets-1个桶,从HashMap中移除其中键为key的记录,时间复杂度为O(numBuckets)

     public   void   put(K key, V value) {
          synchronized  (_lock) {
            Iterator <HashMap<K, V>> it =  _buckets.iterator();
            HashMap <K, V> bucket =  it.next();
            bucket.put(key, value);
              while  (it.hasNext()) {
                bucket  =  it.next();
                bucket.remove(key);
            }
        }
    } 

remove操作:遍历各个bucket,如果存在以key为键的记录,直接删除,时间复杂度为O(numBuckets)

     public   Object remove(K key) {
          synchronized  (_lock) {
              for (HashMap<K, V>  bucket: _buckets) {
                  if  (bucket.containsKey(key)) {
                      return   bucket.remove(key);
                }
            }
              return   null  ;
        }
    } 

containsKey操作:遍历各个bucket,如果存在指定的key则返回true,否则返回false,时间复杂度为O(numBuckets)

     public   boolean   containsKey(K key) {
          synchronized  (_lock) {
              for (HashMap<K, V>  bucket: _buckets) {
                  if  (bucket.containsKey(key)) {
                      return   true  ;
                }
            }
              return   false  ;
        }
    } 

size操作:遍历各个bucket,累加各个bucket的HashMap的大小,时间复杂度为O (numBuckets)

     public   int   size() {
          synchronized  (_lock) {
              int  size = 0 ;
              for (HashMap<K, V>  bucket: _buckets) {
                size += bucket.size();
            }
              return   size;
        }
    } 

3.  超时时间

经过上面对put操作和_cleaner线程的分析,我们已经知道:

  a) put操作将数据放到_buckets的第一个桶中,然后遍历其他numBuckets-1个桶,从HashMap中移除其中键为key的记录;

  b) _cleaner线程每隔expirationSecs / (numBuckets-1)秒会把_buckets中最后一个桶中的数据从TimeCacheMap中移除掉。

因此,假设_cleaner线程刚刚清理数据,put函数调用发生将key放入桶中,那么一条数据的超时时间为:

expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1))

然而,假设put函数调用刚刚执行结束,_cleaner线程就开始清理数据,那么一条数据的超时时间为:

expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs

4.  总结

1、 TimeCacheMap的高效之处在于锁的粒度小,O(1)时间内完成锁操作,因此,大部分时间内都可以进行get和put操作。

2、 get,put,remove,containsKey和size操作都可以在O(numBuckets)时间内完成,其中numBuckets是桶的个数,默认为3。

3、 未更新数据的超时时间在expirationSecs和expirationSecs * (1 + 1 / (numBuckets-1))之间。

 

当前标签: Storm

 

Storm常见模式——TimeCacheMap   大圆那些事 2012-06-26 12:32 阅读:112 评论:0   

 

Storm常见模式——BasicBolt   大圆那些事 2012-06-19 19:56 阅读:41 评论:0   

 

Storm常见模式——批处理   大圆那些事 2012-06-19 18:30 阅读:769 评论:0   

 

Storm常见模式——求TOP N   大圆那些事 2012-06-16 15:08 阅读:185 评论:0   

 

Storm常见模式——流聚合   大圆那些事 2012-06-04 19:26 阅读:844 评论:0   

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于TimeCacheMap的详细内容...

  阅读:40次