HBASE-20894 Use proto for BucketCache persistence
This commit is contained in:
parent
9b06361a5a
commit
4bcaf495c2
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
syntax = "proto2";
|
||||
|
||||
package hbase.pb;
|
||||
|
||||
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
|
||||
option java_outer_classname = "BucketCacheProtos";
|
||||
option java_generic_services = true;
|
||||
option java_generate_equals_and_hash = true;
|
||||
option optimize_for = SPEED;
|
||||
|
||||
message BucketCacheEntry {
|
||||
required int64 cache_capacity = 1;
|
||||
required string io_class = 2;
|
||||
required string map_class = 3;
|
||||
map<int32, string> deserializers = 4;
|
||||
required BackingMap backing_map = 5;
|
||||
}
|
||||
|
||||
message BackingMap {
|
||||
repeated BackingMapEntry entry = 1;
|
||||
}
|
||||
|
||||
message BackingMapEntry {
|
||||
required BlockCacheKey key = 1;
|
||||
required BucketEntry value = 2;
|
||||
}
|
||||
|
||||
message BlockCacheKey {
|
||||
required string hfilename = 1;
|
||||
required int64 offset = 2;
|
||||
required BlockType block_type = 3;
|
||||
required bool primary_replica_block = 4;
|
||||
}
|
||||
|
||||
enum BlockType {
|
||||
data = 0;
|
||||
encoded_data = 1;
|
||||
leaf_index = 2;
|
||||
bloom_chunk = 3;
|
||||
meta = 4;
|
||||
intermediate_index = 5;
|
||||
root_index = 6;
|
||||
file_info = 7;
|
||||
general_bloom_meta = 8;
|
||||
delete_family_bloom_meta = 9;
|
||||
trailer = 10;
|
||||
index_v1 = 11;
|
||||
}
|
||||
|
||||
message BucketEntry {
|
||||
required int64 offset = 1;
|
||||
required int32 length = 2;
|
||||
required int64 access_counter = 3;
|
||||
required int32 deserialiser_index = 4;
|
||||
required BlockPriority priority = 5;
|
||||
}
|
||||
|
||||
enum BlockPriority {
|
||||
single = 0;
|
||||
multi = 1;
|
||||
memory = 2;
|
||||
}
|
|
@ -25,8 +25,11 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* This class is used to manage the identifiers for
|
||||
* {@link CacheableDeserializer}
|
||||
* This class is used to manage the identifiers for {@link CacheableDeserializer}.
|
||||
* All deserializers are registered with this Manager via the
|
||||
* {@link #registerDeserializer(CacheableDeserializer)}}. On registration, we return an
|
||||
* int *identifier* for this deserializer. The int identifier is passed to
|
||||
* {@link #getDeserializer(int)}} to obtain the registered deserializer instance.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CacheableDeserializerIdManager {
|
||||
|
@ -34,10 +37,11 @@ public class CacheableDeserializerIdManager {
|
|||
private static final AtomicInteger identifier = new AtomicInteger(0);
|
||||
|
||||
/**
|
||||
* Register the given cacheable deserializer and generate an unique identifier
|
||||
* id for it
|
||||
* @param cd
|
||||
* Register the given {@link Cacheable} -- usually an hfileblock instance, these implement
|
||||
* the Cacheable Interface -- deserializer and generate an unique identifier id for it and return
|
||||
* this as our result.
|
||||
* @return the identifier of given cacheable deserializer
|
||||
* @see #getDeserializer(int)
|
||||
*/
|
||||
public static int registerDeserializer(CacheableDeserializer<Cacheable> cd) {
|
||||
int idx = identifier.incrementAndGet();
|
||||
|
@ -48,11 +52,25 @@ public class CacheableDeserializerIdManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the cacheable deserializer as the given identifier Id
|
||||
* @param id
|
||||
* @return CacheableDeserializer
|
||||
* Get the cacheable deserializer registered at the given identifier Id.
|
||||
* @see #registerDeserializer(CacheableDeserializer)
|
||||
*/
|
||||
public static CacheableDeserializer<Cacheable> getDeserializer(int id) {
|
||||
return registeredDeserializers.get(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshot a map of the current identifiers to class names for reconstruction on reading out
|
||||
* of a file.
|
||||
*/
|
||||
public static Map<Integer,String> save() {
|
||||
Map<Integer, String> snapshot = new HashMap<>();
|
||||
synchronized (registeredDeserializers) {
|
||||
for (Map.Entry<Integer, CacheableDeserializer<Cacheable>> entry :
|
||||
registeredDeserializers.entrySet()) {
|
||||
snapshot.put(entry.getKey(), entry.getValue().getClass().getName());
|
||||
}
|
||||
}
|
||||
return snapshot;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -251,10 +251,14 @@ public class HFileBlock implements Cacheable {
|
|||
* + Metadata! + <= See note on BLOCK_METADATA_SPACE above.
|
||||
* ++++++++++++++
|
||||
* </code>
|
||||
* @see #serialize(ByteBuffer)
|
||||
* @see #serialize(ByteBuffer, boolean)
|
||||
*/
|
||||
static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER =
|
||||
new CacheableDeserializer<Cacheable>() {
|
||||
public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
|
||||
|
||||
public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
|
||||
private BlockDeserializer() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
|
||||
throws IOException {
|
||||
|
@ -291,7 +295,7 @@ public class HFileBlock implements Cacheable {
|
|||
// Used only in tests
|
||||
return deserialize(b, false, MemoryType.EXCLUSIVE);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static final int DESERIALIZER_IDENTIFIER;
|
||||
static {
|
||||
|
|
|
@ -25,8 +25,6 @@ import java.io.FileInputStream;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
|
@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
|
|||
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.IdReadWriteLock;
|
||||
|
@ -81,6 +80,7 @@ import org.slf4j.LoggerFactory;
|
|||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
|
||||
|
||||
/**
|
||||
* BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
|
||||
|
@ -164,8 +164,6 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
private volatile boolean freeInProgress = false;
|
||||
private final Lock freeSpaceLock = new ReentrantLock();
|
||||
|
||||
private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
|
||||
|
||||
private final LongAdder realCacheSize = new LongAdder();
|
||||
private final LongAdder heapSize = new LongAdder();
|
||||
/** Current number of cached elements */
|
||||
|
@ -299,10 +297,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
try {
|
||||
retrieveFromFile(bucketSizes);
|
||||
} catch (IOException ioex) {
|
||||
LOG.error("Can't restore from file because of", ioex);
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
|
||||
throw new RuntimeException(cnfe);
|
||||
LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
|
||||
}
|
||||
}
|
||||
final String threadName = Thread.currentThread().getName();
|
||||
|
@ -511,7 +506,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
|
||||
}
|
||||
Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
|
||||
bucketEntry.deserializerReference(this.deserialiserMap));
|
||||
bucketEntry.deserializerReference());
|
||||
long timeTaken = System.nanoTime() - start;
|
||||
if (updateCacheMetrics) {
|
||||
cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
|
||||
|
@ -988,7 +983,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
continue;
|
||||
}
|
||||
BucketEntry bucketEntry =
|
||||
re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
|
||||
re.writeToCache(ioEngine, bucketAllocator, realCacheSize);
|
||||
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
|
||||
bucketEntries[index] = bucketEntry;
|
||||
if (ioErrorStartTime > 0) {
|
||||
|
@ -1083,75 +1078,98 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
return receptacle;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #retrieveFromFile(int[])
|
||||
*/
|
||||
private void persistToFile() throws IOException {
|
||||
assert !cacheEnabled;
|
||||
FileOutputStream fos = null;
|
||||
ObjectOutputStream oos = null;
|
||||
try {
|
||||
if (!ioEngine.isPersistent()) {
|
||||
throw new IOException("Attempt to persist non-persistent cache mappings!");
|
||||
}
|
||||
fos = new FileOutputStream(persistencePath, false);
|
||||
oos = new ObjectOutputStream(fos);
|
||||
oos.writeLong(cacheCapacity);
|
||||
oos.writeUTF(ioEngine.getClass().getName());
|
||||
oos.writeUTF(backingMap.getClass().getName());
|
||||
oos.writeObject(deserialiserMap);
|
||||
oos.writeObject(backingMap);
|
||||
} finally {
|
||||
if (oos != null) oos.close();
|
||||
if (fos != null) fos.close();
|
||||
if (!ioEngine.isPersistent()) {
|
||||
throw new IOException("Attempt to persist non-persistent cache mappings!");
|
||||
}
|
||||
try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
|
||||
fos.write(ProtobufMagic.PB_MAGIC);
|
||||
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
|
||||
ClassNotFoundException {
|
||||
/**
|
||||
* @see #persistToFile()
|
||||
*/
|
||||
private void retrieveFromFile(int[] bucketSizes) throws IOException {
|
||||
File persistenceFile = new File(persistencePath);
|
||||
if (!persistenceFile.exists()) {
|
||||
return;
|
||||
}
|
||||
assert !cacheEnabled;
|
||||
FileInputStream fis = null;
|
||||
ObjectInputStream ois = null;
|
||||
try {
|
||||
if (!ioEngine.isPersistent())
|
||||
throw new IOException(
|
||||
"Attempt to restore non-persistent cache mappings!");
|
||||
fis = new FileInputStream(persistencePath);
|
||||
ois = new ObjectInputStream(fis);
|
||||
long capacitySize = ois.readLong();
|
||||
if (capacitySize != cacheCapacity)
|
||||
throw new IOException("Mismatched cache capacity:"
|
||||
+ StringUtils.byteDesc(capacitySize) + ", expected: "
|
||||
+ StringUtils.byteDesc(cacheCapacity));
|
||||
String ioclass = ois.readUTF();
|
||||
String mapclass = ois.readUTF();
|
||||
if (!ioEngine.getClass().getName().equals(ioclass))
|
||||
throw new IOException("Class name for IO engine mismatch: " + ioclass
|
||||
+ ", expected:" + ioEngine.getClass().getName());
|
||||
if (!backingMap.getClass().getName().equals(mapclass))
|
||||
throw new IOException("Class name for cache map mismatch: " + mapclass
|
||||
+ ", expected:" + backingMap.getClass().getName());
|
||||
UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
|
||||
.readObject();
|
||||
ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
|
||||
(ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
|
||||
BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
|
||||
backingMapFromFile, realCacheSize);
|
||||
bucketAllocator = allocator;
|
||||
deserialiserMap = deserMap;
|
||||
backingMap = backingMapFromFile;
|
||||
} finally {
|
||||
if (ois != null) ois.close();
|
||||
if (fis != null) fis.close();
|
||||
if (!persistenceFile.delete()) {
|
||||
throw new IOException("Failed deleting persistence file "
|
||||
+ persistenceFile.getAbsolutePath());
|
||||
|
||||
try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
|
||||
int pblen = ProtobufMagic.lengthOfPBMagic();
|
||||
byte[] pbuf = new byte[pblen];
|
||||
int read = in.read(pbuf);
|
||||
if (read != pblen) {
|
||||
throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
|
||||
+ "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
|
||||
}
|
||||
if (! ProtobufMagic.isPBMagicPrefix(pbuf)) {
|
||||
// In 3.0 we have enough flexibility to dump the old cache data.
|
||||
// TODO: In 2.x line, this might need to be filled in to support reading the old format
|
||||
throw new IOException("Persistence file does not start with protobuf magic number. " +
|
||||
persistencePath);
|
||||
}
|
||||
parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
|
||||
bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an input stream that deletes the file after reading it. Use in try-with-resources to
|
||||
* avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
|
||||
* <pre>
|
||||
* File f = ...
|
||||
* try (FileInputStream fis = new FileInputStream(f)) {
|
||||
* // use the input stream
|
||||
* } finally {
|
||||
* if (!f.delete()) throw new IOException("failed to delete");
|
||||
* }
|
||||
* </pre>
|
||||
* @param file the file to read and delete
|
||||
* @return a FileInputStream for the given file
|
||||
* @throws IOException if there is a problem creating the stream
|
||||
*/
|
||||
private FileInputStream deleteFileOnClose(final File file) throws IOException {
|
||||
return new FileInputStream(file) {
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
if (!file.delete()) {
|
||||
throw new IOException("Failed deleting persistence file " + file.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
|
||||
throws IOException {
|
||||
if (capacitySize != cacheCapacity) {
|
||||
throw new IOException("Mismatched cache capacity:"
|
||||
+ StringUtils.byteDesc(capacitySize) + ", expected: "
|
||||
+ StringUtils.byteDesc(cacheCapacity));
|
||||
}
|
||||
if (!ioEngine.getClass().getName().equals(ioclass)) {
|
||||
throw new IOException("Class name for IO engine mismatch: " + ioclass
|
||||
+ ", expected:" + ioEngine.getClass().getName());
|
||||
}
|
||||
if (!backingMap.getClass().getName().equals(mapclass)) {
|
||||
throw new IOException("Class name for cache map mismatch: " + mapclass
|
||||
+ ", expected:" + backingMap.getClass().getName());
|
||||
}
|
||||
}
|
||||
|
||||
private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
|
||||
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
|
||||
backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether we tolerate IO error this time. If the duration of IOEngine
|
||||
* throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the
|
||||
|
@ -1287,18 +1305,19 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
private static final long serialVersionUID = -6741504807982257534L;
|
||||
|
||||
// access counter comparator, descending order
|
||||
static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
|
||||
|
||||
@Override
|
||||
public int compare(BucketEntry o1, BucketEntry o2) {
|
||||
return Long.compare(o2.accessCounter, o1.accessCounter);
|
||||
}
|
||||
};
|
||||
static final Comparator<BucketEntry> COMPARATOR = Comparator
|
||||
.comparingLong(BucketEntry::getAccessCounter).reversed();
|
||||
|
||||
private int offsetBase;
|
||||
private int length;
|
||||
private byte offset1;
|
||||
|
||||
/**
|
||||
* The index of the deserializer that can deserialize this BucketEntry content.
|
||||
* See {@link CacheableDeserializerIdManager} for hosting of index to serializers.
|
||||
*/
|
||||
byte deserialiserIndex;
|
||||
|
||||
private volatile long accessCounter;
|
||||
private BlockPriority priority;
|
||||
|
||||
|
@ -1335,17 +1354,16 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
return length;
|
||||
}
|
||||
|
||||
protected CacheableDeserializer<Cacheable> deserializerReference(
|
||||
UniqueIndexMap<Integer> deserialiserMap) {
|
||||
return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
|
||||
.unmap(deserialiserIndex));
|
||||
protected CacheableDeserializer<Cacheable> deserializerReference() {
|
||||
return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);
|
||||
}
|
||||
|
||||
protected void setDeserialiserReference(
|
||||
CacheableDeserializer<Cacheable> deserializer,
|
||||
UniqueIndexMap<Integer> deserialiserMap) {
|
||||
this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
|
||||
.getDeserialiserIdentifier()));
|
||||
protected void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer) {
|
||||
this.deserialiserIndex = (byte) deserializer.getDeserialiserIdentifier();
|
||||
}
|
||||
|
||||
public long getAccessCounter() {
|
||||
return accessCounter;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1504,7 +1522,6 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
|
||||
public BucketEntry writeToCache(final IOEngine ioEngine,
|
||||
final BucketAllocator bucketAllocator,
|
||||
final UniqueIndexMap<Integer> deserialiserMap,
|
||||
final LongAdder realCacheSize) throws CacheFullException, IOException,
|
||||
BucketAllocatorException {
|
||||
int len = data.getSerializedLength();
|
||||
|
@ -1516,7 +1533,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
|
||||
: new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
|
||||
: new BucketEntry(offset, len, accessCounter, inMemory);
|
||||
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
|
||||
bucketEntry.setDeserialiserReference(data.getDeserializer());
|
||||
try {
|
||||
if (data instanceof HFileBlock) {
|
||||
// If an instance of HFileBlock, save on some allocations.
|
||||
|
|
|
@ -0,0 +1,191 @@
|
|||
/*
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockType;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
final class BucketProtoUtils {
|
||||
private BucketProtoUtils() {
|
||||
|
||||
}
|
||||
|
||||
static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) {
|
||||
return BucketCacheProtos.BucketCacheEntry.newBuilder()
|
||||
.setCacheCapacity(cache.getMaxSize())
|
||||
.setIoClass(cache.ioEngine.getClass().getName())
|
||||
.setMapClass(cache.backingMap.getClass().getName())
|
||||
.putAllDeserializers(CacheableDeserializerIdManager.save())
|
||||
.setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static BucketCacheProtos.BackingMap toPB(
|
||||
Map<BlockCacheKey, BucketCache.BucketEntry> backingMap) {
|
||||
BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
|
||||
for (Map.Entry<BlockCacheKey, BucketCache.BucketEntry> entry : backingMap.entrySet()) {
|
||||
builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder()
|
||||
.setKey(toPB(entry.getKey()))
|
||||
.setValue(toPB(entry.getValue()))
|
||||
.build());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
|
||||
return BucketCacheProtos.BlockCacheKey.newBuilder()
|
||||
.setHfilename(key.getHfileName())
|
||||
.setOffset(key.getOffset())
|
||||
.setPrimaryReplicaBlock(key.isPrimary())
|
||||
.setBlockType(toPB(key.getBlockType()))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static BucketCacheProtos.BlockType toPB(BlockType blockType) {
|
||||
switch(blockType) {
|
||||
case DATA:
|
||||
return BucketCacheProtos.BlockType.data;
|
||||
case META:
|
||||
return BucketCacheProtos.BlockType.meta;
|
||||
case TRAILER:
|
||||
return BucketCacheProtos.BlockType.trailer;
|
||||
case INDEX_V1:
|
||||
return BucketCacheProtos.BlockType.index_v1;
|
||||
case FILE_INFO:
|
||||
return BucketCacheProtos.BlockType.file_info;
|
||||
case LEAF_INDEX:
|
||||
return BucketCacheProtos.BlockType.leaf_index;
|
||||
case ROOT_INDEX:
|
||||
return BucketCacheProtos.BlockType.root_index;
|
||||
case BLOOM_CHUNK:
|
||||
return BucketCacheProtos.BlockType.bloom_chunk;
|
||||
case ENCODED_DATA:
|
||||
return BucketCacheProtos.BlockType.encoded_data;
|
||||
case GENERAL_BLOOM_META:
|
||||
return BucketCacheProtos.BlockType.general_bloom_meta;
|
||||
case INTERMEDIATE_INDEX:
|
||||
return BucketCacheProtos.BlockType.intermediate_index;
|
||||
case DELETE_FAMILY_BLOOM_META:
|
||||
return BucketCacheProtos.BlockType.delete_family_bloom_meta;
|
||||
default:
|
||||
throw new Error("Unrecognized BlockType.");
|
||||
}
|
||||
}
|
||||
|
||||
private static BucketCacheProtos.BucketEntry toPB(BucketCache.BucketEntry entry) {
|
||||
return BucketCacheProtos.BucketEntry.newBuilder()
|
||||
.setOffset(entry.offset())
|
||||
.setLength(entry.getLength())
|
||||
.setDeserialiserIndex(entry.deserialiserIndex)
|
||||
.setAccessCounter(entry.getAccessCounter())
|
||||
.setPriority(toPB(entry.getPriority()))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
|
||||
switch (p) {
|
||||
case MULTI:
|
||||
return BucketCacheProtos.BlockPriority.multi;
|
||||
case MEMORY:
|
||||
return BucketCacheProtos.BlockPriority.memory;
|
||||
case SINGLE:
|
||||
return BucketCacheProtos.BlockPriority.single;
|
||||
default:
|
||||
throw new Error("Unrecognized BlockPriority.");
|
||||
}
|
||||
}
|
||||
|
||||
static ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> fromPB(
|
||||
Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap)
|
||||
throws IOException {
|
||||
ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> result = new ConcurrentHashMap<>();
|
||||
for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
|
||||
BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
|
||||
BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
|
||||
protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
|
||||
BucketCacheProtos.BucketEntry protoValue = entry.getValue();
|
||||
BucketCache.BucketEntry value = new BucketCache.BucketEntry(
|
||||
protoValue.getOffset(),
|
||||
protoValue.getLength(),
|
||||
protoValue.getAccessCounter(),
|
||||
protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory);
|
||||
// This is the deserializer that we stored
|
||||
int oldIndex = protoValue.getDeserialiserIndex();
|
||||
String deserializerClass = deserializers.get(oldIndex);
|
||||
if (deserializerClass == null) {
|
||||
throw new IOException("Found deserializer index without matching entry.");
|
||||
}
|
||||
// Convert it to the identifier for the deserializer that we have in this runtime
|
||||
if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) {
|
||||
int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserialiserIdentifier();
|
||||
value.deserialiserIndex = (byte) actualIndex;
|
||||
} else {
|
||||
// We could make this more plugable, but right now HFileBlock is the only implementation
|
||||
// of Cacheable outside of tests, so this might not ever matter.
|
||||
throw new IOException("Unknown deserializer class found: " + deserializerClass);
|
||||
}
|
||||
result.put(key, value);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static BlockType fromPb(BucketCacheProtos.BlockType blockType) {
|
||||
switch (blockType) {
|
||||
case data:
|
||||
return BlockType.DATA;
|
||||
case meta:
|
||||
return BlockType.META;
|
||||
case trailer:
|
||||
return BlockType.TRAILER;
|
||||
case index_v1:
|
||||
return BlockType.INDEX_V1;
|
||||
case file_info:
|
||||
return BlockType.FILE_INFO;
|
||||
case leaf_index:
|
||||
return BlockType.LEAF_INDEX;
|
||||
case root_index:
|
||||
return BlockType.ROOT_INDEX;
|
||||
case bloom_chunk:
|
||||
return BlockType.BLOOM_CHUNK;
|
||||
case encoded_data:
|
||||
return BlockType.ENCODED_DATA;
|
||||
case general_bloom_meta:
|
||||
return BlockType.GENERAL_BLOOM_META;
|
||||
case intermediate_index:
|
||||
return BlockType.INTERMEDIATE_INDEX;
|
||||
case delete_family_bloom_meta:
|
||||
return BlockType.DELETE_FAMILY_BLOOM_META;
|
||||
default:
|
||||
throw new Error("Unrecognized BlockType.");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,56 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Map from type T to int and vice-versa. Used for reducing bit field item
|
||||
* counts.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class UniqueIndexMap<T> implements Serializable {
|
||||
private static final long serialVersionUID = -1145635738654002342L;
|
||||
|
||||
ConcurrentHashMap<T, Integer> mForwardMap = new ConcurrentHashMap<>();
|
||||
ConcurrentHashMap<Integer, T> mReverseMap = new ConcurrentHashMap<>();
|
||||
AtomicInteger mIndex = new AtomicInteger(0);
|
||||
|
||||
// Map a length to an index. If we can't, allocate a new mapping. We might
|
||||
// race here and get two entries with the same deserialiser. This is fine.
|
||||
int map(T parameter) {
|
||||
Integer ret = mForwardMap.get(parameter);
|
||||
if (ret != null) return ret.intValue();
|
||||
int nexti = mIndex.incrementAndGet();
|
||||
assert (nexti < Short.MAX_VALUE);
|
||||
mForwardMap.put(parameter, nexti);
|
||||
mReverseMap.put(nexti, parameter);
|
||||
return nexti;
|
||||
}
|
||||
|
||||
T unmap(int leni) {
|
||||
Integer len = Integer.valueOf(leni);
|
||||
assert mReverseMap.containsKey(len);
|
||||
return mReverseMap.get(len);
|
||||
}
|
||||
}
|
|
@ -287,6 +287,7 @@ public class CacheTestUtils {
|
|||
return deserializerIdentifier;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType)
|
||||
throws IOException {
|
||||
|
|
|
@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -246,11 +248,13 @@ public class TestBucketCache {
|
|||
Path testDir = TEST_UTIL.getDataTestDir();
|
||||
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||
|
||||
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
|
||||
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
|
||||
+ "/bucket.persistence");
|
||||
String ioEngineName = "file:" + testDir + "/bucket.cache";
|
||||
String persistencePath = testDir + "/bucket.persistence";
|
||||
|
||||
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
|
||||
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertTrue(usedSize == 0);
|
||||
assertEquals(0, usedSize);
|
||||
|
||||
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||
// Add blocks
|
||||
|
@ -261,24 +265,26 @@ public class TestBucketCache {
|
|||
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
|
||||
}
|
||||
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertTrue(usedSize != 0);
|
||||
assertNotEquals(0, usedSize);
|
||||
// persist cache to file
|
||||
bucketCache.shutdown();
|
||||
assertTrue(new File(persistencePath).exists());
|
||||
|
||||
// restore cache from file
|
||||
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
|
||||
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
|
||||
+ "/bucket.persistence");
|
||||
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
|
||||
assertFalse(new File(persistencePath).exists());
|
||||
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
|
||||
// persist cache to file
|
||||
bucketCache.shutdown();
|
||||
assertTrue(new File(persistencePath).exists());
|
||||
|
||||
// reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k)
|
||||
// so it can't restore cache from file
|
||||
int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
|
||||
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
|
||||
constructedBlockSize, smallBucketSizes, writeThreads,
|
||||
writerQLen, testDir + "/bucket.persistence");
|
||||
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
|
||||
smallBucketSizes, writeThreads, writerQLen, persistencePath);
|
||||
assertFalse(new File(persistencePath).exists());
|
||||
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||
assertEquals(0, bucketCache.backingMap.size());
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
|
@ -143,8 +142,7 @@ public class TestBucketWriterThread {
|
|||
RAMQueueEntry rqe = q.remove();
|
||||
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
|
||||
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
|
||||
writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
|
||||
(UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
|
||||
writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
|
||||
this.q.add(spiedRqe);
|
||||
doDrainOfOneEntry(bc, wt, q);
|
||||
// Cache disabled when ioes w/o ever healing.
|
||||
|
@ -166,8 +164,7 @@ public class TestBucketWriterThread {
|
|||
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
|
||||
Mockito.doThrow(cfe).
|
||||
doReturn(mockedBucketEntry).
|
||||
when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
|
||||
(UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
|
||||
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
|
||||
this.q.add(spiedRqe);
|
||||
doDrainOfOneEntry(bc, wt, q);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue