HBASE-27474 Evict blocks on split/merge; Avoid caching reference/hlinks if compaction is enabled (#4868)
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
(cherry picked from commit 222ec684d6
)
This commit is contained in:
parent
bc58d6d861
commit
012306b62d
|
@ -2963,6 +2963,19 @@ public final class ProtobufUtil {
|
|||
}
|
||||
|
||||
public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
|
||||
ServerName destinationServer, long closeProcId) {
|
||||
return ProtobufUtil.getBuilder(server, regionName, destinationServer, closeProcId).build();
|
||||
}
|
||||
|
||||
public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
|
||||
ServerName destinationServer, long closeProcId, boolean evictCache) {
|
||||
CloseRegionRequest.Builder builder =
|
||||
getBuilder(server, regionName, destinationServer, closeProcId);
|
||||
builder.setEvictCache(evictCache);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static CloseRegionRequest.Builder getBuilder(ServerName server, byte[] regionName,
|
||||
ServerName destinationServer, long closeProcId) {
|
||||
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
|
||||
RegionSpecifier region =
|
||||
|
@ -2975,7 +2988,7 @@ public final class ProtobufUtil {
|
|||
builder.setServerStartCode(server.getStartcode());
|
||||
}
|
||||
builder.setCloseProcId(closeProcId);
|
||||
return builder.build();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static ProcedureDescription buildProcedureDescription(String signature, String instance,
|
||||
|
|
|
@ -122,6 +122,7 @@ message CloseRegionRequest {
|
|||
// the intended server for this RPC.
|
||||
optional uint64 serverStartCode = 5;
|
||||
optional int64 close_proc_id = 6 [default = -1];
|
||||
optional bool evict_cache = 7 [default = false];
|
||||
}
|
||||
|
||||
message CloseRegionResponse {
|
||||
|
|
|
@ -541,6 +541,8 @@ message RegionStateTransitionStateData {
|
|||
required RegionTransitionType type = 1;
|
||||
optional ServerName assign_candidate = 2;
|
||||
required bool force_new_plan = 3;
|
||||
optional bool is_split = 4 [default = false];
|
||||
optional bool evict_cache = 5 [default = false];
|
||||
}
|
||||
|
||||
enum RegionRemoteProcedureBaseState {
|
||||
|
@ -565,6 +567,7 @@ message OpenRegionProcedureStateData {
|
|||
|
||||
message CloseRegionProcedureStateData {
|
||||
optional ServerName assign_candidate = 1;
|
||||
optional bool evict_cache = 2 [default = false];
|
||||
}
|
||||
|
||||
enum SwitchRpcThrottleState {
|
||||
|
|
|
@ -35,7 +35,7 @@ public class HFilePreadReader extends HFileReaderImpl {
|
|||
Configuration conf) throws IOException {
|
||||
super(context, fileInfo, cacheConf, conf);
|
||||
// Prefetch file blocks upon open if requested
|
||||
if (cacheConf.shouldPrefetchOnOpen()) {
|
||||
if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()) {
|
||||
PrefetchExecutor.request(path, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -97,7 +97,7 @@ public class HFilePreadReader extends HFileReaderImpl {
|
|||
if (evictOnClose) {
|
||||
int numEvicted = cache.evictBlocksByHfileName(name);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("On close, file=" + name + " evicted=" + numEvicted + " block(s)");
|
||||
LOG.trace("On close, file= {} evicted= {} block(s)", name, numEvicted);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY;
|
||||
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
|
@ -40,12 +41,14 @@ import org.apache.hadoop.hbase.SizeCachedByteBufferKeyValue;
|
|||
import org.apache.hadoop.hbase.SizeCachedKeyValue;
|
||||
import org.apache.hadoop.hbase.SizeCachedNoTagsByteBufferKeyValue;
|
||||
import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.IdLock;
|
||||
|
@ -1264,6 +1267,8 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
new BlockCacheKey(name, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType);
|
||||
Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY, cacheKey.toString());
|
||||
|
||||
boolean cacheable = cacheBlock && cacheIfCompactionsOff();
|
||||
|
||||
boolean useLock = false;
|
||||
IdLock.Entry lockEntry = null;
|
||||
final Span span = Span.current();
|
||||
|
@ -1305,7 +1310,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
return cachedBlock;
|
||||
}
|
||||
|
||||
if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
|
||||
if (!useLock && cacheable && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
|
||||
// check cache again with lock
|
||||
useLock = true;
|
||||
continue;
|
||||
|
@ -1324,10 +1329,10 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
|
||||
// Don't need the unpacked block back and we're storing the block in the cache compressed
|
||||
if (cacheOnly && cacheCompressed && cacheOnRead) {
|
||||
LOG.debug("Skipping decompression of block in prefetch");
|
||||
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
|
||||
// Cache the block if necessary
|
||||
cacheConf.getBlockCache().ifPresent(cache -> {
|
||||
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
|
||||
if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
|
||||
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
|
||||
}
|
||||
});
|
||||
|
@ -1340,7 +1345,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
|
||||
// Cache the block if necessary
|
||||
cacheConf.getBlockCache().ifPresent(cache -> {
|
||||
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
|
||||
if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
|
||||
// Using the wait on cache during compaction and prefetching.
|
||||
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
|
||||
cacheConf.isInMemory(), cacheOnly);
|
||||
|
@ -1667,4 +1672,9 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
public void unbufferStream() {
|
||||
fsBlockReader.unbufferStream();
|
||||
}
|
||||
|
||||
protected boolean cacheIfCompactionsOff() {
|
||||
return (!StoreFileInfo.isReference(name) && !HFileLink.isHFileLink(name))
|
||||
|| !conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -115,7 +115,7 @@ final class AssignmentManagerUtil {
|
|||
for (; i < procs.length; i++) {
|
||||
RegionStateNode regionNode = regionNodes.get(i);
|
||||
TransitRegionStateProcedure proc =
|
||||
TransitRegionStateProcedure.unassign(env, regionNode.getRegionInfo());
|
||||
TransitRegionStateProcedure.unassignSplitMerge(env, regionNode.getRegionInfo());
|
||||
if (regionNode.getProcedure() != null) {
|
||||
throw new HBaseIOException(
|
||||
"The parent region " + regionNode + " is currently in transition, give up");
|
||||
|
|
|
@ -45,14 +45,17 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
|||
// wrong(but do not make it wrong intentionally). The client can handle this error.
|
||||
private ServerName assignCandidate;
|
||||
|
||||
private boolean evictCache;
|
||||
|
||||
public CloseRegionProcedure() {
|
||||
super();
|
||||
}
|
||||
|
||||
public CloseRegionProcedure(TransitRegionStateProcedure parent, RegionInfo region,
|
||||
ServerName targetServer, ServerName assignCandidate) {
|
||||
ServerName targetServer, ServerName assignCandidate, boolean evictCache) {
|
||||
super(parent, region, targetServer);
|
||||
this.assignCandidate = assignCandidate;
|
||||
this.evictCache = evictCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -62,7 +65,7 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
|||
|
||||
@Override
|
||||
public RemoteOperation newRemoteOperation() {
|
||||
return new RegionCloseOperation(this, region, getProcId(), assignCandidate);
|
||||
return new RegionCloseOperation(this, region, getProcId(), assignCandidate, evictCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -72,6 +75,7 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
|||
if (assignCandidate != null) {
|
||||
builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
|
||||
}
|
||||
builder.setEvictCache(evictCache);
|
||||
serializer.serialize(builder.build());
|
||||
}
|
||||
|
||||
|
@ -83,6 +87,7 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
|||
if (data.hasAssignCandidate()) {
|
||||
assignCandidate = ProtobufUtil.toServerName(data.getAssignCandidate());
|
||||
}
|
||||
evictCache = data.getEvictCache();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
|
||||
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
|
||||
import static org.apache.hadoop.hbase.master.LoadBalancer.BOGUS_SERVER_NAME;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
|
@ -120,6 +122,10 @@ public class TransitRegionStateProcedure
|
|||
|
||||
private RegionRemoteProcedureBase remoteProc;
|
||||
|
||||
private boolean evictCache;
|
||||
|
||||
private boolean isSplit;
|
||||
|
||||
public TransitRegionStateProcedure() {
|
||||
}
|
||||
|
||||
|
@ -155,6 +161,14 @@ public class TransitRegionStateProcedure
|
|||
if (type == TransitionType.REOPEN) {
|
||||
this.assignCandidate = getRegionStateNode(env).getRegionLocation();
|
||||
}
|
||||
evictCache =
|
||||
env.getMasterConfiguration().getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
|
||||
}
|
||||
|
||||
protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
|
||||
ServerName assignCandidate, boolean forceNewPlan, TransitionType type, boolean isSplit) {
|
||||
this(env, hri, assignCandidate, forceNewPlan, type);
|
||||
this.isSplit = isSplit;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -264,8 +278,12 @@ public class TransitRegionStateProcedure
|
|||
if (regionNode.isInState(State.OPEN, State.CLOSING, State.MERGING, State.SPLITTING)) {
|
||||
// this is the normal case
|
||||
env.getAssignmentManager().regionClosing(regionNode);
|
||||
addChildProcedure(new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
|
||||
assignCandidate));
|
||||
CloseRegionProcedure closeProc = isSplit
|
||||
? new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
|
||||
assignCandidate, true)
|
||||
: new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
|
||||
assignCandidate, evictCache);
|
||||
addChildProcedure(closeProc);
|
||||
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED);
|
||||
} else {
|
||||
forceNewPlan = true;
|
||||
|
@ -504,8 +522,9 @@ public class TransitRegionStateProcedure
|
|||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.serializeStateData(serializer);
|
||||
RegionStateTransitionStateData.Builder builder = RegionStateTransitionStateData.newBuilder()
|
||||
.setType(convert(type)).setForceNewPlan(forceNewPlan);
|
||||
RegionStateTransitionStateData.Builder builder =
|
||||
RegionStateTransitionStateData.newBuilder().setType(convert(type))
|
||||
.setForceNewPlan(forceNewPlan).setEvictCache(evictCache).setIsSplit(isSplit);
|
||||
if (assignCandidate != null) {
|
||||
builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
|
||||
}
|
||||
|
@ -523,6 +542,8 @@ public class TransitRegionStateProcedure
|
|||
if (data.hasAssignCandidate()) {
|
||||
assignCandidate = ProtobufUtil.toServerName(data.getAssignCandidate());
|
||||
}
|
||||
evictCache = data.getEvictCache();
|
||||
isSplit = data.getIsSplit();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -586,6 +607,12 @@ public class TransitRegionStateProcedure
|
|||
new TransitRegionStateProcedure(env, region, null, false, TransitionType.UNASSIGN));
|
||||
}
|
||||
|
||||
public static TransitRegionStateProcedure unassignSplitMerge(MasterProcedureEnv env,
|
||||
RegionInfo region) {
|
||||
return setOwner(env,
|
||||
new TransitRegionStateProcedure(env, region, null, false, TransitionType.UNASSIGN, true));
|
||||
}
|
||||
|
||||
public static TransitRegionStateProcedure reopen(MasterProcedureEnv env, RegionInfo region) {
|
||||
return setOwner(env,
|
||||
new TransitRegionStateProcedure(env, region, null, false, TransitionType.REOPEN));
|
||||
|
|
|
@ -472,11 +472,13 @@ public class RSProcedureDispatcher extends RemoteProcedureDispatcher<MasterProce
|
|||
|
||||
public static class RegionCloseOperation extends RegionOperation {
|
||||
private final ServerName destinationServer;
|
||||
private boolean evictCache;
|
||||
|
||||
public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
|
||||
ServerName destinationServer) {
|
||||
ServerName destinationServer, boolean evictCache) {
|
||||
super(remoteProcedure, regionInfo, procId);
|
||||
this.destinationServer = destinationServer;
|
||||
this.evictCache = evictCache;
|
||||
}
|
||||
|
||||
public ServerName getDestinationServer() {
|
||||
|
@ -485,7 +487,8 @@ public class RSProcedureDispatcher extends RemoteProcedureDispatcher<MasterProce
|
|||
|
||||
public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
|
||||
return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
|
||||
getDestinationServer(), procId);
|
||||
getDestinationServer(), procId, evictCache);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -849,7 +849,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
|
||||
public void setCompactionsEnabled(boolean compactionsEnabled) {
|
||||
this.compactionsEnabled = compactionsEnabled;
|
||||
this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION, String.valueOf(compactionsEnabled));
|
||||
this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled);
|
||||
}
|
||||
|
||||
/** Returns the longCompactions thread pool executor */
|
||||
|
|
|
@ -3451,6 +3451,9 @@ public class HRegionServer extends Thread
|
|||
* <p>
|
||||
* If a close was in progress, this new request will be ignored, and an exception thrown.
|
||||
* </p>
|
||||
* <p>
|
||||
* Provides additional flag to indicate if this region blocks should be evicted from the cache.
|
||||
* </p>
|
||||
* @param encodedName Region to close
|
||||
* @param abort True if we are aborting
|
||||
* @param destination Where the Region is being moved too... maybe null if unknown.
|
||||
|
|
|
@ -3904,9 +3904,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||
? ProtobufUtil.toServerName(request.getDestinationServer())
|
||||
: null;
|
||||
long procId = request.getCloseProcId();
|
||||
boolean evictCache = request.getEvictCache();
|
||||
if (regionServer.submitRegionProcedure(procId)) {
|
||||
regionServer.executorService.submit(
|
||||
UnassignRegionHandler.create(regionServer, encodedName, procId, false, destination));
|
||||
regionServer.getExecutorService().submit(
|
||||
UnassignRegionHandler.create(regionServer, encodedName, procId, false, destination, evictCache));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.handler;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -27,6 +29,7 @@ import org.apache.hadoop.hbase.executor.EventHandler;
|
|||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
|
||||
|
@ -277,6 +280,10 @@ public class OpenRegionHandler extends EventHandler {
|
|||
/** Returns Instance of HRegion if successful open else null. */
|
||||
private HRegion openRegion() {
|
||||
HRegion region = null;
|
||||
boolean compactionEnabled =
|
||||
((HRegionServer) server).getCompactSplitThread().isCompactionsEnabled();
|
||||
this.server.getConfiguration().setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,
|
||||
compactionEnabled);
|
||||
try {
|
||||
// Instantiate the region. This also periodically tickles OPENING
|
||||
// state so master doesn't timeout this region in transition.
|
||||
|
|
|
@ -62,14 +62,22 @@ public class UnassignRegionHandler extends EventHandler {
|
|||
|
||||
private final RetryCounter retryCounter;
|
||||
|
||||
private boolean evictCache;
|
||||
|
||||
public UnassignRegionHandler(HRegionServer server, String encodedName, long closeProcId,
|
||||
boolean abort, @Nullable ServerName destination, EventType eventType) {
|
||||
this(server, encodedName, closeProcId, abort, destination, eventType, false);
|
||||
}
|
||||
|
||||
public UnassignRegionHandler(HRegionServer server, String encodedName, long closeProcId,
|
||||
boolean abort, @Nullable ServerName destination, EventType eventType, boolean evictCache) {
|
||||
super(server, eventType);
|
||||
this.encodedName = encodedName;
|
||||
this.closeProcId = closeProcId;
|
||||
this.abort = abort;
|
||||
this.destination = destination;
|
||||
this.retryCounter = HandlerUtil.getRetryCounter();
|
||||
this.evictCache = evictCache;
|
||||
}
|
||||
|
||||
private HRegionServer getServer() {
|
||||
|
@ -115,6 +123,12 @@ public class UnassignRegionHandler extends EventHandler {
|
|||
// abort the RS...
|
||||
region.getCoprocessorHost().preClose(abort);
|
||||
}
|
||||
// This should be true only in the case of splits/merges closing the parent regions, as
|
||||
// there's no point on keep blocks for those region files. As hbase.rs.evictblocksonclose is
|
||||
// false by default we don't bother overriding it if evictCache is false.
|
||||
if (evictCache) {
|
||||
region.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(true));
|
||||
}
|
||||
if (region.close(abort) == null) {
|
||||
// XXX: Is this still possible? The old comment says about split, but now split is done at
|
||||
// master side, so...
|
||||
|
@ -157,7 +171,7 @@ public class UnassignRegionHandler extends EventHandler {
|
|||
}
|
||||
|
||||
public static UnassignRegionHandler create(HRegionServer server, String encodedName,
|
||||
long closeProcId, boolean abort, @Nullable ServerName destination) {
|
||||
long closeProcId, boolean abort, @Nullable ServerName destination, boolean evictCache) {
|
||||
// Just try our best to determine whether it is for closing meta. It is not the end of the world
|
||||
// if we put the handler into a wrong executor.
|
||||
Region region = server.getRegion(encodedName);
|
||||
|
@ -165,6 +179,6 @@ public class UnassignRegionHandler extends EventHandler {
|
|||
? EventType.M_RS_CLOSE_META
|
||||
: EventType.M_RS_CLOSE_REGION;
|
||||
return new UnassignRegionHandler(server, encodedName, closeProcId, abort, destination,
|
||||
eventType);
|
||||
eventType, evictCache);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
|
|||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
|
||||
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
|
||||
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
|
@ -38,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -46,17 +48,25 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.MatcherPredicate;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
|
@ -226,6 +236,57 @@ public class TestPrefetch {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrefetchSkipsRefs() throws Exception {
|
||||
testPrefetchWhenRefs(true, c -> {
|
||||
boolean isCached = c != null;
|
||||
assertFalse(isCached);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrefetchDoesntSkipRefs() throws Exception {
|
||||
testPrefetchWhenRefs(false, c -> {
|
||||
boolean isCached = c != null;
|
||||
assertTrue(isCached);
|
||||
});
|
||||
}
|
||||
|
||||
private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
|
||||
throws Exception {
|
||||
cacheConf = new CacheConfig(conf, blockCache);
|
||||
HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
|
||||
Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs");
|
||||
RegionInfo region =
|
||||
RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build();
|
||||
Path regionDir = new Path(tableDir, region.getEncodedName());
|
||||
Pair<Path, byte[]> fileWithSplitPoint =
|
||||
writeStoreFileForSplit(new Path(regionDir, "cf"), context);
|
||||
Path storeFile = fileWithSplitPoint.getFirst();
|
||||
HRegionFileSystem regionFS =
|
||||
HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
|
||||
HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true);
|
||||
Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false,
|
||||
new ConstantSizeRegionSplitPolicy());
|
||||
conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled);
|
||||
HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true);
|
||||
refHsf.initReader();
|
||||
HFile.Reader reader = refHsf.getReader().getHFileReader();
|
||||
while (!reader.prefetchComplete()) {
|
||||
// Sleep for a bit
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
long offset = 0;
|
||||
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
||||
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
|
||||
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
|
||||
if (block.getBlockType() == BlockType.DATA) {
|
||||
test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
|
||||
}
|
||||
offset += block.getOnDiskSizeWithHeader();
|
||||
}
|
||||
}
|
||||
|
||||
private Path writeStoreFile(String fname) throws IOException {
|
||||
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
|
||||
return writeStoreFile(fname, meta);
|
||||
|
@ -250,6 +311,28 @@ public class TestPrefetch {
|
|||
return sfw.getPath();
|
||||
}
|
||||
|
||||
private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context)
|
||||
throws IOException {
|
||||
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir)
|
||||
.withFileContext(context).build();
|
||||
Random rand = ThreadLocalRandom.current();
|
||||
final int rowLen = 32;
|
||||
byte[] splitPoint = null;
|
||||
for (int i = 0; i < NUM_KV; ++i) {
|
||||
byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
|
||||
byte[] v = RandomKeyValueUtil.randomValue(rand);
|
||||
int cfLen = rand.nextInt(k.length - rowLen + 1);
|
||||
KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
|
||||
k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
|
||||
sfw.append(kv);
|
||||
if (i == NUM_KV / 2) {
|
||||
splitPoint = k;
|
||||
}
|
||||
}
|
||||
sfw.close();
|
||||
return new Pair(sfw.getPath(), splitPoint);
|
||||
}
|
||||
|
||||
public static KeyValue.Type generateKeyType(Random rand) {
|
||||
if (rand.nextBoolean()) {
|
||||
// Let's make half of KVs puts.
|
||||
|
|
Loading…
Reference in New Issue