HBASE-27313: Persist list of Hfiles names for which prefetch is done (#4771)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
e298782ae7
commit
a1fc483ba8
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
syntax = "proto2";
|
||||||
|
|
||||||
|
package hbase.pb;
|
||||||
|
|
||||||
|
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
|
||||||
|
option java_outer_classname = "PersistentPrefetchProtos";
|
||||||
|
option java_generic_services = true;
|
||||||
|
option java_generate_equals_and_hash = true;
|
||||||
|
option optimize_for = SPEED;
|
||||||
|
|
||||||
|
|
||||||
|
message PrefetchedHfileName {
|
||||||
|
map<string, bool> prefetched_files = 1;
|
||||||
|
}
|
|
@ -29,7 +29,7 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
|
||||||
private static final long serialVersionUID = -5199992013113130534L;
|
private static final long serialVersionUID = -5199992013113130534L;
|
||||||
private final String hfileName;
|
private final String hfileName;
|
||||||
private final long offset;
|
private final long offset;
|
||||||
private final BlockType blockType;
|
private BlockType blockType;
|
||||||
private final boolean isPrimaryReplicaBlock;
|
private final boolean isPrimaryReplicaBlock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -98,4 +98,8 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
|
||||||
public BlockType getBlockType() {
|
public BlockType getBlockType() {
|
||||||
return blockType;
|
return blockType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setBlockType(BlockType blockType) {
|
||||||
|
this.blockType = blockType;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,6 +93,8 @@ public class CacheConfig {
|
||||||
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
|
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
|
||||||
"hbase.hfile.drop.behind.compaction";
|
"hbase.hfile.drop.behind.compaction";
|
||||||
|
|
||||||
|
public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file-list.path";
|
||||||
|
|
||||||
// Defaults
|
// Defaults
|
||||||
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
|
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
|
||||||
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
|
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
|
||||||
|
|
|
@ -17,6 +17,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.io.hfile;
|
package org.apache.hadoop.hbase.io.hfile;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
@ -37,6 +42,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class PrefetchExecutor {
|
public final class PrefetchExecutor {
|
||||||
|
|
||||||
|
@ -44,12 +51,16 @@ public final class PrefetchExecutor {
|
||||||
|
|
||||||
/** Futures for tracking block prefetch activity */
|
/** Futures for tracking block prefetch activity */
|
||||||
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
|
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
|
||||||
|
/** Set of files for which prefetch is completed */
|
||||||
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
|
||||||
|
private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
|
||||||
/** Executor pool shared among all HFiles for block prefetch */
|
/** Executor pool shared among all HFiles for block prefetch */
|
||||||
private static final ScheduledExecutorService prefetchExecutorPool;
|
private static final ScheduledExecutorService prefetchExecutorPool;
|
||||||
/** Delay before beginning prefetch */
|
/** Delay before beginning prefetch */
|
||||||
private static final int prefetchDelayMillis;
|
private static final int prefetchDelayMillis;
|
||||||
/** Variation in prefetch delay times, to mitigate stampedes */
|
/** Variation in prefetch delay times, to mitigate stampedes */
|
||||||
private static final float prefetchDelayVariation;
|
private static final float prefetchDelayVariation;
|
||||||
|
static String prefetchedFileListPath;
|
||||||
static {
|
static {
|
||||||
// Consider doing this on demand with a configuration passed in rather
|
// Consider doing this on demand with a configuration passed in rather
|
||||||
// than in a static initializer.
|
// than in a static initializer.
|
||||||
|
@ -79,6 +90,13 @@ public final class PrefetchExecutor {
|
||||||
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");
|
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");
|
||||||
|
|
||||||
public static void request(Path path, Runnable runnable) {
|
public static void request(Path path, Runnable runnable) {
|
||||||
|
if (prefetchCompleted != null) {
|
||||||
|
if (isFilePrefetched(path.getName())) {
|
||||||
|
LOG.info(
|
||||||
|
"File has already been prefetched before the restart, so skipping prefetch : " + path);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (!prefetchPathExclude.matcher(path.toString()).find()) {
|
if (!prefetchPathExclude.matcher(path.toString()).find()) {
|
||||||
long delay;
|
long delay;
|
||||||
if (prefetchDelayMillis > 0) {
|
if (prefetchDelayMillis > 0) {
|
||||||
|
@ -104,6 +122,7 @@ public final class PrefetchExecutor {
|
||||||
|
|
||||||
public static void complete(Path path) {
|
public static void complete(Path path) {
|
||||||
prefetchFutures.remove(path);
|
prefetchFutures.remove(path);
|
||||||
|
prefetchCompleted.put(path.getName(), true);
|
||||||
LOG.debug("Prefetch completed for {}", path);
|
LOG.debug("Prefetch completed for {}", path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,6 +134,7 @@ public final class PrefetchExecutor {
|
||||||
prefetchFutures.remove(path);
|
prefetchFutures.remove(path);
|
||||||
LOG.debug("Prefetch cancelled for {}", path);
|
LOG.debug("Prefetch cancelled for {}", path);
|
||||||
}
|
}
|
||||||
|
prefetchCompleted.remove(path.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean isCompleted(Path path) {
|
public static boolean isCompleted(Path path) {
|
||||||
|
@ -125,6 +145,68 @@ public final class PrefetchExecutor {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void persistToFile(String path) throws IOException {
|
||||||
|
prefetchedFileListPath = path;
|
||||||
|
if (prefetchedFileListPath == null) {
|
||||||
|
LOG.info("Exception while persisting prefetch!");
|
||||||
|
throw new IOException("Error persisting prefetched HFiles set!");
|
||||||
|
}
|
||||||
|
if (!prefetchCompleted.isEmpty()) {
|
||||||
|
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) {
|
||||||
|
PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void retrieveFromFile(String path) throws IOException {
|
||||||
|
prefetchedFileListPath = path;
|
||||||
|
File prefetchPersistenceFile = new File(prefetchedFileListPath);
|
||||||
|
if (!prefetchPersistenceFile.exists()) {
|
||||||
|
LOG.warn("Prefetch persistence file does not exist!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Retrieving from prefetch persistence file " + path);
|
||||||
|
assert (prefetchedFileListPath != null);
|
||||||
|
try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
|
||||||
|
PersistentPrefetchProtos.PrefetchedHfileName proto =
|
||||||
|
PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
|
||||||
|
Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
|
||||||
|
prefetchCompleted.putAll(protoPrefetchedFilesMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FileInputStream deleteFileOnClose(final File file) throws IOException {
|
||||||
|
return new FileInputStream(file) {
|
||||||
|
private File myFile;
|
||||||
|
|
||||||
|
private FileInputStream init(File file) {
|
||||||
|
myFile = file;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (myFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
super.close();
|
||||||
|
if (!myFile.delete()) {
|
||||||
|
throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
|
||||||
|
}
|
||||||
|
myFile = null;
|
||||||
|
}
|
||||||
|
}.init(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void removePrefetchedFileWhileEvict(String hfileName) {
|
||||||
|
prefetchCompleted.remove(hfileName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isFilePrefetched(String hfileName) {
|
||||||
|
return prefetchCompleted.containsKey(hfileName);
|
||||||
|
}
|
||||||
|
|
||||||
private PrefetchExecutor() {
|
private PrefetchExecutor() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.hfile;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
|
||||||
|
|
||||||
|
final class PrefetchProtoUtils {
|
||||||
|
private PrefetchProtoUtils() {
|
||||||
|
}
|
||||||
|
|
||||||
|
static PersistentPrefetchProtos.PrefetchedHfileName
|
||||||
|
toPB(Map<String, Boolean> prefetchedHfileNames) {
|
||||||
|
return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
|
||||||
|
.putAllPrefetchedFiles(prefetchedHfileNames).build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
|
@ -65,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
|
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
|
||||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||||
import org.apache.hadoop.hbase.nio.RefCnt;
|
import org.apache.hadoop.hbase.nio.RefCnt;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
||||||
|
@ -235,6 +238,8 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
/** In-memory bucket size */
|
/** In-memory bucket size */
|
||||||
private float memoryFactor;
|
private float memoryFactor;
|
||||||
|
|
||||||
|
private String prefetchedFileListPath;
|
||||||
|
|
||||||
private static final String FILE_VERIFY_ALGORITHM =
|
private static final String FILE_VERIFY_ALGORITHM =
|
||||||
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
|
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
|
||||||
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
|
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
|
||||||
|
@ -273,6 +278,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
|
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
|
||||||
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
|
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
|
||||||
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
|
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
|
||||||
|
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
|
||||||
|
|
||||||
sanityCheckConfigs();
|
sanityCheckConfigs();
|
||||||
|
|
||||||
|
@ -452,6 +458,9 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
if (!cacheEnabled) {
|
if (!cacheEnabled) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
|
||||||
|
cacheKey.setBlockType(cachedItem.getBlockType());
|
||||||
|
}
|
||||||
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
|
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
|
||||||
// Stuff the entry into the RAM cache so it can get drained to the persistent store
|
// Stuff the entry into the RAM cache so it can get drained to the persistent store
|
||||||
RAMQueueEntry re =
|
RAMQueueEntry re =
|
||||||
|
@ -1187,6 +1196,9 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
fos.write(ProtobufMagic.PB_MAGIC);
|
fos.write(ProtobufMagic.PB_MAGIC);
|
||||||
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
|
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
|
||||||
}
|
}
|
||||||
|
if (prefetchedFileListPath != null) {
|
||||||
|
PrefetchExecutor.persistToFile(prefetchedFileListPath);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1198,6 +1210,9 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
assert !cacheEnabled;
|
assert !cacheEnabled;
|
||||||
|
if (prefetchedFileListPath != null) {
|
||||||
|
PrefetchExecutor.retrieveFromFile(prefetchedFileListPath);
|
||||||
|
}
|
||||||
|
|
||||||
try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
|
try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
|
||||||
int pblen = ProtobufMagic.lengthOfPBMagic();
|
int pblen = ProtobufMagic.lengthOfPBMagic();
|
||||||
|
@ -1402,6 +1417,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public int evictBlocksByHfileName(String hfileName) {
|
public int evictBlocksByHfileName(String hfileName) {
|
||||||
|
PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName);
|
||||||
Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
|
Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
|
||||||
true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
|
true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,145 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.hfile;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@Category({ IOTests.class, LargeTests.class })
|
||||||
|
public class TestPrefetchRSClose {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestPrefetchRSClose.class);
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchRSClose.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
Path testDir;
|
||||||
|
MiniZooKeeperCluster zkCluster;
|
||||||
|
MiniHBaseCluster cluster;
|
||||||
|
StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
conf = TEST_UTIL.getConfiguration();
|
||||||
|
testDir = TEST_UTIL.getDataTestDir();
|
||||||
|
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||||
|
|
||||||
|
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
|
||||||
|
conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
|
||||||
|
conf.setInt("hbase.bucketcache.size", 400);
|
||||||
|
conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence");
|
||||||
|
conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence");
|
||||||
|
zkCluster = TEST_UTIL.startMiniZKCluster();
|
||||||
|
cluster = TEST_UTIL.startMiniHBaseCluster(option);
|
||||||
|
assertEquals(2, cluster.getRegionServerThreads().size());
|
||||||
|
cluster.setConf(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRegionClosePrefetchPersistence() throws Exception {
|
||||||
|
// Write to table and flush
|
||||||
|
TableName tableName = TableName.valueOf("table1");
|
||||||
|
byte[] row0 = Bytes.toBytes("row1");
|
||||||
|
byte[] row1 = Bytes.toBytes("row2");
|
||||||
|
byte[] family = Bytes.toBytes("family");
|
||||||
|
byte[] qf1 = Bytes.toBytes("qf1");
|
||||||
|
byte[] qf2 = Bytes.toBytes("qf2");
|
||||||
|
byte[] value1 = Bytes.toBytes("value1");
|
||||||
|
byte[] value2 = Bytes.toBytes("value2");
|
||||||
|
|
||||||
|
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
|
||||||
|
Table table = TEST_UTIL.createTable(td, null);
|
||||||
|
try {
|
||||||
|
// put data
|
||||||
|
Put put0 = new Put(row0);
|
||||||
|
put0.addColumn(family, qf1, 1, value1);
|
||||||
|
table.put(put0);
|
||||||
|
Put put1 = new Put(row1);
|
||||||
|
put1.addColumn(family, qf2, 1, value2);
|
||||||
|
table.put(put1);
|
||||||
|
TEST_UTIL.flush(tableName);
|
||||||
|
} finally {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
// Stop the RS
|
||||||
|
cluster.stopRegionServer(0);
|
||||||
|
LOG.info("Stopped Region Server 0.");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
assertTrue(new File(testDir + "/bucket.persistence").exists());
|
||||||
|
assertTrue(new File(testDir + "/prefetch.persistence").exists());
|
||||||
|
|
||||||
|
// Start the RS and validate
|
||||||
|
cluster.startRegionServer();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
assertFalse(new File(testDir + "/prefetch.persistence").exists());
|
||||||
|
assertFalse(new File(testDir + "/bucket.persistence").exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPrefetchPersistenceNegative() throws Exception {
|
||||||
|
cluster.stopRegionServer(0);
|
||||||
|
LOG.info("Stopped Region Server 0.");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
assertFalse(new File(testDir + "/prefetch.persistence").exists());
|
||||||
|
assertTrue(new File(testDir + "/bucket.persistence").exists());
|
||||||
|
cluster.startRegionServer();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
assertFalse(new File(testDir + "/prefetch.persistence").exists());
|
||||||
|
assertFalse(new File(testDir + "/bucket.persistence").exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
if (zkCluster != null) {
|
||||||
|
zkCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,213 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.BlockType;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
@Category({ IOTests.class, LargeTests.class })
|
||||||
|
public class TestPrefetchPersistence {
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestPrefetchPersistence.class);
|
||||||
|
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
|
||||||
|
@SuppressWarnings("checkstyle:Indentation")
|
||||||
|
public static Iterable<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] { { 16 * 1024,
|
||||||
|
new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
|
||||||
|
28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
|
||||||
|
128 * 1024 + 1024 } } });
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameter(0)
|
||||||
|
public int constructedBlockSize;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(1)
|
||||||
|
public int[] constructedBlockSizes;
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchPersistence.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
|
||||||
|
private static final int DATA_BLOCK_SIZE = 2048;
|
||||||
|
private static final int NUM_KV = 1000;
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private CacheConfig cacheConf;
|
||||||
|
private FileSystem fs;
|
||||||
|
String prefetchPersistencePath;
|
||||||
|
Path testDir;
|
||||||
|
|
||||||
|
BucketCache bucketCache;
|
||||||
|
|
||||||
|
final long capacitySize = 32 * 1024 * 1024;
|
||||||
|
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
|
||||||
|
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws IOException {
|
||||||
|
conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
|
||||||
|
testDir = TEST_UTIL.getDataTestDir();
|
||||||
|
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
|
||||||
|
prefetchPersistencePath = testDir + "/prefetch.persistence";
|
||||||
|
conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, prefetchPersistencePath);
|
||||||
|
fs = HFileSystem.get(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPrefetchPersistence() throws Exception {
|
||||||
|
|
||||||
|
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
|
||||||
|
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
|
||||||
|
testDir + "/bucket.persistence", 60 * 1000, conf);
|
||||||
|
bucketCache.wait_when_cache = true;
|
||||||
|
cacheConf = new CacheConfig(conf, bucketCache);
|
||||||
|
|
||||||
|
long usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertEquals(0, usedSize);
|
||||||
|
assertTrue(new File(testDir + "/bucket.cache").exists());
|
||||||
|
// Load Cache
|
||||||
|
Path storeFile = writeStoreFile("TestPrefetch0");
|
||||||
|
Path storeFile2 = writeStoreFile("TestPrefetch1");
|
||||||
|
readStoreFile(storeFile, 0);
|
||||||
|
readStoreFile(storeFile2, 0);
|
||||||
|
usedSize = bucketCache.getAllocator().getUsedSize();
|
||||||
|
assertNotEquals(0, usedSize);
|
||||||
|
|
||||||
|
bucketCache.shutdown();
|
||||||
|
assertTrue(new File(testDir + "/bucket.persistence").exists());
|
||||||
|
assertTrue(new File(testDir + "/prefetch.persistence").exists());
|
||||||
|
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
|
||||||
|
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
|
||||||
|
testDir + "/bucket.persistence", 60 * 1000, conf);
|
||||||
|
bucketCache.wait_when_cache = true;
|
||||||
|
assertFalse(new File(testDir + "/bucket.persistence").exists());
|
||||||
|
assertFalse(new File(testDir + "/prefetch.persistence").exists());
|
||||||
|
assertTrue(usedSize != 0);
|
||||||
|
readStoreFile(storeFile, 0);
|
||||||
|
readStoreFile(storeFile2, 0);
|
||||||
|
// Test Close Store File
|
||||||
|
closeStoreFile(storeFile2);
|
||||||
|
TEST_UTIL.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void closeStoreFile(Path path) throws Exception {
|
||||||
|
HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
|
||||||
|
assertTrue(PrefetchExecutor.isFilePrefetched(path.getName()));
|
||||||
|
reader.close(true);
|
||||||
|
assertFalse(PrefetchExecutor.isFilePrefetched(path.getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void readStoreFile(Path storeFilePath, long offset) throws Exception {
|
||||||
|
// Open the file
|
||||||
|
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
|
||||||
|
|
||||||
|
while (!reader.prefetchComplete()) {
|
||||||
|
// Sleep for a bit
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
|
||||||
|
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
|
||||||
|
BucketEntry be = bucketCache.backingMap.get(blockCacheKey);
|
||||||
|
boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null;
|
||||||
|
|
||||||
|
if (
|
||||||
|
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|
||||||
|
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
|
||||||
|
) {
|
||||||
|
assertTrue(isCached);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Path writeStoreFile(String fname) throws IOException {
|
||||||
|
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
|
||||||
|
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
|
||||||
|
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
|
||||||
|
.withOutputDir(storeFileParentDir).withFileContext(meta).build();
|
||||||
|
Random rand = ThreadLocalRandom.current();
|
||||||
|
final int rowLen = 32;
|
||||||
|
for (int i = 0; i < NUM_KV; ++i) {
|
||||||
|
byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
|
||||||
|
byte[] v = RandomKeyValueUtil.randomValue(rand);
|
||||||
|
int cfLen = rand.nextInt(k.length - rowLen + 1);
|
||||||
|
KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
|
||||||
|
k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
|
||||||
|
sfw.append(kv);
|
||||||
|
}
|
||||||
|
|
||||||
|
sfw.close();
|
||||||
|
return sfw.getPath();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static KeyValue.Type generateKeyType(Random rand) {
|
||||||
|
if (rand.nextBoolean()) {
|
||||||
|
// Let's make half of KVs puts.
|
||||||
|
return KeyValue.Type.Put;
|
||||||
|
} else {
|
||||||
|
KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
|
||||||
|
if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
|
||||||
|
throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
|
||||||
|
+ "Probably the layout of KeyValue.Type has changed.");
|
||||||
|
}
|
||||||
|
return keyType;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue