HBASE-24428 : Update compaction priority for recently split daughter regions (#1784)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
f520c9d009
commit
48e9835adb
|
@ -143,6 +143,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
|
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
|
||||||
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
|
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
|
||||||
|
|
||||||
|
// HBASE-24428 : Update compaction priority for recently split daughter regions
|
||||||
|
// so as to prioritize their compaction.
|
||||||
|
// Any compaction candidate with higher priority than compaction of newly split daugher regions
|
||||||
|
// should have priority value < (Integer.MIN_VALUE + 1000)
|
||||||
|
private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
|
private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
|
||||||
|
|
||||||
protected final MemStore memstore;
|
protected final MemStore memstore;
|
||||||
|
@ -1923,7 +1929,22 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
|
|
||||||
// Set common request properties.
|
// Set common request properties.
|
||||||
// Set priority, either override value supplied by caller or from store.
|
// Set priority, either override value supplied by caller or from store.
|
||||||
request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
|
final int compactionPriority =
|
||||||
|
(priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
|
||||||
|
request.setPriority(compactionPriority);
|
||||||
|
|
||||||
|
if (request.isAfterSplit()) {
|
||||||
|
// If the store belongs to recently splitted daughter regions, better we consider
|
||||||
|
// them with the higher priority in the compaction queue.
|
||||||
|
// Override priority if it is lower (higher int value) than
|
||||||
|
// SPLIT_REGION_COMPACTION_PRIORITY
|
||||||
|
final int splitHousekeepingPriority =
|
||||||
|
Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
|
||||||
|
request.setPriority(splitHousekeepingPriority);
|
||||||
|
LOG.info("Keeping/Overriding Compaction request priority to {} for CF {} since it"
|
||||||
|
+ " belongs to recently split daughter region {}", splitHousekeepingPriority,
|
||||||
|
this.getColumnFamilyName(), getRegionInfo().getRegionNameAsString());
|
||||||
|
}
|
||||||
request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
|
request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
|
||||||
request.setTracker(tracker);
|
request.setTracker(tracker);
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class StoreUtils {
|
||||||
*/
|
*/
|
||||||
public static boolean hasReferences(Collection<HStoreFile> files) {
|
public static boolean hasReferences(Collection<HStoreFile> files) {
|
||||||
// TODO: make sure that we won't pass null here in the future.
|
// TODO: make sure that we won't pass null here in the future.
|
||||||
return files != null ? files.stream().anyMatch(HStoreFile::isReference) : false;
|
return files != null && files.stream().anyMatch(HStoreFile::isReference);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -43,6 +43,7 @@ public class CompactionRequestImpl implements CompactionRequest {
|
||||||
private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
|
private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
|
||||||
private int priority = NO_PRIORITY;
|
private int priority = NO_PRIORITY;
|
||||||
private Collection<HStoreFile> filesToCompact;
|
private Collection<HStoreFile> filesToCompact;
|
||||||
|
private boolean isAfterSplit = false;
|
||||||
|
|
||||||
// CompactRequest object creation time.
|
// CompactRequest object creation time.
|
||||||
private long selectionTime;
|
private long selectionTime;
|
||||||
|
@ -136,6 +137,14 @@ public class CompactionRequestImpl implements CompactionRequest {
|
||||||
return tracker;
|
return tracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAfterSplit() {
|
||||||
|
return isAfterSplit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAfterSplit(boolean afterSplit) {
|
||||||
|
isAfterSplit = afterSplit;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
final int prime = 31;
|
final int prime = 31;
|
||||||
|
@ -149,6 +158,7 @@ public class CompactionRequestImpl implements CompactionRequest {
|
||||||
result = prime * result + ((storeName == null) ? 0 : storeName.hashCode());
|
result = prime * result + ((storeName == null) ? 0 : storeName.hashCode());
|
||||||
result = prime * result + (int) (totalSize ^ (totalSize >>> 32));
|
result = prime * result + (int) (totalSize ^ (totalSize >>> 32));
|
||||||
result = prime * result + ((tracker == null) ? 0 : tracker.hashCode());
|
result = prime * result + ((tracker == null) ? 0 : tracker.hashCode());
|
||||||
|
result = prime * result + (isAfterSplit ? 1231 : 1237);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,6 +210,9 @@ public class CompactionRequestImpl implements CompactionRequest {
|
||||||
if (totalSize != other.totalSize) {
|
if (totalSize != other.totalSize) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (isAfterSplit != other.isAfterSplit) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (tracker == null) {
|
if (tracker == null) {
|
||||||
if (other.tracker != null) {
|
if (other.tracker != null) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -84,6 +84,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy {
|
||||||
|
|
||||||
CompactionRequestImpl result = createCompactionRequest(candidateSelection,
|
CompactionRequestImpl result = createCompactionRequest(candidateSelection,
|
||||||
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
|
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
|
||||||
|
result.setAfterSplit(isAfterSplit);
|
||||||
|
|
||||||
ArrayList<HStoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
|
ArrayList<HStoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
|
||||||
removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
|
removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
|
||||||
|
|
|
@ -122,6 +122,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
|
||||||
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
|
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
|
||||||
allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
|
allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
|
||||||
request.setMajorRangeFull();
|
request.setMajorRangeFull();
|
||||||
|
request.getRequest().setAfterSplit(true);
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNotSame;
|
import static org.junit.Assert.assertNotSame;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
@ -26,6 +27,8 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -75,6 +78,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
|
||||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
||||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
|
import org.apache.hadoop.hbase.io.Reference;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||||
import org.apache.hadoop.hbase.master.MasterRpcServices;
|
import org.apache.hadoop.hbase.master.MasterRpcServices;
|
||||||
|
@ -86,6 +90,7 @@ import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
@ -110,6 +115,7 @@ import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -280,6 +286,79 @@ public class TestSplitTransactionOnCluster {
|
||||||
assertEquals(2, cluster.getRegions(tableName).size());
|
assertEquals(2, cluster.getRegions(tableName).size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplitCompactWithPriority() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
|
// Create table then get the single region for our new table.
|
||||||
|
byte[] cf = Bytes.toBytes("cf");
|
||||||
|
TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)).build();
|
||||||
|
admin.createTable(htd);
|
||||||
|
|
||||||
|
assertNotEquals("Unable to retrieve regions of the table", -1,
|
||||||
|
TESTING_UTIL.waitFor(10000, () -> cluster.getRegions(tableName).size() == 1));
|
||||||
|
|
||||||
|
HRegion region = cluster.getRegions(tableName).get(0);
|
||||||
|
HStore store = region.getStore(cf);
|
||||||
|
int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName());
|
||||||
|
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
|
||||||
|
|
||||||
|
Table table = TESTING_UTIL.getConnection().getTable(tableName);
|
||||||
|
// insert data
|
||||||
|
insertData(tableName, admin, table);
|
||||||
|
insertData(tableName, admin, table, 20);
|
||||||
|
insertData(tableName, admin, table, 40);
|
||||||
|
|
||||||
|
// Compaction Request
|
||||||
|
store.triggerMajorCompaction();
|
||||||
|
Optional<CompactionContext> compactionContext = store.requestCompaction();
|
||||||
|
assertTrue(compactionContext.isPresent());
|
||||||
|
assertFalse(compactionContext.get().getRequest().isAfterSplit());
|
||||||
|
assertEquals(compactionContext.get().getRequest().getPriority(), 13);
|
||||||
|
|
||||||
|
// Split
|
||||||
|
long procId =
|
||||||
|
cluster.getMaster().splitRegion(region.getRegionInfo(), Bytes.toBytes("row4"), 0, 0);
|
||||||
|
|
||||||
|
// wait for the split to complete or get interrupted. If the split completes successfully,
|
||||||
|
// the procedure will return true; if the split fails, the procedure would throw exception.
|
||||||
|
ProcedureTestingUtility.waitProcedure(cluster.getMaster().getMasterProcedureExecutor(),
|
||||||
|
procId);
|
||||||
|
|
||||||
|
assertEquals(2, cluster.getRegions(tableName).size());
|
||||||
|
// we have 2 daughter regions
|
||||||
|
HRegion hRegion1 = cluster.getRegions(tableName).get(0);
|
||||||
|
HRegion hRegion2 = cluster.getRegions(tableName).get(1);
|
||||||
|
HStore hStore1 = hRegion1.getStore(cf);
|
||||||
|
HStore hStore2 = hRegion2.getStore(cf);
|
||||||
|
|
||||||
|
// For hStore1 && hStore2, set mock reference to one of the storeFiles
|
||||||
|
StoreFileInfo storeFileInfo1 = new ArrayList<>(hStore1.getStorefiles()).get(0).getFileInfo();
|
||||||
|
StoreFileInfo storeFileInfo2 = new ArrayList<>(hStore2.getStorefiles()).get(0).getFileInfo();
|
||||||
|
Field field = StoreFileInfo.class.getDeclaredField("reference");
|
||||||
|
field.setAccessible(true);
|
||||||
|
field.set(storeFileInfo1, Mockito.mock(Reference.class));
|
||||||
|
field.set(storeFileInfo2, Mockito.mock(Reference.class));
|
||||||
|
hStore1.triggerMajorCompaction();
|
||||||
|
hStore2.triggerMajorCompaction();
|
||||||
|
|
||||||
|
compactionContext = hStore1.requestCompaction();
|
||||||
|
assertTrue(compactionContext.isPresent());
|
||||||
|
// since we set mock reference to one of the storeFiles, we will get isAfterSplit=true &&
|
||||||
|
// highest priority for hStore1's compactionContext
|
||||||
|
assertTrue(compactionContext.get().getRequest().isAfterSplit());
|
||||||
|
assertEquals(compactionContext.get().getRequest().getPriority(), Integer.MIN_VALUE + 1000);
|
||||||
|
|
||||||
|
compactionContext =
|
||||||
|
hStore2.requestCompaction(Integer.MIN_VALUE + 10, CompactionLifeCycleTracker.DUMMY, null);
|
||||||
|
assertTrue(compactionContext.isPresent());
|
||||||
|
// compaction request contains higher priority than default priority of daughter region
|
||||||
|
// compaction (Integer.MIN_VALUE + 1000), hence we are expecting request priority to
|
||||||
|
// be accepted.
|
||||||
|
assertTrue(compactionContext.get().getRequest().isAfterSplit());
|
||||||
|
assertEquals(compactionContext.get().getRequest().getPriority(), Integer.MIN_VALUE + 10);
|
||||||
|
}
|
||||||
|
|
||||||
public static class FailingSplitMasterObserver implements MasterCoprocessor, MasterObserver {
|
public static class FailingSplitMasterObserver implements MasterCoprocessor, MasterObserver {
|
||||||
volatile CountDownLatch latch;
|
volatile CountDownLatch latch;
|
||||||
|
|
||||||
|
@ -641,18 +720,21 @@ public class TestSplitTransactionOnCluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void insertData(final TableName tableName, Admin admin, Table t) throws IOException,
|
private void insertData(final TableName tableName, Admin admin, Table t) throws IOException {
|
||||||
InterruptedException {
|
insertData(tableName, admin, t, 1);
|
||||||
Put p = new Put(Bytes.toBytes("row1"));
|
}
|
||||||
|
|
||||||
|
private void insertData(TableName tableName, Admin admin, Table t, int i) throws IOException {
|
||||||
|
Put p = new Put(Bytes.toBytes("row" + i));
|
||||||
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
|
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
|
||||||
t.put(p);
|
t.put(p);
|
||||||
p = new Put(Bytes.toBytes("row2"));
|
p = new Put(Bytes.toBytes("row" + (i + 1)));
|
||||||
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2"));
|
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2"));
|
||||||
t.put(p);
|
t.put(p);
|
||||||
p = new Put(Bytes.toBytes("row3"));
|
p = new Put(Bytes.toBytes("row" + (i + 2)));
|
||||||
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3"));
|
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3"));
|
||||||
t.put(p);
|
t.put(p);
|
||||||
p = new Put(Bytes.toBytes("row4"));
|
p = new Put(Bytes.toBytes("row" + (i + 3)));
|
||||||
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4"));
|
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4"));
|
||||||
t.put(p);
|
t.put(p);
|
||||||
admin.flush(tableName);
|
admin.flush(tableName);
|
||||||
|
|
Loading…
Reference in New Issue