HBASE-24428 : Update compaction priority for recently split daughter regions (#1784)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
9f12ef0772
commit
1e386e3d83
|
@ -131,6 +131,12 @@ public class HStore implements Store {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(HStore.class);
|
||||
|
||||
// HBASE-24428 : Update compaction priority for recently split daughter regions
|
||||
// so as to prioritize their compaction.
|
||||
// Any compaction candidate with higher priority than compaction of newly split daughter regions
|
||||
// should have priority value < (Integer.MIN_VALUE + 1000)
|
||||
public static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
|
||||
|
||||
protected final MemStore memstore;
|
||||
// This stores directory in the filesystem.
|
||||
private final HRegion region;
|
||||
|
@ -1813,7 +1819,23 @@ public class HStore implements Store {
|
|||
|
||||
// Set common request properties.
|
||||
// Set priority, either override value supplied by caller or from store.
|
||||
request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
|
||||
final int compactionPriority =
|
||||
(priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
|
||||
request.setPriority(compactionPriority);
|
||||
|
||||
if (request.isAfterSplit()) {
|
||||
// If the store belongs to a recently split daughter region, it is better to consider
|
||||
// them with the higher priority in the compaction queue.
|
||||
// Override priority if it is lower (higher int value) than
|
||||
// SPLIT_REGION_COMPACTION_PRIORITY
|
||||
final int splitHousekeepingPriority =
|
||||
Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
|
||||
request.setPriority(splitHousekeepingPriority);
|
||||
LOG.info("Keeping/Overriding Compaction request priority to " + splitHousekeepingPriority
|
||||
+ " for CF " + this.getColumnFamilyName() + " since it"
|
||||
+ " belongs to recently split daughter region " + getRegionInfo()
|
||||
.getRegionNameAsString());
|
||||
}
|
||||
request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -48,6 +48,7 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
|
|||
private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
|
||||
private int priority = Store.NO_PRIORITY;
|
||||
private Collection<StoreFile> filesToCompact;
|
||||
private boolean isAfterSplit = false;
|
||||
|
||||
// CompactRequest object creation time.
|
||||
private long selectionTime;
|
||||
|
@ -139,6 +140,14 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
|
|||
return this.hashCode() - request.hashCode();
|
||||
}
|
||||
|
||||
/**
 * @return true if this compaction request was created for the store of a recently split
 *         daughter region (set via {@link #setAfterSplit(boolean)}), false otherwise.
 *         Such requests are eligible for elevated compaction priority.
 */
public boolean isAfterSplit() {
  return isAfterSplit;
}
|
||||
|
||||
/**
 * Marks whether this request targets the store of a recently split daughter region.
 * Flagged requests may have their priority raised to
 * HStore.SPLIT_REGION_COMPACTION_PRIORITY so they are compacted sooner.
 *
 * @param afterSplit true if the store belongs to a recently split daughter region
 */
public void setAfterSplit(boolean afterSplit) {
  isAfterSplit = afterSplit;
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
|
@ -152,6 +161,7 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
|
|||
result = prime * result + ((storeName == null) ? 0 : storeName.hashCode());
|
||||
result = prime * result + ((timeInNanos == null) ? 0 : timeInNanos.hashCode());
|
||||
result = prime * result + (int) (totalSize ^ (totalSize >>> 32));
|
||||
result = prime * result + (isAfterSplit ? 1231 : 1237);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -210,6 +220,9 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
|
|||
if (totalSize != other.totalSize) {
|
||||
return false;
|
||||
}
|
||||
if (isAfterSplit != other.isAfterSplit) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -86,6 +86,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy {
|
|||
|
||||
CompactionRequest result = createCompactionRequest(candidateSelection,
|
||||
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
|
||||
result.setAfterSplit(isAfterSplit);
|
||||
|
||||
ArrayList<StoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
|
||||
removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
|
||||
|
|
|
@ -123,6 +123,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
|
|||
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
|
||||
allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
|
||||
request.setMajorRangeFull();
|
||||
request.getRequest().setAfterSplit(true);
|
||||
return request;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
@ -27,6 +28,8 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -83,6 +86,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
|||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
|
@ -113,6 +117,7 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
|
@ -269,6 +274,77 @@ public class TestSplitTransactionOnCluster {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSplitCompactWithPriority() throws Exception {
|
||||
final TableName tableName = TableName.valueOf("testSplitCompactWithPriority");
|
||||
// Create table then get the single region for our new table.
|
||||
byte[] cf = Bytes.toBytes("cf");
|
||||
HTable hTable = createTableAndWait(tableName, cf);
|
||||
|
||||
assertNotEquals("Unable to retrieve regions of the table", -1,
|
||||
TESTING_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return cluster.getRegions(tableName).size() == 1;
|
||||
}
|
||||
}));
|
||||
|
||||
HRegion region = cluster.getRegions(tableName).get(0);
|
||||
Store store = region.getStore(cf);
|
||||
int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName());
|
||||
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
|
||||
|
||||
Table table = TESTING_UTIL.getConnection().getTable(tableName);
|
||||
// insert data
|
||||
insertData(tableName, admin, table);
|
||||
insertData(tableName, admin, table, 20);
|
||||
insertData(tableName, admin, table, 40);
|
||||
|
||||
// Compaction Request
|
||||
store.triggerMajorCompaction();
|
||||
CompactionContext compactionContext = store.requestCompaction();
|
||||
assertNotNull(compactionContext);
|
||||
assertFalse(compactionContext.getRequest().isAfterSplit());
|
||||
assertEquals(compactionContext.getRequest().getPriority(), 7);
|
||||
|
||||
// Split
|
||||
this.admin.split(region.getRegionInfo().getRegionName(), Bytes.toBytes("row4"));
|
||||
|
||||
Thread.sleep(5000);
|
||||
assertEquals(2, cluster.getRegions(tableName).size());
|
||||
// we have 2 daughter regions
|
||||
HRegion hRegion1 = cluster.getRegions(tableName).get(0);
|
||||
HRegion hRegion2 = cluster.getRegions(tableName).get(1);
|
||||
Store store1 = hRegion1.getStore(cf);
|
||||
Store store2 = hRegion2.getStore(cf);
|
||||
|
||||
// For hStore1 && hStore2, set mock reference to one of the storeFiles
|
||||
StoreFileInfo storeFileInfo1 = new ArrayList<>(store1.getStorefiles()).get(0).getFileInfo();
|
||||
StoreFileInfo storeFileInfo2 = new ArrayList<>(store2.getStorefiles()).get(0).getFileInfo();
|
||||
Field field = StoreFileInfo.class.getDeclaredField("reference");
|
||||
field.setAccessible(true);
|
||||
field.set(storeFileInfo1, Mockito.mock(Reference.class));
|
||||
field.set(storeFileInfo2, Mockito.mock(Reference.class));
|
||||
store1.triggerMajorCompaction();
|
||||
store2.triggerMajorCompaction();
|
||||
|
||||
compactionContext = store1.requestCompaction();
|
||||
assertNotNull(compactionContext);
|
||||
// since we set mock reference to one of the storeFiles, we will get isAfterSplit=true &&
|
||||
// highest priority for hStore1's compactionContext
|
||||
assertTrue(compactionContext.getRequest().isAfterSplit());
|
||||
assertEquals(compactionContext.getRequest().getPriority(), Integer.MIN_VALUE + 1000);
|
||||
|
||||
compactionContext =
|
||||
store2.requestCompaction(Integer.MIN_VALUE + 10, null, null);
|
||||
assertNotNull(compactionContext);
|
||||
// compaction request contains higher priority than default priority of daughter region
|
||||
// compaction (Integer.MIN_VALUE + 1000), hence we are expecting request priority to
|
||||
// be accepted.
|
||||
assertTrue(compactionContext.getRequest().isAfterSplit());
|
||||
assertEquals(compactionContext.getRequest().getPriority(), Integer.MIN_VALUE + 10);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRITStateForRollback() throws Exception {
|
||||
final TableName tableName =
|
||||
|
@ -1057,19 +1133,23 @@ public class TestSplitTransactionOnCluster {
|
|||
}
|
||||
}
|
||||
|
||||
private void insertData(final TableName tableName, HBaseAdmin admin, Table t) throws IOException,
|
||||
InterruptedException {
|
||||
Put p = new Put(Bytes.toBytes("row1"));
|
||||
p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
|
||||
private void insertData(final TableName tableName, HBaseAdmin admin, Table t)
|
||||
throws IOException {
|
||||
insertData(tableName, admin, t, 1);
|
||||
}
|
||||
|
||||
private void insertData(TableName tableName, Admin admin, Table t, int i) throws IOException {
|
||||
Put p = new Put(Bytes.toBytes("row" + i));
|
||||
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
|
||||
t.put(p);
|
||||
p = new Put(Bytes.toBytes("row2"));
|
||||
p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2"));
|
||||
p = new Put(Bytes.toBytes("row" + (i + 1)));
|
||||
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2"));
|
||||
t.put(p);
|
||||
p = new Put(Bytes.toBytes("row3"));
|
||||
p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3"));
|
||||
p = new Put(Bytes.toBytes("row" + (i + 2)));
|
||||
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3"));
|
||||
t.put(p);
|
||||
p = new Put(Bytes.toBytes("row4"));
|
||||
p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4"));
|
||||
p = new Put(Bytes.toBytes("row" + (i + 3)));
|
||||
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4"));
|
||||
t.put(p);
|
||||
admin.flush(tableName);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue