TimeCacheMap
Storm中使用一种叫做TimeCacheMap的数据结构,用于在内存中保存近期活跃的对象,它的实现非常地高效,而且可以自动删除过期不再活跃的对象。
TimeCacheMap使用多个桶buckets来缩小锁的粒度,以此换取高并发读写性能。下面我们来看看 TimeCacheMap 内部是如何实现的。
1. 实现原理
桶链表:链表中每个元素是一个HashMap,用于保存key,value格式的数据。
private LinkedList<HashMap<K, V>> _buckets;
锁对象:用于对TimeCacheMap进行get/put等操作时上锁保证原子性。
private final Object _lock = new Object();
后台清理线程:负责超时后清理数据。
private Thread _cleaner;
超时回调接口:用于超时后进行函数回调,做一些其他处理。
public static interface ExpiredCallback<K, V> { public void expire(K key, V val); } private ExpiredCallback _callback;
有了以上数据结构,下面来看看构造函数的具体实现:
1、 首先,初始化指定个数的bucket,以链式链表形式存储,每个bucket中放入空的HashMap;
2、 然后,设置清理线程,处理流程为:
a) 休眠expirationMillis / (numBuckets-1)毫秒时间(即:expirationSecs / (numBuckets-1)秒);
b) 对_lock对象上锁,然后从buckets链表中移除最后一个元素;
c) 向buckets链表头部新加入一个空的HashMap桶,解除_lock对象锁;
d) 如果设置了callback函数,则进行回调。
public TimeCacheMap( int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) { if (numBuckets<2 ) { throw new IllegalArgumentException("numBuckets must be >= 2" ); } _buckets = new LinkedList<HashMap<K, V>> (); for ( int i=0; i<numBuckets; i++ ) { _buckets.add( new HashMap<K, V> ()); } _callback = callback; final long expirationMillis = expirationSecs * 1000L ; final long sleepTime = expirationMillis / (numBuckets-1 ); _cleaner = new Thread( new Runnable() { public void run() { try { while ( true ) { Map <K, V> dead = null ; Time.sleep(sleepTime); synchronized (_lock) { dead = _buckets.removeLast(); _buckets.addFirst( new HashMap<K, V> ()); } if (_callback!= null ) { for (Entry<K, V> entry: dead.entrySet()) { _callback.expire(entry.getKey(), entry.getValue()); } } } } catch (InterruptedException ex) { } } }); _cleaner.setDaemon( true ); _cleaner.start(); }
构造函数需要传递三个参数:expirationSecs:超时的时间,单位为秒;numBuckets:桶的个数;callback:超时回调函数。
为了方便使用,还提供了以下三种形式的构造函数,使用时可以根据需要选择:
// this default ensures things expire at most 50% past the expiration time private static final int DEFAULT_NUM_BUCKETS = 3 ; public TimeCacheMap( int expirationSecs, ExpiredCallback<K, V> callback) { this (expirationSecs, DEFAULT_NUM_BUCKETS, callback); } public TimeCacheMap( int expirationSecs, int numBuckets) { this (expirationSecs, numBuckets, null ); } public TimeCacheMap( int expirationSecs) { this (expirationSecs, DEFAULT_NUM_BUCKETS); }
2. 性能分析
get操作:遍历各个bucket,如果存在指定的key则返回,时间复杂度为O(numBuckets)
public V get(K key) { synchronized (_lock) { for (HashMap<K, V> bucket: _buckets) { if (bucket.containsKey(key)) { return bucket.get(key); } } return null ; } }
put操作:将key,value放到_buckets的第一个桶中,然后遍历其他numBuckets-1个桶,从HashMap中移除其中键为key的记录,时间复杂度为O(numBuckets)
public void put(K key, V value) { synchronized (_lock) { Iterator <HashMap<K, V>> it = _buckets.iterator(); HashMap <K, V> bucket = it.next(); bucket.put(key, value); while (it.hasNext()) { bucket = it.next(); bucket.remove(key); } } }
remove操作:遍历各个bucket,如果存在以key为键的记录,直接删除,时间复杂度为O(numBuckets)
public Object remove(K key) { synchronized (_lock) { for (HashMap<K, V> bucket: _buckets) { if (bucket.containsKey(key)) { return bucket.remove(key); } } return null ; } }
containsKey操作:遍历各个bucket,如果存在指定的key则返回true,否则返回false,时间复杂度为O(numBuckets)
public boolean containsKey(K key) { synchronized (_lock) { for (HashMap<K, V> bucket: _buckets) { if (bucket.containsKey(key)) { return true ; } } return false ; } }
size操作:遍历各个bucket,累加各个bucket的HashMap的大小,时间复杂度为O (numBuckets)
public int size() { synchronized (_lock) { int size = 0 ; for (HashMap<K, V> bucket: _buckets) { size += bucket.size(); } return size; } }
3. 超时时间
经过上面对put操作和_cleaner线程的分析,我们已经知道:
a) put操作将数据放到_buckets的第一个桶中,然后遍历其他numBuckets-1个桶,从HashMap中移除其中键为key的记录;
b) _cleaner线程每隔expirationSecs / (numBuckets-1)秒会把_buckets中最后一个桶中的数据从TimeCacheMap中移除掉。
因此,假设_cleaner线程刚刚清理数据,put函数调用发生将key放入桶中,那么一条数据的超时时间为:
expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1))
然而,假设put函数调用刚刚执行结束,_cleaner线程就开始清理数据,那么一条数据的超时时间为:
expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs
4. 总结
1、 TimeCacheMap的高效之处在于锁的粒度小,O(1)时间内完成锁操作,因此,大部分时间内都可以进行get和put操作。
2、 get,put,remove,containsKey和size操作都可以在O(numBuckets)时间内完成,其中numBuckets是桶的个数,默认为3。
3、 未更新数据的超时时间在expirationSecs和expirationSecs * (1 + 1 / (numBuckets-1))之间。
当前标签: Storm
Storm常见模式——TimeCacheMap 大圆那些事 2012-06-26 12:32 阅读:112 评论:0
Storm常见模式——BasicBolt 大圆那些事 2012-06-19 19:56 阅读:41 评论:0
Storm常见模式——批处理 大圆那些事 2012-06-19 18:30 阅读:769 评论:0
Storm常见模式——求TOP N 大圆那些事 2012-06-16 15:08 阅读:185 评论:0
Storm常见模式——流聚合 大圆那些事 2012-06-04 19:26 阅读:844 评论:0
作者: Leo_wl
出处: http://www.cnblogs.com/Leo_wl/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
版权信息