HBASE-25592 Improve normalizer code in line with HBASE-23932 (#2972)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
9e9301a242
commit
d37f734299
|
@ -135,6 +135,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
|||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
||||
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
|
@ -327,7 +328,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
new ProcedureEvent("server crash processing");
|
||||
|
||||
// Maximum time we should run balancer for
|
||||
private final int maxBlancingTime;
|
||||
private final int maxBalancingTime;
|
||||
// Maximum percent of regions in transition when balancing
|
||||
private final double maxRitPercent;
|
||||
|
||||
|
@ -493,7 +494,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
// preload table descriptor at startup
|
||||
this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
|
||||
|
||||
this.maxBlancingTime = getMaxBalancingTime();
|
||||
this.maxBalancingTime = getMaxBalancingTime();
|
||||
this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
|
||||
HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
|
||||
|
||||
|
@ -1552,13 +1553,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
|
||||
public boolean balance(boolean force) throws IOException {
|
||||
// if master not initialized, don't run balancer.
|
||||
if (!isInitialized()) {
|
||||
LOG.debug("Master has not been initialized, don't run balancer.");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isInMaintenanceMode()) {
|
||||
LOG.info("Master is in maintenanceMode mode, don't run balancer.");
|
||||
if (skipRegionManagementAction("balancer")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1610,10 +1605,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
}
|
||||
|
||||
long balanceStartTime = System.currentTimeMillis();
|
||||
long cutoffTime = balanceStartTime + this.maxBlancingTime;
|
||||
long cutoffTime = balanceStartTime + this.maxBalancingTime;
|
||||
int rpCount = 0; // number of RegionPlans balanced so far
|
||||
if (plans != null && !plans.isEmpty()) {
|
||||
int balanceInterval = this.maxBlancingTime / plans.size();
|
||||
int balanceInterval = this.maxBalancingTime / plans.size();
|
||||
LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
|
||||
+ balanceInterval + " ms, and the max number regions in transition is "
|
||||
+ maxRegionsInTransition);
|
||||
|
@ -1632,7 +1627,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
// TODO: After balance, there should not be a cutoff time (keeping it as a security net
|
||||
// for now)
|
||||
LOG.debug("No more balancing till next balance run; maxBalanceTime="
|
||||
+ this.maxBlancingTime);
|
||||
+ this.maxBalancingTime);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1652,6 +1647,23 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
return true;
|
||||
}
|
||||
|
||||
private boolean skipRegionManagementAction(String action) throws IOException {
|
||||
if (!isInitialized()) {
|
||||
LOG.debug("Master has not been initialized, don't run " + action);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (this.getServerManager().isClusterShutdown()) {
|
||||
LOG.info("CLuster is shutting down, don't run " + action);
|
||||
}
|
||||
|
||||
if (isInMaintenanceMode()) {
|
||||
LOG.info("Master is in maintenanceMode mode, don't run " + action);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
|
||||
*
|
||||
|
@ -1662,50 +1674,55 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
* @throws CoordinatedStateException
|
||||
*/
|
||||
public boolean normalizeRegions() throws IOException, CoordinatedStateException {
|
||||
if (!isInitialized()) {
|
||||
LOG.debug("Master has not been initialized, don't run region normalizer.");
|
||||
if (skipRegionManagementAction("normalizer")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isInMaintenanceMode()) {
|
||||
LOG.info("Master is in maintenance mode, don't run region normalizer.");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this.regionNormalizerTracker.isNormalizerOn()) {
|
||||
if (isNormalizerOn()) {
|
||||
LOG.debug("Region normalization is disabled, don't run region normalizer.");
|
||||
return false;
|
||||
}
|
||||
|
||||
synchronized (this.normalizer) {
|
||||
// Don't run the normalizer concurrently
|
||||
List<TableName> allEnabledTables = new ArrayList<>(
|
||||
final List<TableName> allEnabledTables = new ArrayList<>(
|
||||
this.assignmentManager.getTableStateManager().getTablesInStates(
|
||||
TableState.State.ENABLED));
|
||||
|
||||
Collections.shuffle(allEnabledTables);
|
||||
|
||||
for (TableName table : allEnabledTables) {
|
||||
if (isInMaintenanceMode()) {
|
||||
LOG.debug("Master is in maintenance mode, stop running region normalizer.");
|
||||
final NamespaceAuditor namespaceQuotaManager = quotaManager.getNamespaceQuotaManager();
|
||||
if(namespaceQuotaManager == null) {
|
||||
LOG.debug("Skipping normalizing since namespace quota is null");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (quotaManager.getNamespaceQuotaManager() != null &&
|
||||
quotaManager.getNamespaceQuotaManager().getState(table.getNamespaceAsString()) != null){
|
||||
if (namespaceQuotaManager.getState(table.getNamespaceAsString()) != null) {
|
||||
LOG.debug("Skipping normalizing " + table + " since its namespace has quota");
|
||||
continue;
|
||||
}
|
||||
if (table.isSystemTable() || (getTableDescriptors().get(table) != null &&
|
||||
!getTableDescriptors().get(table).isNormalizationEnabled())) {
|
||||
LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
|
||||
+ " table or doesn't have auto normalization turned on");
|
||||
if (table.isSystemTable()) {
|
||||
continue;
|
||||
}
|
||||
List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
|
||||
if (plans != null) {
|
||||
|
||||
HTableDescriptor tableDescriptor = getTableDescriptors().get(table);
|
||||
if (tableDescriptor != null && !tableDescriptor.isNormalizationEnabled()) {
|
||||
LOG.debug("Skipping normalization for table: " + table
|
||||
+ ", as it doesn't have auto normalization turned on");
|
||||
continue;
|
||||
}
|
||||
// make one last check that the cluster isn't shutting down before proceeding.
|
||||
if (skipRegionManagementAction("region normalizer")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
List<NormalizationPlan> plans = this.normalizer.computePlansForTable(table);
|
||||
if (plans == null || plans.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
try (Admin admin = clusterConnection.getAdmin()) {
|
||||
for (NormalizationPlan plan : plans) {
|
||||
plan.execute(clusterConnection.getAdmin());
|
||||
plan.execute(admin);
|
||||
if (plan.getType() == PlanType.SPLIT) {
|
||||
splitPlanCount++;
|
||||
} else if (plan.getType() == PlanType.MERGE) {
|
||||
|
|
|
@ -40,14 +40,14 @@ import org.apache.hadoop.hbase.master.MasterServices;
|
|||
public interface RegionNormalizer {
|
||||
/**
|
||||
* Set the master service. Must be called before first call to
|
||||
* {@link #computePlanForTable(TableName)}.
|
||||
* {@link #computePlansForTable(TableName)}.
|
||||
* @param masterServices master services to use
|
||||
*/
|
||||
void setMasterServices(MasterServices masterServices);
|
||||
|
||||
/**
|
||||
* Set the master RPC service. Must be called before first call to
|
||||
* {@link #computePlanForTable(TableName)}.
|
||||
* {@link #computePlansForTable(TableName)}.
|
||||
* @param masterRpcServices master RPC services to use
|
||||
*/
|
||||
void setMasterRpcServices(MasterRpcServices masterRpcServices);
|
||||
|
@ -57,6 +57,6 @@ public interface RegionNormalizer {
|
|||
* @param table table to normalize
|
||||
* @return normalization actions to perform. Null if no action to take
|
||||
*/
|
||||
List<NormalizationPlan> computePlanForTable(TableName table)
|
||||
List<NormalizationPlan> computePlansForTable(TableName table)
|
||||
throws HBaseIOException;
|
||||
}
|
||||
|
|
|
@ -103,24 +103,15 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
* @return normalization plan to execute
|
||||
*/
|
||||
@Override
|
||||
public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
|
||||
public List<NormalizationPlan> computePlansForTable(TableName table) throws HBaseIOException {
|
||||
if (table == null || table.isSystemTable()) {
|
||||
LOG.debug("Normalization of system table " + table + " isn't allowed");
|
||||
return null;
|
||||
}
|
||||
boolean splitEnabled = true, mergeEnabled = true;
|
||||
try {
|
||||
splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
|
||||
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
|
||||
} catch (ServiceException se) {
|
||||
LOG.debug("Unable to determine whether split is enabled", se);
|
||||
}
|
||||
try {
|
||||
mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
|
||||
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
|
||||
} catch (ServiceException se) {
|
||||
LOG.debug("Unable to determine whether merge is enabled", se);
|
||||
}
|
||||
splitEnabled = isSplitEnabled();
|
||||
mergeEnabled = isMergeEnabled();
|
||||
|
||||
if (!splitEnabled && !mergeEnabled) {
|
||||
LOG.debug("Both split and merge are disabled for table: " + table);
|
||||
return null;
|
||||
|
@ -142,13 +133,13 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
", number of regions: " + tableRegions.size());
|
||||
|
||||
long totalSizeMb = 0;
|
||||
int acutalRegionCnt = 0;
|
||||
int actualRegionCnt = 0;
|
||||
|
||||
for (int i = 0; i < tableRegions.size(); i++) {
|
||||
HRegionInfo hri = tableRegions.get(i);
|
||||
long regionSize = getRegionSize(hri);
|
||||
if (regionSize > 0) {
|
||||
acutalRegionCnt++;
|
||||
actualRegionCnt++;
|
||||
totalSizeMb += regionSize;
|
||||
}
|
||||
}
|
||||
|
@ -176,16 +167,15 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
} else if (targetRegionCount > 0) {
|
||||
avgRegionSize = totalSizeMb / (double) targetRegionCount;
|
||||
} else {
|
||||
avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
|
||||
avgRegionSize = actualRegionCnt == 0 ? 0 : totalSizeMb / (double) actualRegionCnt;
|
||||
}
|
||||
|
||||
LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
|
||||
LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
|
||||
|
||||
int candidateIdx = 0;
|
||||
int splitCount = 0;
|
||||
int mergeCount = 0;
|
||||
while (candidateIdx < tableRegions.size()) {
|
||||
for (int candidateIdx = 0; candidateIdx < tableRegions.size(); candidateIdx++) {
|
||||
HRegionInfo hri = tableRegions.get(candidateIdx);
|
||||
long regionSize = getRegionSize(hri);
|
||||
// if the region is > 2 times larger than average, we split it, split
|
||||
|
@ -214,7 +204,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
}
|
||||
}
|
||||
}
|
||||
candidateIdx++;
|
||||
}
|
||||
if (plans.isEmpty()) {
|
||||
LOG.debug("No normalization needed, regions look good for table: " + table);
|
||||
|
@ -227,7 +216,38 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
}
|
||||
return plans;
|
||||
}
|
||||
/**
|
||||
* Return configured value for MasterSwitchType.SPLIT.
|
||||
*/
|
||||
private boolean isSplitEnabled() {
|
||||
boolean splitEnabled = true;
|
||||
try {
|
||||
splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
|
||||
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
|
||||
} catch (ServiceException se) {
|
||||
LOG.debug("Unable to determine whether split is enabled", se);
|
||||
}
|
||||
return splitEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return configured value for MasterSwitchType.MERGE.
|
||||
*/
|
||||
private boolean isMergeEnabled() {
|
||||
boolean mergeEnabled = true;
|
||||
try {
|
||||
mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
|
||||
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
|
||||
} catch (ServiceException se) {
|
||||
LOG.debug("Unable to determine whether merge is enabled", se);
|
||||
}
|
||||
return mergeEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param hri used to calculate region size
|
||||
* @return region size in MB
|
||||
*/
|
||||
private long getRegionSize(HRegionInfo hri) {
|
||||
ServerName sn =
|
||||
masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
|
||||
|
|
|
@ -73,7 +73,7 @@ public class TestSimpleRegionNormalizer {
|
|||
Map<byte[], Integer> regionSizes = new HashMap<>();
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
|
||||
assertTrue(plans == null);
|
||||
}
|
||||
|
||||
|
@ -92,7 +92,7 @@ public class TestSimpleRegionNormalizer {
|
|||
regionSizes.put(hri2.getRegionName(), 15);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
|
||||
assertTrue(plans == null);
|
||||
}
|
||||
|
||||
|
@ -119,7 +119,7 @@ public class TestSimpleRegionNormalizer {
|
|||
regionSizes.put(hri4.getRegionName(), 10);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
|
||||
assertTrue(plans == null);
|
||||
}
|
||||
|
||||
|
@ -150,7 +150,7 @@ public class TestSimpleRegionNormalizer {
|
|||
regionSizes.put(hri5.getRegionName(), 16);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
|
||||
|
||||
NormalizationPlan plan = plans.get(0);
|
||||
assertTrue(plan instanceof MergeNormalizationPlan);
|
||||
|
@ -190,7 +190,7 @@ public class TestSimpleRegionNormalizer {
|
|||
regionSizes.put(hri6.getRegionName(), 2700);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
|
||||
NormalizationPlan plan = plans.get(0);
|
||||
|
||||
assertTrue(plan instanceof MergeNormalizationPlan);
|
||||
|
@ -225,7 +225,7 @@ public class TestSimpleRegionNormalizer {
|
|||
regionSizes.put(hri5.getRegionName(), 5);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
|
||||
|
||||
assertTrue(plans == null);
|
||||
}
|
||||
|
@ -253,7 +253,7 @@ public class TestSimpleRegionNormalizer {
|
|||
regionSizes.put(hri4.getRegionName(), 30);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
|
||||
NormalizationPlan plan = plans.get(0);
|
||||
|
||||
assertTrue(plan instanceof SplitNormalizationPlan);
|
||||
|
@ -296,7 +296,7 @@ public class TestSimpleRegionNormalizer {
|
|||
when(
|
||||
masterServices.getTableDescriptors().get((TableName) any()).getNormalizerTargetRegionSize())
|
||||
.thenReturn(20L);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
|
||||
assertEquals(4, plans.size());
|
||||
|
||||
for (NormalizationPlan plan : plans) {
|
||||
|
@ -307,7 +307,7 @@ public class TestSimpleRegionNormalizer {
|
|||
when(
|
||||
masterServices.getTableDescriptors().get((TableName) any()).getNormalizerTargetRegionSize())
|
||||
.thenReturn(200L);
|
||||
plans = normalizer.computePlanForTable(tableName);
|
||||
plans = normalizer.computePlansForTable(tableName);
|
||||
assertEquals(2, plans.size());
|
||||
NormalizationPlan plan = plans.get(0);
|
||||
assertTrue(plan instanceof MergeNormalizationPlan);
|
||||
|
@ -343,7 +343,7 @@ public class TestSimpleRegionNormalizer {
|
|||
when(
|
||||
masterServices.getTableDescriptors().get((TableName) any()).getNormalizerTargetRegionCount())
|
||||
.thenReturn(8);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
|
||||
assertEquals(2, plans.size());
|
||||
|
||||
for (NormalizationPlan plan : plans) {
|
||||
|
@ -354,7 +354,7 @@ public class TestSimpleRegionNormalizer {
|
|||
when(
|
||||
masterServices.getTableDescriptors().get((TableName) any()).getNormalizerTargetRegionCount())
|
||||
.thenReturn(3);
|
||||
plans = normalizer.computePlanForTable(tableName);
|
||||
plans = normalizer.computePlansForTable(tableName);
|
||||
assertEquals(1, plans.size());
|
||||
NormalizationPlan plan = plans.get(0);
|
||||
assertTrue(plan instanceof MergeNormalizationPlan);
|
||||
|
|
Loading…
Reference in New Issue