HBASE-15065 SimpleRegionNormalizer should return multiple normalization plans in one run
This commit is contained in:
parent
164aeb5399
commit
6e781a1f34
|
@ -1341,12 +1341,16 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
NormalizationPlan plan = this.normalizer.computePlanForTable(table, types);
|
List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table, types);
|
||||||
plan.execute(clusterConnection.getAdmin());
|
if (plans != null) {
|
||||||
if (plan.getType() == PlanType.SPLIT) {
|
for (NormalizationPlan plan : plans) {
|
||||||
splitPlanCount++;
|
plan.execute(clusterConnection.getAdmin());
|
||||||
} else if (plan.getType() == PlanType.MERGE) {
|
if (plan.getType() == PlanType.SPLIT) {
|
||||||
mergePlanCount++;
|
splitPlanCount++;
|
||||||
|
} else if (plan.getType() == PlanType.MERGE) {
|
||||||
|
mergePlanCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,9 +51,9 @@ public interface RegionNormalizer {
|
||||||
* Computes next optimal normalization plan.
|
* Computes next optimal normalization plan.
|
||||||
* @param table table to normalize
|
* @param table table to normalize
|
||||||
* @param types desired types of NormalizationPlan
|
* @param types desired types of NormalizationPlan
|
||||||
* @return Next (perhaps most urgent) normalization action to perform
|
* @return normalization actions to perform. Null if no action to take
|
||||||
*/
|
*/
|
||||||
NormalizationPlan computePlanForTable(TableName table, List<PlanType> types)
|
List<NormalizationPlan> computePlanForTable(TableName table, List<PlanType> types)
|
||||||
throws HBaseIOException;
|
throws HBaseIOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -38,7 +38,7 @@ public class RegionNormalizerChore extends ScheduledChore {
|
||||||
|
|
||||||
public RegionNormalizerChore(HMaster master) {
|
public RegionNormalizerChore(HMaster master) {
|
||||||
super(master.getServerName() + "-RegionNormalizerChore", master,
|
super(master.getServerName() + "-RegionNormalizerChore", master,
|
||||||
master.getConfiguration().getInt("hbase.normalizer.period", 1800000));
|
master.getConfiguration().getInt("hbase.normalizer.period", 300000));
|
||||||
this.master = master;
|
this.master = master;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -72,22 +72,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
||||||
this.masterServices = masterServices;
|
this.masterServices = masterServices;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* This comparator compares the region size.
|
|
||||||
* The second element in the triple is region size while the 3rd element
|
|
||||||
* is the index of the region in the underlying List
|
|
||||||
*/
|
|
||||||
private Comparator<Triple<HRegionInfo, Long, Integer>> regionSizeComparator =
|
|
||||||
new Comparator<Triple<HRegionInfo, Long, Integer>>() {
|
|
||||||
@Override
|
|
||||||
public int compare(Triple<HRegionInfo, Long, Integer> pair,
|
|
||||||
Triple<HRegionInfo, Long, Integer> pair2) {
|
|
||||||
long sz = pair.getSecond();
|
|
||||||
long sz2 = pair2.getSecond();
|
|
||||||
return (sz < sz2) ? -1 : ((sz == sz2) ? 0 : 1);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void planSkipped(HRegionInfo hri, PlanType type) {
|
public void planSkipped(HRegionInfo hri, PlanType type) {
|
||||||
skippedCount[type.ordinal()]++;
|
skippedCount[type.ordinal()]++;
|
||||||
|
@ -98,6 +82,17 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
||||||
return skippedCount[type.ordinal()];
|
return skippedCount[type.ordinal()];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Comparator that gives higher priority to region Split plan
|
||||||
|
private Comparator<NormalizationPlan> planComparator =
|
||||||
|
new Comparator<NormalizationPlan>() {
|
||||||
|
@Override
|
||||||
|
public int compare(NormalizationPlan plan, NormalizationPlan plan2) {
|
||||||
|
if (plan instanceof SplitNormalizationPlan) return -1;
|
||||||
|
if (plan2 instanceof SplitNormalizationPlan) return 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Computes next most "urgent" normalization action on the table.
|
* Computes next most "urgent" normalization action on the table.
|
||||||
* Action may be either a split, or a merge, or no action.
|
* Action may be either a split, or a merge, or no action.
|
||||||
|
@ -107,13 +102,14 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
||||||
* @return normalization plan to execute
|
* @return normalization plan to execute
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public NormalizationPlan computePlanForTable(TableName table, List<PlanType> types)
|
public List<NormalizationPlan> computePlanForTable(TableName table, List<PlanType> types)
|
||||||
throws HBaseIOException {
|
throws HBaseIOException {
|
||||||
if (table == null || table.isSystemTable()) {
|
if (table == null || table.isSystemTable()) {
|
||||||
LOG.debug("Normalization of system table " + table + " isn't allowed");
|
LOG.debug("Normalization of system table " + table + " isn't allowed");
|
||||||
return EmptyNormalizationPlan.getInstance();
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<NormalizationPlan> plans = new ArrayList<NormalizationPlan>();
|
||||||
List<HRegionInfo> tableRegions = masterServices.getAssignmentManager().getRegionStates().
|
List<HRegionInfo> tableRegions = masterServices.getAssignmentManager().getRegionStates().
|
||||||
getRegionsOfTable(table);
|
getRegionsOfTable(table);
|
||||||
|
|
||||||
|
@ -122,7 +118,7 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
||||||
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
|
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
|
||||||
LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
|
LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
|
||||||
+ " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer");
|
+ " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer");
|
||||||
return EmptyNormalizationPlan.getInstance();
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("Computing normalization plan for table: " + table +
|
LOG.debug("Computing normalization plan for table: " + table +
|
||||||
|
@ -130,56 +126,49 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
||||||
|
|
||||||
long totalSizeMb = 0;
|
long totalSizeMb = 0;
|
||||||
|
|
||||||
ArrayList<Triple<HRegionInfo, Long, Integer>> regionsWithSize =
|
|
||||||
new ArrayList<Triple<HRegionInfo, Long, Integer>>(tableRegions.size());
|
|
||||||
for (int i = 0; i < tableRegions.size(); i++) {
|
for (int i = 0; i < tableRegions.size(); i++) {
|
||||||
HRegionInfo hri = tableRegions.get(i);
|
HRegionInfo hri = tableRegions.get(i);
|
||||||
long regionSize = getRegionSize(hri);
|
long regionSize = getRegionSize(hri);
|
||||||
regionsWithSize.add(new Triple<HRegionInfo, Long, Integer>(hri, regionSize, i));
|
|
||||||
totalSizeMb += regionSize;
|
totalSizeMb += regionSize;
|
||||||
}
|
}
|
||||||
Collections.sort(regionsWithSize, regionSizeComparator);
|
|
||||||
|
|
||||||
Triple<HRegionInfo, Long, Integer> largestRegion = regionsWithSize.get(tableRegions.size()-1);
|
|
||||||
|
|
||||||
double avgRegionSize = totalSizeMb / (double) tableRegions.size();
|
double avgRegionSize = totalSizeMb / (double) tableRegions.size();
|
||||||
|
|
||||||
LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
|
LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
|
||||||
LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
|
LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
|
||||||
|
|
||||||
// now; if the largest region is >2 times large than average, we split it, split
|
|
||||||
// is more high priority normalization action than merge.
|
|
||||||
if (types.contains(PlanType.SPLIT) && largestRegion.getSecond() > 2 * avgRegionSize) {
|
|
||||||
LOG.debug("Table " + table + ", largest region "
|
|
||||||
+ largestRegion.getFirst().getRegionNameAsString() + " has size "
|
|
||||||
+ largestRegion.getSecond() + ", more than 2 times than avg size, splitting");
|
|
||||||
return new SplitNormalizationPlan(largestRegion.getFirst(), null);
|
|
||||||
}
|
|
||||||
int candidateIdx = 0;
|
int candidateIdx = 0;
|
||||||
// look for two successive entries whose indices are adjacent
|
while (candidateIdx < tableRegions.size()) {
|
||||||
while (candidateIdx < tableRegions.size()-1) {
|
HRegionInfo hri = tableRegions.get(candidateIdx);
|
||||||
if (Math.abs(regionsWithSize.get(candidateIdx).getThird() -
|
long regionSize = getRegionSize(hri);
|
||||||
regionsWithSize.get(candidateIdx + 1).getThird()) == 1) {
|
// if the region is > 2 times larger than average, we split it, split
|
||||||
break;
|
// is more high priority normalization action than merge.
|
||||||
|
if (types.contains(PlanType.SPLIT) && regionSize > 2 * avgRegionSize) {
|
||||||
|
LOG.debug("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
|
||||||
|
+ regionSize + ", more than twice avg size, splitting");
|
||||||
|
plans.add(new SplitNormalizationPlan(hri, null));
|
||||||
|
} else {
|
||||||
|
if (candidateIdx == tableRegions.size()-1) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
HRegionInfo hri2 = tableRegions.get(candidateIdx+1);
|
||||||
|
long regionSize2 = getRegionSize(hri2);
|
||||||
|
if (types.contains(PlanType.MERGE) && regionSize + regionSize2 < avgRegionSize) {
|
||||||
|
LOG.debug("Table " + table + ", small region size: " + regionSize
|
||||||
|
+ " plus its neighbor size: " + regionSize2
|
||||||
|
+ ", less than the avg size " + avgRegionSize + ", merging them");
|
||||||
|
plans.add(new MergeNormalizationPlan(hri, hri2));
|
||||||
|
candidateIdx++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
candidateIdx++;
|
candidateIdx++;
|
||||||
}
|
}
|
||||||
if (candidateIdx == tableRegions.size()-1) {
|
if (plans.isEmpty()) {
|
||||||
LOG.debug("No neighboring regions found for table: " + table);
|
LOG.debug("No normalization needed, regions look good for table: " + table);
|
||||||
return EmptyNormalizationPlan.getInstance();
|
return null;
|
||||||
}
|
}
|
||||||
Triple<HRegionInfo, Long, Integer> candidateRegion = regionsWithSize.get(candidateIdx);
|
Collections.sort(plans, planComparator);
|
||||||
Triple<HRegionInfo, Long, Integer> candidateRegion2 = regionsWithSize.get(candidateIdx+1);
|
return plans;
|
||||||
if (types.contains(PlanType.MERGE) &&
|
|
||||||
candidateRegion.getSecond() + candidateRegion2.getSecond() < avgRegionSize) {
|
|
||||||
LOG.debug("Table " + table + ", smallest region size: " + candidateRegion.getSecond()
|
|
||||||
+ " and its smallest neighbor size: " + candidateRegion2.getSecond()
|
|
||||||
+ ", less than the avg size, merging them");
|
|
||||||
return new MergeNormalizationPlan(candidateRegion.getFirst(),
|
|
||||||
candidateRegion2.getFirst());
|
|
||||||
}
|
|
||||||
LOG.debug("No normalization needed, regions look good for table: " + table);
|
|
||||||
return EmptyNormalizationPlan.getInstance();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getRegionSize(HRegionInfo hri) {
|
private long getRegionSize(HRegionInfo hri) {
|
||||||
|
|
|
@ -83,8 +83,8 @@ public class TestSimpleRegionNormalizer {
|
||||||
Map<byte[], Integer> regionSizes = new HashMap<>();
|
Map<byte[], Integer> regionSizes = new HashMap<>();
|
||||||
|
|
||||||
setupMocksForNormalizer(regionSizes, hris);
|
setupMocksForNormalizer(regionSizes, hris);
|
||||||
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
|
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable, bothTypes);
|
||||||
assertTrue(plan instanceof EmptyNormalizationPlan);
|
assertTrue(plans == null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -102,8 +102,8 @@ public class TestSimpleRegionNormalizer {
|
||||||
regionSizes.put(hri2.getRegionName(), 15);
|
regionSizes.put(hri2.getRegionName(), 15);
|
||||||
|
|
||||||
setupMocksForNormalizer(regionSizes, hris);
|
setupMocksForNormalizer(regionSizes, hris);
|
||||||
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
|
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable, bothTypes);
|
||||||
assertTrue((plan instanceof EmptyNormalizationPlan));
|
assertTrue(plans == null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -129,8 +129,8 @@ public class TestSimpleRegionNormalizer {
|
||||||
regionSizes.put(hri4.getRegionName(), 10);
|
regionSizes.put(hri4.getRegionName(), 10);
|
||||||
|
|
||||||
setupMocksForNormalizer(regionSizes, hris);
|
setupMocksForNormalizer(regionSizes, hris);
|
||||||
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
|
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable, bothTypes);
|
||||||
assertTrue(plan instanceof EmptyNormalizationPlan);
|
assertTrue(plans == null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -165,15 +165,16 @@ public class TestSimpleRegionNormalizer {
|
||||||
regionSizes.put(hri5.getRegionName(), 16);
|
regionSizes.put(hri5.getRegionName(), 16);
|
||||||
|
|
||||||
setupMocksForNormalizer(regionSizes, hris);
|
setupMocksForNormalizer(regionSizes, hris);
|
||||||
NormalizationPlan plan = normalizer.computePlanForTable(testTable,
|
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable,
|
||||||
mergeDesired ? bothTypes : splitType);
|
mergeDesired ? bothTypes : splitType);
|
||||||
|
|
||||||
if (mergeDesired) {
|
if (mergeDesired) {
|
||||||
|
NormalizationPlan plan = plans.get(0);
|
||||||
assertTrue(plan instanceof MergeNormalizationPlan);
|
assertTrue(plan instanceof MergeNormalizationPlan);
|
||||||
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
|
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
|
||||||
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
|
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
|
||||||
} else {
|
} else {
|
||||||
assertTrue(plan instanceof EmptyNormalizationPlan);
|
assertTrue(plans == null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,7 +210,8 @@ public class TestSimpleRegionNormalizer {
|
||||||
regionSizes.put(hri6.getRegionName(), 2700);
|
regionSizes.put(hri6.getRegionName(), 2700);
|
||||||
|
|
||||||
setupMocksForNormalizer(regionSizes, hris);
|
setupMocksForNormalizer(regionSizes, hris);
|
||||||
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
|
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable, bothTypes);
|
||||||
|
NormalizationPlan plan = plans.get(0);
|
||||||
|
|
||||||
assertTrue(plan instanceof MergeNormalizationPlan);
|
assertTrue(plan instanceof MergeNormalizationPlan);
|
||||||
assertEquals(hri5, ((MergeNormalizationPlan) plan).getFirstRegion());
|
assertEquals(hri5, ((MergeNormalizationPlan) plan).getFirstRegion());
|
||||||
|
@ -243,9 +245,9 @@ public class TestSimpleRegionNormalizer {
|
||||||
regionSizes.put(hri5.getRegionName(), 5);
|
regionSizes.put(hri5.getRegionName(), 5);
|
||||||
|
|
||||||
setupMocksForNormalizer(regionSizes, hris);
|
setupMocksForNormalizer(regionSizes, hris);
|
||||||
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
|
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable, bothTypes);
|
||||||
|
|
||||||
assertTrue(plan instanceof EmptyNormalizationPlan);
|
assertTrue(plans == null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -271,7 +273,8 @@ public class TestSimpleRegionNormalizer {
|
||||||
regionSizes.put(hri4.getRegionName(), 30);
|
regionSizes.put(hri4.getRegionName(), 30);
|
||||||
|
|
||||||
setupMocksForNormalizer(regionSizes, hris);
|
setupMocksForNormalizer(regionSizes, hris);
|
||||||
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
|
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable, bothTypes);
|
||||||
|
NormalizationPlan plan = plans.get(0);
|
||||||
|
|
||||||
assertTrue(plan instanceof SplitNormalizationPlan);
|
assertTrue(plan instanceof SplitNormalizationPlan);
|
||||||
assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo());
|
assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo());
|
||||||
|
|
|
@ -155,11 +155,19 @@ public class TestSimpleRegionNormalizerOnCluster {
|
||||||
} while (skippedSplitcnt == 0L);
|
} while (skippedSplitcnt == 0L);
|
||||||
assert(skippedSplitcnt > 0);
|
assert(skippedSplitcnt > 0);
|
||||||
} else {
|
} else {
|
||||||
while (MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME) < 6) {
|
while (true) {
|
||||||
LOG.info("Waiting for normalization split to complete");
|
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME);
|
||||||
Thread.sleep(100);
|
int cnt = 0;
|
||||||
|
for (HRegion region : regions) {
|
||||||
|
String regionName = region.getRegionInfo().getRegionNameAsString();
|
||||||
|
if (regionName.startsWith("testRegionNormalizationSplitOnCluster,zzzzz")) {
|
||||||
|
cnt++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (cnt >= 2) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
assertEquals(6, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
admin.disableTable(TABLENAME);
|
admin.disableTable(TABLENAME);
|
||||||
|
|
Loading…
Reference in New Issue