HBASE-9855 evictBlocksByHfileName improvement for bucket cache
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1538320 13f79535-47bb-0310-9956-ffa450edef68
commit d6bdd54ecc (parent b0c406da07)
org/apache/hadoop/hbase/util/ConcurrentIndex.java (new file; all lines added)
@@ -0,0 +1,173 @@
/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import com.google.common.base.Supplier;
import com.google.common.collect.Multiset;

import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A simple concurrent map of sets. This is similar in concept to
 * {@link Multiset}, with the following exceptions:
 * <ul>
 *   <li>The set is thread-safe and concurrent: no external locking or
 *   synchronization is required. This is important for the use case where
 *   this class is used to index cached blocks by filename for their
 *   efficient eviction from cache when the file is closed or compacted.</li>
 *   <li>The expectation is that all entries may only be removed for a key
 *   once no more additions of values are being made under that key.</li>
 * </ul>
 * @param <K> Key type
 * @param <V> Value type
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ConcurrentIndex<K, V> {

  /** Container for the sets, indexed by key */
  private final ConcurrentMap<K, Set<V>> container;

  /**
   * A factory that constructs new instances of the sets if no set is
   * associated with a given key.
   */
  private final Supplier<Set<V>> valueSetFactory;

  /**
   * Creates an instance with a specified factory object for sets to be
   * associated with a given key.
   * @param valueSetFactory The factory instance
   */
  public ConcurrentIndex(Supplier<Set<V>> valueSetFactory) {
    this.valueSetFactory = valueSetFactory;
    this.container = new ConcurrentHashMap<K, Set<V>>();
  }

  /**
   * Creates an instance using the DefaultValueSetFactory for sets,
   * which in turn creates instances of {@link ConcurrentSkipListSet}.
   * @param valueComparator A {@link Comparator} for value types
   */
  public ConcurrentIndex(Comparator<V> valueComparator) {
    this(new DefaultValueSetFactory<V>(valueComparator));
  }

  /**
   * Associates a new unique value with a specified key. Under the covers,
   * the method employs optimistic concurrency: if no set is associated with
   * a given key, we create a new set; if another thread comes in, creates,
   * and associates a set with the same key in the meantime, we simply add
   * the value to the already created set.
   * @param key The key
   * @param value An additional unique value we want to associate with a key
   */
  public void put(K key, V value) {
    Set<V> set = container.get(key);
    if (set != null) {
      set.add(value);
    } else {
      set = valueSetFactory.get();
      set.add(value);
      Set<V> existing = container.putIfAbsent(key, set);
      if (existing != null) {
        // If a set is already associated with a key, that means another
        // writer has already come in and created the set for the given key.
        // Pursuant to an optimistic concurrency policy, in this case we will
        // simply add the value to the existing set associated with the key.
        existing.add(value);
      }
    }
  }

  /**
   * Gets all values associated with a specified key, or null if no values
   * are associated. <b>Note:</b> if the caller wishes to add or remove
   * values under the specified key while iterating through the returned
   * set, they should make a defensive copy; otherwise, a
   * {@link ConcurrentModificationException} may be thrown.
   * @param key The key
   * @return All values associated with the specified key, or null if no
   *         values are associated with the key.
   */
  public Set<V> values(K key) {
    return container.get(key);
  }

  /**
   * Removes the association between a specified key and value. If, as a
   * result of removing a value, the set becomes empty, we remove it from
   * the mapping as well.
   * @param key The specified key
   * @param value The value to disassociate from the key
   * @return true if the value was associated with the key and removed
   */
  public boolean remove(K key, V value) {
    Set<V> set = container.get(key);
    boolean success = false;
    if (set != null) {
      success = set.remove(value);
      if (set.isEmpty()) {
        container.remove(key);
      }
    }
    return success;
  }

  /**
   * Default factory class for the sets associated with given keys. Creates
   * a {@link ConcurrentSkipListSet} using the comparator passed into the
   * constructor.
   * @see ConcurrentSkipListSet
   * @see Supplier
   * @param <V> The value type. Should match the value type of the
   *        ConcurrentIndex instances this object is passed to.
   */
  private static class DefaultValueSetFactory<V> implements Supplier<Set<V>> {
    private final Comparator<V> comparator;

    /**
     * Creates an instance that passes a specified comparator to the
     * {@link ConcurrentSkipListSet}.
     * @param comparator The specified comparator
     */
    public DefaultValueSetFactory(Comparator<V> comparator) {
      this.comparator = comparator;
    }

    /**
     * Creates a new {@link ConcurrentSkipListSet} instance using the
     * comparator specified when the class instance was constructed.
     * @return The instantiated {@link ConcurrentSkipListSet} object
     */
    @Override
    public Set<V> get() {
      return new ConcurrentSkipListSet<V>(comparator);
    }
  }
}
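To make the class's semantics concrete, here is a minimal, hypothetical usage sketch (the example class and key/value strings are illustrative, not part of the commit):

import java.util.Comparator;
import java.util.Set;

import org.apache.hadoop.hbase.util.ConcurrentIndex;

public class ConcurrentIndexExample {
  public static void main(String[] args) {
    // Each key's values live in a ConcurrentSkipListSet ordered by the
    // comparator supplied here (plain alphabetical order for this sketch).
    ConcurrentIndex<String, String> index =
        new ConcurrentIndex<String, String>(new Comparator<String>() {
          @Override
          public int compare(String a, String b) {
            return a.compareTo(b);
          }
        });

    index.put("hfile-1", "block@0");     // first put for a key creates its set
    index.put("hfile-1", "block@65536"); // later puts add to the existing set
    index.put("hfile-2", "block@0");

    Set<String> blocks = index.values("hfile-1");
    System.out.println(blocks);            // [block@0, block@65536]
    System.out.println(index.values("x")); // null: nothing under this key
  }
}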
BlockCacheKey.java
@@ -97,4 +97,8 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
   public DataBlockEncoding getDataBlockEncoding() {
     return encoding;
   }
+
+  public long getOffset() {
+    return offset;
+  }
 }
BucketCache.java
@@ -30,9 +30,11 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -58,11 +60,13 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.ConcurrentIndex;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -169,7 +173,18 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private IdLock offsetLock = new IdLock();
 
+  private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
+      new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
+        @Override
+        public int compare(BlockCacheKey a, BlockCacheKey b) {
+          if (a.getOffset() == b.getOffset()) {
+            return 0;
+          } else if (a.getOffset() < b.getOffset()) {
+            return -1;
+          }
+          return 1;
+        }
+      });
+
   /** Statistics thread schedule pool (for heavy debugging, could remove) */
   private final ScheduledExecutorService scheduleThreadPool =
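An editorial note on this comparator (not text from the commit): a ConcurrentSkipListSet treats comparator equality as set equality, so within one HFile's set two keys with the same offset collapse into a single entry, and keys stay sorted by their offset in the file. The getOffset() accessor added to BlockCacheKey above exists to support exactly this. A minimal standalone sketch of that behavior, using only the JDK:

import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;

public class ComparatorEqualitySketch {
  public static void main(String[] args) {
    // The comparator alone defines equality for the skip list: two entries
    // with the same "offset" (element 0) collapse into one, regardless of
    // Object.equals().
    ConcurrentSkipListSet<long[]> set =
        new ConcurrentSkipListSet<long[]>(new Comparator<long[]>() {
          @Override
          public int compare(long[] a, long[] b) {
            return Long.compare(a[0], b[0]); // order by offset only
          }
        });
    set.add(new long[] { 0L });
    set.add(new long[] { 65536L });
    set.add(new long[] { 0L });       // duplicate offset: not added
    System.out.println(set.size());   // 2
  }
}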
@@ -322,6 +337,7 @@ public class BucketCache implements BlockCache, HeapSize {
     } else {
       this.blockNumber.incrementAndGet();
       this.heapSize.addAndGet(cachedItem.heapSize());
+      blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
     }
   }
 
@@ -392,6 +408,7 @@ public class BucketCache implements BlockCache, HeapSize {
     if (bucketEntry.equals(backingMap.remove(cacheKey))) {
       bucketAllocator.freeBlock(bucketEntry.offset());
       realCacheSize.addAndGet(-1 * bucketEntry.getLength());
+      blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
       if (removedBlock == null) {
         this.blockNumber.decrementAndGet();
       }
@@ -914,10 +931,7 @@ public class BucketCache implements BlockCache, HeapSize {
   }
 
   /**
-   * Evicts all blocks for a specific HFile. This is an expensive operation
-   * implemented as a linear-time search through all blocks in the cache.
-   * Ideally this should be a search in a log-access-time map.
-   *
+   * Evicts all blocks for a specific HFile.
+   * <p>
+   * This is used for evict-on-close to remove all blocks of a specific HFile.
    *
@@ -925,13 +939,20 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   @Override
   public int evictBlocksByHfileName(String hfileName) {
+    // Copy the set to avoid ConcurrentModificationException,
+    // since evicting a block removes its key from the index
+    Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
+    if (keySet == null) {
+      return 0;
+    }
     int numEvicted = 0;
-    for (BlockCacheKey key : this.backingMap.keySet()) {
-      if (key.getHfileName().equals(hfileName)) {
-        if (evictBlock(key))
-          ++numEvicted;
-      }
-    }
+    List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
+    for (BlockCacheKey key : keysForHFile) {
+      if (evictBlock(key)) {
+        ++numEvicted;
+      }
+    }
+
     return numEvicted;
   }
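To see the defensive-copy idiom in isolation, here is a hypothetical standalone sketch (class and method names are illustrative; in BucketCache itself the index entry is removed on the eviction path shown in the earlier hunk, via blocksByHFile.remove):

import java.util.Comparator;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.util.ConcurrentIndex;

import com.google.common.collect.ImmutableList;

public class EvictByFileSketch {
  /** Evicts every value indexed under fileName; returns how many were removed. */
  static int evictAll(ConcurrentIndex<String, Long> blocksByFile, String fileName) {
    Set<Long> offsets = blocksByFile.values(fileName);
    if (offsets == null) {
      return 0; // nothing cached for this file
    }
    int evicted = 0;
    // Snapshot first: each remove() below mutates the underlying set.
    List<Long> snapshot = ImmutableList.copyOf(offsets);
    for (Long offset : snapshot) {
      if (blocksByFile.remove(fileName, offset)) {
        ++evicted;
      }
    }
    return evicted;
  }

  public static void main(String[] args) {
    ConcurrentIndex<String, Long> idx =
        new ConcurrentIndex<String, Long>(new Comparator<Long>() {
          @Override
          public int compare(Long a, Long b) {
            return a.compareTo(b);
          }
        });
    idx.put("hfile-1", 0L);
    idx.put("hfile-1", 65536L);
    System.out.println(evictAll(idx, "hfile-1")); // 2
    System.out.println(evictAll(idx, "hfile-1")); // 0: index entry is gone
  }
}

This mirrors the change above: instead of a linear scan over every key in backingMap, eviction touches only the blocks actually indexed under the file's name.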