HBASE-23932 Minor improvements to Region Normalizer (#1258)

- consolidate checks made by master on behalf of balancer and
   normalizer: deciding if the master is in a healthy state for
   running any actions at all (skipRegionManagementAction). Normalizer
   now does as balancer did previously.
 - both balancer and normalizer make one final check on above
   conditions between calculating an action plan and executing the
   plan. should make the process more responsive to shutdown
   requests.
 - change normalizer to only consider acting on a region when it is in
   the OPEN state. previously we would normalizer attempt to merge a
   region that was already in a MERGING_NEW,MERGING,MERGED state.
 - fix some typos in variable names.

Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: binlijin <binlijin@gmail.com>
This commit is contained in:
Nick Dimiduk 2020-03-10 10:08:37 -07:00 committed by Nick Dimiduk
parent 2655f9647e
commit 678b142da2
4 changed files with 221 additions and 132 deletions

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -219,6 +220,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@ -374,7 +376,7 @@ public class HMaster extends HRegionServer implements MasterServices {
volatile boolean serviceStarted = false;
// Maximum time we should run balancer for
private final int maxBlancingTime;
private final int maxBalancingTime;
// Maximum percent of regions in transition when balancing
private final double maxRitPercent;
@ -540,7 +542,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// preload table descriptor at startup
this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
this.maxBlancingTime = getMaxBalancingTime();
this.maxBalancingTime = getMaxBalancingTime();
this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
@ -1693,21 +1695,36 @@ public class HMaster extends HRegionServer implements MasterServices {
return balance(false);
}
public boolean balance(boolean force) throws IOException {
// if master not initialized, don't run balancer.
/**
* Checks master state before initiating action over region topology.
* @param action the name of the action under consideration, for logging.
* @return {@code true} when the caller should exit early, {@code false} otherwise.
*/
private boolean skipRegionManagementAction(final String action) {
if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run balancer.");
LOG.debug("Master has not been initialized, don't run {}.", action);
return true;
}
if (this.getServerManager().isClusterShutdown()) {
LOG.info("Cluster is shutting down, don't run {}.", action);
return true;
}
if (isInMaintenanceMode()) {
LOG.info("Master is in maintenance mode, don't run {}.", action);
return true;
}
return false;
}
public boolean balance(boolean force) throws IOException {
if (loadBalancerTracker == null || !loadBalancerTracker.isBalancerOn()) {
return false;
}
if (isInMaintenanceMode()) {
LOG.info("Master is in maintenanceMode mode, don't run balancer.");
if (skipRegionManagementAction("balancer")) {
return false;
}
synchronized (this.balancer) {
// If balance not true, don't run balancer.
if (!this.loadBalancerTracker.isBalancerOn()) return false;
// Only allow one balance run at at time.
if (this.assignmentManager.hasRegionsInTransition()) {
List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
@ -1765,6 +1782,11 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
if (skipRegionManagementAction("balancer")) {
// make one last check that the cluster isn't shutting down before proceeding.
return false;
}
List<RegionPlan> sucRPs = executeRegionPlansWithThrottling(plans);
if (this.cpHost != null) {
@ -1785,10 +1807,10 @@ public class HMaster extends HRegionServer implements MasterServices {
List<RegionPlan> sucRPs = new ArrayList<>();
int maxRegionsInTransition = getMaxRegionsInTransition();
long balanceStartTime = System.currentTimeMillis();
long cutoffTime = balanceStartTime + this.maxBlancingTime;
long cutoffTime = balanceStartTime + this.maxBalancingTime;
int rpCount = 0; // number of RegionPlans balanced so far
if (plans != null && !plans.isEmpty()) {
int balanceInterval = this.maxBlancingTime / plans.size();
int balanceInterval = this.maxBalancingTime / plans.size();
LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
+ balanceInterval + " ms, and the max number regions in transition is "
+ maxRegionsInTransition);
@ -1806,18 +1828,18 @@ public class HMaster extends HRegionServer implements MasterServices {
//rpCount records balance plans processed, does not care if a plan succeeds
rpCount++;
if (this.maxBlancingTime > 0) {
if (this.maxBalancingTime > 0) {
balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
cutoffTime);
}
// if performing next balance exceeds cutoff time, exit the loop
if (this.maxBlancingTime > 0 && rpCount < plans.size()
if (this.maxBalancingTime > 0 && rpCount < plans.size()
&& System.currentTimeMillis() > cutoffTime) {
// TODO: After balance, there should not be a cutoff time (keeping it as
// a security net for now)
LOG.debug("No more balancing till next balance run; maxBalanceTime="
+ this.maxBlancingTime);
+ this.maxBalancingTime);
break;
}
}
@ -1839,36 +1861,26 @@ public class HMaster extends HRegionServer implements MasterServices {
* is globally disabled)
*/
public boolean normalizeRegions() throws IOException {
if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run region normalizer.");
return false;
}
if (this.getServerManager().isClusterShutdown()) {
LOG.info("Cluster is shutting down, don't run region normalizer.");
return false;
}
if (isInMaintenanceMode()) {
LOG.info("Master is in maintenance mode, don't run region normalizer.");
return false;
}
if (!this.regionNormalizerTracker.isNormalizerOn()) {
if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) {
LOG.debug("Region normalization is disabled, don't run region normalizer.");
return false;
}
if (skipRegionManagementAction("region normalizer")) {
return false;
}
if (assignmentManager.hasRegionsInTransition()) {
return false;
}
synchronized (this.normalizer) {
// Don't run the normalizer concurrently
List<TableName> allEnabledTables = new ArrayList<>(
this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
Collections.shuffle(allEnabledTables);
for (TableName table : allEnabledTables) {
if (isInMaintenanceMode()) {
LOG.debug("Master is in maintenance mode, stop running region normalizer.");
return false;
}
TableDescriptor tblDesc = getTableDescriptors().get(table);
if (table.isSystemTable() || (tblDesc != null &&
!tblDesc.isNormalizationEnabled())) {
@ -1876,10 +1888,20 @@ public class HMaster extends HRegionServer implements MasterServices {
+ " table or doesn't have auto normalization turned on", table);
continue;
}
List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
if (plans != null) {
// make one last check that the cluster isn't shutting down before proceeding.
if (skipRegionManagementAction("region normalizer")) {
return false;
}
final List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
if (CollectionUtils.isEmpty(plans)) {
return true;
}
try (final Admin admin = clusterConnection.getAdmin()) {
for (NormalizationPlan plan : plans) {
plan.execute(clusterConnection.getAdmin());
plan.execute(admin);
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
@ -30,12 +31,12 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@InterfaceAudience.Private
@ -108,12 +109,12 @@ public abstract class AbstractRegionNormalizer implements RegionNormalizer {
*/
protected double getAverageRegionSize(List<RegionInfo> tableRegions) {
long totalSizeMb = 0;
int acutalRegionCnt = 0;
int actualRegionCnt = 0;
for (RegionInfo hri : tableRegions) {
long regionSize = getRegionSize(hri);
// don't consider regions that are in bytes for averaging the size.
if (regionSize > 0) {
acutalRegionCnt++;
actualRegionCnt++;
totalSizeMb += regionSize;
}
}
@ -140,7 +141,7 @@ public abstract class AbstractRegionNormalizer implements RegionNormalizer {
} else if (targetRegionCount > 0) {
avgRegionSize = totalSizeMb / (double) targetRegionCount;
} else {
avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
avgRegionSize = actualRegionCnt == 0 ? 0 : totalSizeMb / (double) actualRegionCnt;
}
LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
@ -148,6 +149,13 @@ public abstract class AbstractRegionNormalizer implements RegionNormalizer {
return avgRegionSize;
}
/**
* Determine if a region in {@link RegionState} should be considered for a merge operation.
*/
private static boolean skipForMerge(final RegionState state) {
return state == null || !Objects.equals(state.getState(), RegionState.State.OPEN);
}
/**
* Computes the merge plans that should be executed for this table to converge average region
* towards target average or target region count
@ -155,33 +163,45 @@ public abstract class AbstractRegionNormalizer implements RegionNormalizer {
* @return list of merge normalization plans
*/
protected List<NormalizationPlan> getMergeNormalizationPlan(TableName table) {
List<NormalizationPlan> plans = new ArrayList<>();
List<RegionInfo> tableRegions =
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
double avgRegionSize = getAverageRegionSize(tableRegions);
LOG.debug("Table {}, average region size: {}.\n Computing normalization plan for table: {}, "
final RegionStates regionStates = masterServices.getAssignmentManager().getRegionStates();
final List<RegionInfo> tableRegions = regionStates.getRegionsOfTable(table);
final double avgRegionSize = getAverageRegionSize(tableRegions);
LOG.debug("Table {}, average region size: {}. Computing normalization plan for table: {}, "
+ "number of regions: {}",
table, avgRegionSize, table, tableRegions.size());
int candidateIdx = 0;
while (candidateIdx < tableRegions.size() - 1) {
RegionInfo hri = tableRegions.get(candidateIdx);
long regionSize = getRegionSize(hri);
RegionInfo hri2 = tableRegions.get(candidateIdx + 1);
long regionSize2 = getRegionSize(hri2);
final List<NormalizationPlan> plans = new ArrayList<>();
for (int candidateIdx = 0; candidateIdx < tableRegions.size() - 1; candidateIdx++) {
final RegionInfo hri = tableRegions.get(candidateIdx);
final RegionInfo hri2 = tableRegions.get(candidateIdx + 1);
if (skipForMerge(regionStates.getRegionState(hri))) {
continue;
}
if (skipForMerge(regionStates.getRegionState(hri2))) {
continue;
}
final long regionSize = getRegionSize(hri);
final long regionSize2 = getRegionSize(hri2);
if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
// atleast one of the two regions should be older than MIN_REGION_DURATION days
// at least one of the two regions should be older than MIN_REGION_DURATION days
plans.add(new MergeNormalizationPlan(hri, hri2));
candidateIdx++;
} else {
LOG.debug("Skipping region {} of table {} with size {}", hri.getRegionNameAsString(), table,
regionSize);
}
candidateIdx++;
}
return plans;
}
/**
* Determine if a region in {@link RegionState} should be considered for a split operation.
*/
private static boolean skipForSplit(final RegionState state) {
return state == null || !Objects.equals(state.getState(), RegionState.State.OPEN);
}
/**
* Computes the split plans that should be executed for this table to converge average region size
* towards target average or target region count
@ -189,24 +209,24 @@ public abstract class AbstractRegionNormalizer implements RegionNormalizer {
* @return list of split normalization plans
*/
protected List<NormalizationPlan> getSplitNormalizationPlan(TableName table) {
List<NormalizationPlan> plans = new ArrayList<>();
List<RegionInfo> tableRegions =
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
double avgRegionSize = getAverageRegionSize(tableRegions);
final RegionStates regionStates = masterServices.getAssignmentManager().getRegionStates();
final List<RegionInfo> tableRegions = regionStates.getRegionsOfTable(table);
final double avgRegionSize = getAverageRegionSize(tableRegions);
LOG.debug("Table {}, average region size: {}", table, avgRegionSize);
int candidateIdx = 0;
while (candidateIdx < tableRegions.size()) {
RegionInfo hri = tableRegions.get(candidateIdx);
final List<NormalizationPlan> plans = new ArrayList<>();
for (final RegionInfo hri : tableRegions) {
if (skipForSplit(regionStates.getRegionState(hri))) {
continue;
}
long regionSize = getRegionSize(hri);
// if the region is > 2 times larger than average, we split it, split
// is more high priority normalization action than merge.
if (regionSize > 2 * avgRegionSize) {
LOG.info("Table {}, large region {} has size {}, more than twice avg size, splitting",
table, hri.getRegionNameAsString(), regionSize);
LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
table, hri.getRegionNameAsString(), regionSize, avgRegionSize);
plans.add(new SplitNormalizationPlan(hri, null));
}
candidateIdx++;
}
return plans;
}

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.master.normalizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.when;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
@ -41,19 +40,17 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
@Category({ MasterTests.class, SmallTests.class })
@ -64,16 +61,7 @@ public class TestMergeNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
private static RegionNormalizer normalizer;
// mocks
private static MasterServices masterServices;
private static MasterRpcServices masterRpcServices;
@BeforeClass
public static void beforeAllTests() throws Exception {
normalizer = new MergeNormalizer();
}
private RegionNormalizer normalizer;
@Test
public void testNoNormalizationForMetaTable() throws HBaseIOException {
@ -144,25 +132,25 @@ public class TestMergeNormalizer {
Map<byte[], Integer> regionSizes = new HashMap<>();
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
Timestamp threedaysBefore = new Timestamp(currentTime.getTime() - TimeUnit.DAYS.toMillis(3));
Timestamp threeDaysBefore = new Timestamp(currentTime.getTime() - TimeUnit.DAYS.toMillis(3));
RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
.setEndKey(Bytes.toBytes("bbb")).setRegionId(threedaysBefore.getTime()).build();
.setEndKey(Bytes.toBytes("bbb")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri1);
regionSizes.put(hri1.getRegionName(), 15);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc")).setRegionId(threedaysBefore.getTime()).build();
.setEndKey(Bytes.toBytes("ccc")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 5);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd")).setRegionId(threedaysBefore.getTime()).build();
.setEndKey(Bytes.toBytes("ddd")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri3);
regionSizes.put(hri3.getRegionName(), 5);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee")).setRegionId(threedaysBefore.getTime()).build();
.setEndKey(Bytes.toBytes("eee")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 15);
@ -172,7 +160,7 @@ public class TestMergeNormalizer {
regionSizes.put(hri5.getRegionName(), 16);
RegionInfo hri6 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("fff"))
.setEndKey(Bytes.toBytes("ggg")).setRegionId(threedaysBefore.getTime()).build();
.setEndKey(Bytes.toBytes("ggg")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri6);
regionSizes.put(hri6.getRegionName(), 0);
@ -234,16 +222,19 @@ public class TestMergeNormalizer {
@SuppressWarnings("MockitoCast")
protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> RegionInfo) {
masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
List<RegionInfo> RegionInfo) {
MasterServices masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
MasterRpcServices masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
// for simplicity all regions are assumed to be on one server; doesn't matter to us
ServerName sn = ServerName.valueOf("localhost", 0, 1L);
when(masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(any()))
.thenReturn(RegionInfo);
when(masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(any()))
.thenReturn(sn);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionsOfTable(any())).thenReturn(RegionInfo);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionServerOfRegion(any())).thenReturn(sn);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionState(any(RegionInfo.class))).thenReturn(
RegionState.createForTesting(null, RegionState.State.OPEN));
for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
RegionMetrics regionLoad = Mockito.mock(RegionMetrics.class);
@ -264,6 +255,7 @@ public class TestMergeNormalizer {
LOG.debug("error setting isSplitOrMergeEnabled switch", se);
}
normalizer = new SimpleRegionNormalizer();
normalizer.setMasterServices(masterServices);
normalizer.setMasterRpcServices(masterRpcServices);
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,15 +17,17 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -38,11 +40,10 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@ -51,9 +52,7 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
/**
@ -68,20 +67,12 @@ public class TestSimpleRegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRegionNormalizer.class);
private static RegionNormalizer normalizer;
// mocks
private static MasterServices masterServices;
private static MasterRpcServices masterRpcServices;
private RegionNormalizer normalizer;
private MasterServices masterServices;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void beforeAllTests() throws Exception {
normalizer = new SimpleRegionNormalizer();
}
@Test
public void testPlanComparator() {
Comparator<NormalizationPlan> comparator = new SimpleRegionNormalizer.PlanComparator();
@ -90,10 +81,10 @@ public class TestSimpleRegionNormalizer {
NormalizationPlan mergePlan1 = new MergeNormalizationPlan(null, null);
NormalizationPlan mergePlan2 = new MergeNormalizationPlan(null, null);
assertTrue(comparator.compare(splitPlan1, splitPlan2) == 0);
assertTrue(comparator.compare(splitPlan2, splitPlan1) == 0);
assertTrue(comparator.compare(mergePlan1, mergePlan2) == 0);
assertTrue(comparator.compare(mergePlan2, mergePlan1) == 0);
assertEquals(0, comparator.compare(splitPlan1, splitPlan2));
assertEquals(0, comparator.compare(splitPlan2, splitPlan1));
assertEquals(0, comparator.compare(mergePlan1, mergePlan2));
assertEquals(0, comparator.compare(mergePlan2, mergePlan1));
assertTrue(comparator.compare(splitPlan1, mergePlan1) < 0);
assertTrue(comparator.compare(mergePlan1, splitPlan1) > 0);
}
@ -106,7 +97,7 @@ public class TestSimpleRegionNormalizer {
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
assertTrue(plans == null);
assertNull(plans);
}
@Test
@ -130,7 +121,7 @@ public class TestSimpleRegionNormalizer {
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
assertTrue(plans == null);
assertNull(plans);
}
@Test
@ -168,7 +159,66 @@ public class TestSimpleRegionNormalizer {
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
assertTrue(plans == null);
assertNull(plans);
}
private void noNormalizationOnTransitioningRegions(final RegionState.State state)
throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final List<RegionInfo> regionInfos = new LinkedList<>();
final Map<byte[], Integer> regionSizes = new HashMap<>();
final RegionInfo ri1 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("aaa"))
.setEndKey(Bytes.toBytes("bbb"))
.build();
regionInfos.add(ri1);
regionSizes.put(ri1.getRegionName(), 10);
final RegionInfo ri2 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc"))
.build();
regionInfos.add(ri2);
regionSizes.put(ri2.getRegionName(), 1);
setupMocksForNormalizer(regionSizes, regionInfos);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionState(any(RegionInfo.class))).thenReturn(
RegionState.createForTesting(null, state));
assertNull(
format("Unexpected plans for RegionState %s", state),
normalizer.computePlanForTable(tableName));
}
@Test
public void testNoNormalizationOnMergingNewRegions() throws Exception {
noNormalizationOnTransitioningRegions(RegionState.State.MERGING_NEW);
}
@Test
public void testNoNormalizationOnMergingRegions() throws Exception {
noNormalizationOnTransitioningRegions(RegionState.State.MERGING);
}
@Test
public void testNoNormalizationOnMergedRegions() throws Exception {
noNormalizationOnTransitioningRegions(RegionState.State.MERGED);
}
@Test
public void testNoNormalizationOnSplittingNewRegions() throws Exception {
noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING_NEW);
}
@Test
public void testNoNormalizationOnSplittingRegions() throws Exception {
noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING);
}
@Test
public void testNoNormalizationOnSplitRegions() throws Exception {
noNormalizationOnTransitioningRegions(RegionState.State.SPLIT);
}
@Test
@ -323,7 +373,7 @@ public class TestSimpleRegionNormalizer {
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
assertTrue(plans == null);
assertNull(plans);
}
@Test
@ -410,7 +460,7 @@ public class TestSimpleRegionNormalizer {
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
.thenReturn(20L);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
Assert.assertEquals(4, plans.size());
assertEquals(4, plans.size());
for (NormalizationPlan plan : plans) {
assertTrue(plan instanceof SplitNormalizationPlan);
@ -420,7 +470,7 @@ public class TestSimpleRegionNormalizer {
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
.thenReturn(200L);
plans = normalizer.computePlanForTable(tableName);
Assert.assertEquals(2, plans.size());
assertEquals(2, plans.size());
NormalizationPlan plan = plans.get(0);
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion());
@ -459,7 +509,7 @@ public class TestSimpleRegionNormalizer {
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
.thenReturn(8);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
Assert.assertEquals(2, plans.size());
assertEquals(2, plans.size());
for (NormalizationPlan plan : plans) {
assertTrue(plan instanceof SplitNormalizationPlan);
@ -469,7 +519,7 @@ public class TestSimpleRegionNormalizer {
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
.thenReturn(3);
plans = normalizer.computePlanForTable(tableName);
Assert.assertEquals(1, plans.size());
assertEquals(1, plans.size());
NormalizationPlan plan = plans.get(0);
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion());
@ -477,17 +527,21 @@ public class TestSimpleRegionNormalizer {
}
@SuppressWarnings("MockitoCast")
protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> RegionInfo) {
private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> RegionInfo) {
masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
final MasterRpcServices masterRpcServices =
Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
// for simplicity all regions are assumed to be on one server; doesn't matter to us
ServerName sn = ServerName.valueOf("localhost", 0, 1L);
when(masterServices.getAssignmentManager().getRegionStates().
getRegionsOfTable(any())).thenReturn(RegionInfo);
when(masterServices.getAssignmentManager().getRegionStates().
getRegionServerOfRegion(any())).thenReturn(sn);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionsOfTable(any())).thenReturn(RegionInfo);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionServerOfRegion(any())).thenReturn(sn);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionState(any(RegionInfo.class))).thenReturn(
RegionState.createForTesting(null, RegionState.State.OPEN));
for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
RegionMetrics regionLoad = Mockito.mock(RegionMetrics.class);
@ -498,8 +552,8 @@ public class TestSimpleRegionNormalizer {
// this is possibly broken with jdk9, unclear if false positive or not
// suppress it for now, fix it when we get to running tests on 9
// see: http://errorprone.info/bugpattern/MockitoCast
when((Object) masterServices.getServerManager().getLoad(sn).
getRegionMetrics().get(region.getKey())).thenReturn(regionLoad);
when((Object) masterServices.getServerManager().getLoad(sn)
.getRegionMetrics().get(region.getKey())).thenReturn(regionLoad);
}
try {
when(masterRpcServices.isSplitOrMergeEnabled(any(),
@ -509,6 +563,7 @@ public class TestSimpleRegionNormalizer {
LOG.debug("error setting isSplitOrMergeEnabled switch", se);
}
normalizer = new SimpleRegionNormalizer();
normalizer.setMasterServices(masterServices);
normalizer.setMasterRpcServices(masterRpcServices);
}