HBASE-20894 Use proto for BucketCache persistence

This commit is contained in:
Mike Drob 2018-07-16 12:21:33 -05:00 committed by huzheng
parent bd8260c07e
commit 92fc520cb0
9 changed files with 426 additions and 169 deletions

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
package hbase.pb;
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "BucketCacheProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message BucketCacheEntry {
required int64 cache_capacity = 1;
required string io_class = 2;
required string map_class = 3;
map<int32, string> deserializers = 4;
required BackingMap backing_map = 5;
}
message BackingMap {
repeated BackingMapEntry entry = 1;
}
message BackingMapEntry {
required BlockCacheKey key = 1;
required BucketEntry value = 2;
}
message BlockCacheKey {
required string hfilename = 1;
required int64 offset = 2;
required BlockType block_type = 3;
required bool primary_replica_block = 4;
}
enum BlockType {
data = 0;
encoded_data = 1;
leaf_index = 2;
bloom_chunk = 3;
meta = 4;
intermediate_index = 5;
root_index = 6;
file_info = 7;
general_bloom_meta = 8;
delete_family_bloom_meta = 9;
trailer = 10;
index_v1 = 11;
}
message BucketEntry {
required int64 offset = 1;
required int32 length = 2;
required int64 access_counter = 3;
required int32 deserialiser_index = 4;
required BlockPriority priority = 5;
}
enum BlockPriority {
single = 0;
multi = 1;
memory = 2;
}

View File

@ -25,8 +25,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.yetus.audience.InterfaceAudience;
/**
* This class is used to manage the identifiers for
* {@link CacheableDeserializer}
* This class is used to manage the identifiers for {@link CacheableDeserializer}.
* All deserializers are registered with this Manager via the
* {@link #registerDeserializer(CacheableDeserializer)}}. On registration, we return an
* int *identifier* for this deserializer. The int identifier is passed to
* {@link #getDeserializer(int)}} to obtain the registered deserializer instance.
*/
@InterfaceAudience.Private
public class CacheableDeserializerIdManager {
@ -34,10 +37,11 @@ public class CacheableDeserializerIdManager {
private static final AtomicInteger identifier = new AtomicInteger(0);
/**
* Register the given cacheable deserializer and generate an unique identifier
* id for it
* @param cd
* Register the given {@link Cacheable} -- usually an hfileblock instance, these implement
* the Cacheable Interface -- deserializer and generate an unique identifier id for it and return
* this as our result.
* @return the identifier of given cacheable deserializer
* @see #getDeserializer(int)
*/
public static int registerDeserializer(CacheableDeserializer<Cacheable> cd) {
int idx = identifier.incrementAndGet();
@ -48,11 +52,25 @@ public class CacheableDeserializerIdManager {
}
/**
* Get the cacheable deserializer as the given identifier Id
* @param id
* @return CacheableDeserializer
* Get the cacheable deserializer registered at the given identifier Id.
* @see #registerDeserializer(CacheableDeserializer)
*/
public static CacheableDeserializer<Cacheable> getDeserializer(int id) {
return registeredDeserializers.get(id);
}
/**
* Snapshot a map of the current identifiers to class names for reconstruction on reading out
* of a file.
*/
public static Map<Integer,String> save() {
Map<Integer, String> snapshot = new HashMap<>();
synchronized (registeredDeserializers) {
for (Map.Entry<Integer, CacheableDeserializer<Cacheable>> entry :
registeredDeserializers.entrySet()) {
snapshot.put(entry.getKey(), entry.getValue().getClass().getName());
}
}
return snapshot;
}
}

View File

@ -254,10 +254,14 @@ public class HFileBlock implements Cacheable {
* + Metadata! + <= See note on BLOCK_METADATA_SPACE above.
* ++++++++++++++
* </code>
* @see #serialize(ByteBuffer)
* @see #serialize(ByteBuffer, boolean)
*/
static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER =
new CacheableDeserializer<Cacheable>() {
public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
private BlockDeserializer() {
}
@Override
public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
throws IOException {
@ -295,7 +299,7 @@ public class HFileBlock implements Cacheable {
// Used only in tests
return deserialize(b, false, MemoryType.EXCLUSIVE);
}
};
}
private static final int DESERIALIZER_IDENTIFIER;
static {

View File

@ -25,8 +25,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -70,6 +68,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@ -83,6 +82,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
/**
* BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
@ -165,8 +165,6 @@ public class BucketCache implements BlockCache, HeapSize {
private volatile boolean freeInProgress = false;
private transient final Lock freeSpaceLock = new ReentrantLock();
private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
private final LongAdder realCacheSize = new LongAdder();
private final LongAdder heapSize = new LongAdder();
/** Current number of cached elements */
@ -301,10 +299,7 @@ public class BucketCache implements BlockCache, HeapSize {
try {
retrieveFromFile(bucketSizes);
} catch (IOException ioex) {
LOG.error("Can't restore from file because of", ioex);
} catch (ClassNotFoundException cnfe) {
LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
throw new RuntimeException(cnfe);
LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
}
}
final String threadName = Thread.currentThread().getName();
@ -521,7 +516,7 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
}
Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
bucketEntry.deserializerReference(this.deserialiserMap));
bucketEntry.deserializerReference());
long timeTaken = System.nanoTime() - start;
if (updateCacheMetrics) {
cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
@ -997,8 +992,7 @@ public class BucketCache implements BlockCache, HeapSize {
index++;
continue;
}
BucketEntry bucketEntry =
re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize);
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
@ -1102,75 +1096,98 @@ public class BucketCache implements BlockCache, HeapSize {
return receptacle;
}
/**
* @see #retrieveFromFile(int[])
*/
private void persistToFile() throws IOException {
assert !cacheEnabled;
FileOutputStream fos = null;
ObjectOutputStream oos = null;
try {
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
fos = new FileOutputStream(persistencePath, false);
oos = new ObjectOutputStream(fos);
oos.writeLong(cacheCapacity);
oos.writeUTF(ioEngine.getClass().getName());
oos.writeUTF(backingMap.getClass().getName());
oos.writeObject(deserialiserMap);
oos.writeObject(backingMap);
} finally {
if (oos != null) oos.close();
if (fos != null) fos.close();
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
fos.write(ProtobufMagic.PB_MAGIC);
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
}
}
@SuppressWarnings("unchecked")
private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
ClassNotFoundException {
/**
* @see #persistToFile()
*/
private void retrieveFromFile(int[] bucketSizes) throws IOException {
File persistenceFile = new File(persistencePath);
if (!persistenceFile.exists()) {
return;
}
assert !cacheEnabled;
FileInputStream fis = null;
ObjectInputStream ois = null;
try {
if (!ioEngine.isPersistent())
throw new IOException(
"Attempt to restore non-persistent cache mappings!");
fis = new FileInputStream(persistencePath);
ois = new ObjectInputStream(fis);
long capacitySize = ois.readLong();
if (capacitySize != cacheCapacity)
throw new IOException("Mismatched cache capacity:"
+ StringUtils.byteDesc(capacitySize) + ", expected: "
+ StringUtils.byteDesc(cacheCapacity));
String ioclass = ois.readUTF();
String mapclass = ois.readUTF();
if (!ioEngine.getClass().getName().equals(ioclass))
throw new IOException("Class name for IO engine mismatch: " + ioclass
+ ", expected:" + ioEngine.getClass().getName());
if (!backingMap.getClass().getName().equals(mapclass))
throw new IOException("Class name for cache map mismatch: " + mapclass
+ ", expected:" + backingMap.getClass().getName());
UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
.readObject();
ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
(ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
backingMapFromFile, realCacheSize);
bucketAllocator = allocator;
deserialiserMap = deserMap;
backingMap = backingMapFromFile;
} finally {
if (ois != null) ois.close();
if (fis != null) fis.close();
if (!persistenceFile.delete()) {
throw new IOException("Failed deleting persistence file "
+ persistenceFile.getAbsolutePath());
try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
int pblen = ProtobufMagic.lengthOfPBMagic();
byte[] pbuf = new byte[pblen];
int read = in.read(pbuf);
if (read != pblen) {
throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
+ "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
}
if (! ProtobufMagic.isPBMagicPrefix(pbuf)) {
// In 3.0 we have enough flexibility to dump the old cache data.
// TODO: In 2.x line, this might need to be filled in to support reading the old format
throw new IOException("Persistence file does not start with protobuf magic number. " +
persistencePath);
}
parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
}
}
/**
* Create an input stream that deletes the file after reading it. Use in try-with-resources to
* avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
* <pre>
* File f = ...
* try (FileInputStream fis = new FileInputStream(f)) {
* // use the input stream
* } finally {
* if (!f.delete()) throw new IOException("failed to delete");
* }
* </pre>
* @param file the file to read and delete
* @return a FileInputStream for the given file
* @throws IOException if there is a problem creating the stream
*/
private FileInputStream deleteFileOnClose(final File file) throws IOException {
return new FileInputStream(file) {
@Override
public void close() throws IOException {
super.close();
if (!file.delete()) {
throw new IOException("Failed deleting persistence file " + file.getAbsolutePath());
}
}
};
}
private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
throws IOException {
if (capacitySize != cacheCapacity) {
throw new IOException("Mismatched cache capacity:"
+ StringUtils.byteDesc(capacitySize) + ", expected: "
+ StringUtils.byteDesc(cacheCapacity));
}
if (!ioEngine.getClass().getName().equals(ioclass)) {
throw new IOException("Class name for IO engine mismatch: " + ioclass
+ ", expected:" + ioEngine.getClass().getName());
}
if (!backingMap.getClass().getName().equals(mapclass)) {
throw new IOException("Class name for cache map mismatch: " + mapclass
+ ", expected:" + backingMap.getClass().getName());
}
}
private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap());
}
/**
* Check whether we tolerate IO error this time. If the duration of IOEngine
* throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the
@ -1306,18 +1323,19 @@ public class BucketCache implements BlockCache, HeapSize {
private static final long serialVersionUID = -6741504807982257534L;
// access counter comparator, descending order
static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
@Override
public int compare(BucketEntry o1, BucketEntry o2) {
return Long.compare(o2.accessCounter, o1.accessCounter);
}
};
static final Comparator<BucketEntry> COMPARATOR = Comparator
.comparingLong(BucketEntry::getAccessCounter).reversed();
private int offsetBase;
private int length;
private byte offset1;
/**
* The index of the deserializer that can deserialize this BucketEntry content.
* See {@link CacheableDeserializerIdManager} for hosting of index to serializers.
*/
byte deserialiserIndex;
private volatile long accessCounter;
private BlockPriority priority;
@ -1354,17 +1372,16 @@ public class BucketCache implements BlockCache, HeapSize {
return length;
}
protected CacheableDeserializer<Cacheable> deserializerReference(
UniqueIndexMap<Integer> deserialiserMap) {
return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
.unmap(deserialiserIndex));
protected CacheableDeserializer<Cacheable> deserializerReference() {
return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);
}
protected void setDeserialiserReference(
CacheableDeserializer<Cacheable> deserializer,
UniqueIndexMap<Integer> deserialiserMap) {
this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
.getDeserialiserIdentifier()));
protected void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer) {
this.deserialiserIndex = (byte) deserializer.getDeserialiserIdentifier();
}
public long getAccessCounter() {
return accessCounter;
}
/**
@ -1534,7 +1551,7 @@ public class BucketCache implements BlockCache, HeapSize {
}
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
final UniqueIndexMap<Integer> deserialiserMap, final LongAdder realCacheSize)
final LongAdder realCacheSize)
throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
@ -1546,7 +1563,7 @@ public class BucketCache implements BlockCache, HeapSize {
BucketEntry bucketEntry;
try {
bucketEntry = getBucketEntry(ioEngine, offset, len);
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
bucketEntry.setDeserialiserReference(data.getDeserializer());
if (data instanceof HFileBlock) {
// If an instance of HFileBlock, save on some allocations.
HFileBlock block = (HFileBlock) data;

View File

@ -0,0 +1,191 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
@InterfaceAudience.Private
final class BucketProtoUtils {
private BucketProtoUtils() {
}
static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) {
return BucketCacheProtos.BucketCacheEntry.newBuilder()
.setCacheCapacity(cache.getMaxSize())
.setIoClass(cache.ioEngine.getClass().getName())
.setMapClass(cache.backingMap.getClass().getName())
.putAllDeserializers(CacheableDeserializerIdManager.save())
.setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
.build();
}
private static BucketCacheProtos.BackingMap toPB(
Map<BlockCacheKey, BucketCache.BucketEntry> backingMap) {
BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
for (Map.Entry<BlockCacheKey, BucketCache.BucketEntry> entry : backingMap.entrySet()) {
builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder()
.setKey(toPB(entry.getKey()))
.setValue(toPB(entry.getValue()))
.build());
}
return builder.build();
}
private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
return BucketCacheProtos.BlockCacheKey.newBuilder()
.setHfilename(key.getHfileName())
.setOffset(key.getOffset())
.setPrimaryReplicaBlock(key.isPrimary())
.setBlockType(toPB(key.getBlockType()))
.build();
}
private static BucketCacheProtos.BlockType toPB(BlockType blockType) {
switch(blockType) {
case DATA:
return BucketCacheProtos.BlockType.data;
case META:
return BucketCacheProtos.BlockType.meta;
case TRAILER:
return BucketCacheProtos.BlockType.trailer;
case INDEX_V1:
return BucketCacheProtos.BlockType.index_v1;
case FILE_INFO:
return BucketCacheProtos.BlockType.file_info;
case LEAF_INDEX:
return BucketCacheProtos.BlockType.leaf_index;
case ROOT_INDEX:
return BucketCacheProtos.BlockType.root_index;
case BLOOM_CHUNK:
return BucketCacheProtos.BlockType.bloom_chunk;
case ENCODED_DATA:
return BucketCacheProtos.BlockType.encoded_data;
case GENERAL_BLOOM_META:
return BucketCacheProtos.BlockType.general_bloom_meta;
case INTERMEDIATE_INDEX:
return BucketCacheProtos.BlockType.intermediate_index;
case DELETE_FAMILY_BLOOM_META:
return BucketCacheProtos.BlockType.delete_family_bloom_meta;
default:
throw new Error("Unrecognized BlockType.");
}
}
private static BucketCacheProtos.BucketEntry toPB(BucketCache.BucketEntry entry) {
return BucketCacheProtos.BucketEntry.newBuilder()
.setOffset(entry.offset())
.setLength(entry.getLength())
.setDeserialiserIndex(entry.deserialiserIndex)
.setAccessCounter(entry.getAccessCounter())
.setPriority(toPB(entry.getPriority()))
.build();
}
private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
switch (p) {
case MULTI:
return BucketCacheProtos.BlockPriority.multi;
case MEMORY:
return BucketCacheProtos.BlockPriority.memory;
case SINGLE:
return BucketCacheProtos.BlockPriority.single;
default:
throw new Error("Unrecognized BlockPriority.");
}
}
static ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> fromPB(
Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap)
throws IOException {
ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> result = new ConcurrentHashMap<>();
for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
BucketCacheProtos.BucketEntry protoValue = entry.getValue();
BucketCache.BucketEntry value = new BucketCache.BucketEntry(
protoValue.getOffset(),
protoValue.getLength(),
protoValue.getAccessCounter(),
protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory);
// This is the deserializer that we stored
int oldIndex = protoValue.getDeserialiserIndex();
String deserializerClass = deserializers.get(oldIndex);
if (deserializerClass == null) {
throw new IOException("Found deserializer index without matching entry.");
}
// Convert it to the identifier for the deserializer that we have in this runtime
if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) {
int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserialiserIdentifier();
value.deserialiserIndex = (byte) actualIndex;
} else {
// We could make this more plugable, but right now HFileBlock is the only implementation
// of Cacheable outside of tests, so this might not ever matter.
throw new IOException("Unknown deserializer class found: " + deserializerClass);
}
result.put(key, value);
}
return result;
}
private static BlockType fromPb(BucketCacheProtos.BlockType blockType) {
switch (blockType) {
case data:
return BlockType.DATA;
case meta:
return BlockType.META;
case trailer:
return BlockType.TRAILER;
case index_v1:
return BlockType.INDEX_V1;
case file_info:
return BlockType.FILE_INFO;
case leaf_index:
return BlockType.LEAF_INDEX;
case root_index:
return BlockType.ROOT_INDEX;
case bloom_chunk:
return BlockType.BLOOM_CHUNK;
case encoded_data:
return BlockType.ENCODED_DATA;
case general_bloom_meta:
return BlockType.GENERAL_BLOOM_META;
case intermediate_index:
return BlockType.INTERMEDIATE_INDEX;
case delete_family_bloom_meta:
return BlockType.DELETE_FAMILY_BLOOM_META;
default:
throw new Error("Unrecognized BlockType.");
}
}
}

View File

@ -1,56 +0,0 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Map from type T to int and vice-versa. Used for reducing bit field item
* counts.
*/
@InterfaceAudience.Private
public final class UniqueIndexMap<T> implements Serializable {
private static final long serialVersionUID = -1145635738654002342L;
ConcurrentHashMap<T, Integer> mForwardMap = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, T> mReverseMap = new ConcurrentHashMap<>();
AtomicInteger mIndex = new AtomicInteger(0);
// Map a length to an index. If we can't, allocate a new mapping. We might
// race here and get two entries with the same deserialiser. This is fine.
int map(T parameter) {
Integer ret = mForwardMap.get(parameter);
if (ret != null) return ret.intValue();
int nexti = mIndex.incrementAndGet();
assert (nexti < Short.MAX_VALUE);
mForwardMap.put(parameter, nexti);
mReverseMap.put(nexti, parameter);
return nexti;
}
T unmap(int leni) {
Integer len = Integer.valueOf(leni);
assert mReverseMap.containsKey(len);
return mReverseMap.get(len);
}
}

View File

@ -288,6 +288,7 @@ public class CacheTestUtils {
return deserializerIdentifier;
}
@Override
public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType)
throws IOException {

View File

@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -250,11 +252,13 @@ public class TestBucketCache {
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+ "/bucket.persistence");
String ioEngineName = "file:" + testDir + "/bucket.cache";
String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
long usedSize = bucketCache.getAllocator().getUsedSize();
assertTrue(usedSize == 0);
assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
// Add blocks
@ -265,24 +269,26 @@ public class TestBucketCache {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertTrue(usedSize != 0);
assertNotEquals(0, usedSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
// restore cache from file
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+ "/bucket.persistence");
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
// reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k)
// so it can't restore cache from file
int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, smallBucketSizes, writeThreads,
writerQLen, testDir + "/bucket.persistence");
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
@ -540,7 +546,7 @@ public class TestBucketCache {
Assert.assertEquals(0, allocator.getUsedSize());
try {
re.writeToCache(ioEngine, allocator, new UniqueIndexMap<>(), null);
re.writeToCache(ioEngine, allocator, null);
Assert.fail();
} catch (Exception e) {
}

View File

@ -27,7 +27,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
@ -143,8 +142,7 @@ public class TestBucketWriterThread {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
(UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing.
@ -166,8 +164,7 @@ public class TestBucketWriterThread {
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).
doReturn(mockedBucketEntry).
when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
(UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}