Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
parent
7c78356218
commit
0fe85947ec
|
@ -1269,6 +1269,9 @@ public interface Admin extends Abortable, Closeable {
|
|||
|
||||
/**
|
||||
* Invoke region normalizer. Can NOT run for various reasons. Check logs.
|
||||
* This is a non-blocking invocation to region normalizer. If return value is true, it means
|
||||
* the request was submitted successfully. We need to check logs for the details of which regions
|
||||
* were split/merged.
|
||||
*
|
||||
* @return <code>true</code> if region normalizer ran, <code>false</code> otherwise.
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
|
|
|
@ -1284,7 +1284,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
return;
|
||||
}
|
||||
|
||||
MergeTableRegionsRequest request = null;
|
||||
final MergeTableRegionsRequest request;
|
||||
try {
|
||||
request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
|
||||
forcible, ng.getNonceGroup(), ng.newNonce());
|
||||
|
@ -1294,8 +1294,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
addListener(
|
||||
this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request,
|
||||
(s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
|
||||
this.procedureCall(tableName, request,
|
||||
MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
|
||||
new MergeTableRegionProcedureBiConsumer(tableName)),
|
||||
(ret, err2) -> {
|
||||
if (err2 != null) {
|
||||
|
@ -1470,7 +1470,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
TableName tableName = hri.getTable();
|
||||
SplitTableRegionRequest request = null;
|
||||
final SplitTableRegionRequest request;
|
||||
try {
|
||||
request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
|
||||
ng.newNonce());
|
||||
|
@ -1480,8 +1480,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
addListener(
|
||||
this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
|
||||
request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
|
||||
this.procedureCall(tableName,
|
||||
request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
|
||||
new SplitTableRegionProcedureBiConsumer(tableName)),
|
||||
(ret, err2) -> {
|
||||
if (err2 != null) {
|
||||
|
|
|
@ -1906,19 +1906,19 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
try {
|
||||
final List<TableName> allEnabledTables = new ArrayList<>(
|
||||
tableStateManager.getTablesInStates(TableState.State.ENABLED));
|
||||
final List<TableName> allEnabledTables =
|
||||
new ArrayList<>(tableStateManager.getTablesInStates(TableState.State.ENABLED));
|
||||
Collections.shuffle(allEnabledTables);
|
||||
|
||||
try (final Admin admin = clusterConnection.getAdmin()) {
|
||||
final List<Long> submittedPlanProcIds = new ArrayList<>();
|
||||
for (TableName table : allEnabledTables) {
|
||||
if (table.isSystemTable()) {
|
||||
continue;
|
||||
}
|
||||
final TableDescriptor tblDesc = getTableDescriptors().get(table);
|
||||
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
|
||||
LOG.debug("Skipping table {} because normalization is disabled in its"
|
||||
+ " table properties.", table);
|
||||
LOG.debug(
|
||||
"Skipping table {} because normalization is disabled in its table properties.", table);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1933,16 +1933,22 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
continue;
|
||||
}
|
||||
|
||||
// as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate-
|
||||
// limiting of merge requests due to this serial loop.
|
||||
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
|
||||
// submit task , so there's no artificial rate-
|
||||
// limiting of merge/split requests due to this serial loop.
|
||||
for (NormalizationPlan plan : plans) {
|
||||
plan.execute(admin);
|
||||
long procId = plan.submit(this);
|
||||
submittedPlanProcIds.add(procId);
|
||||
if (plan.getType() == PlanType.SPLIT) {
|
||||
splitPlanCount++;
|
||||
} else if (plan.getType() == PlanType.MERGE) {
|
||||
mergePlanCount++;
|
||||
}
|
||||
}
|
||||
int totalPlansSubmitted = submittedPlanProcIds.size();
|
||||
if (totalPlansSubmitted > 0 && LOG.isDebugEnabled()) {
|
||||
LOG.debug("Normalizer plans submitted. Total plans count: {} , procID list: {}",
|
||||
totalPlansSubmitted, submittedPlanProcIds);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -1985,8 +1991,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
" failed because merge switch is off");
|
||||
}
|
||||
|
||||
final String mergeRegionsStr = Arrays.stream(regionsToMerge).map(r -> r.getEncodedName()).
|
||||
collect(Collectors.joining(", "));
|
||||
final String mergeRegionsStr = Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName)
|
||||
.collect(Collectors.joining(", "));
|
||||
return MasterProcedureUtil.submitProcedure(new NonceProcedureRunnable(this, ng, nonce) {
|
||||
@Override
|
||||
protected void run() throws IOException {
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
||||
|
||||
/**
|
||||
* Plan which signifies that no normalization is required,
|
||||
* or normalization of this table isn't allowed, this is singleton.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class EmptyNormalizationPlan implements NormalizationPlan {
|
||||
private static final EmptyNormalizationPlan instance = new EmptyNormalizationPlan();
|
||||
|
||||
private EmptyNormalizationPlan() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return singleton instance
|
||||
*/
|
||||
public static EmptyNormalizationPlan getInstance(){
|
||||
return instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* No-op for empty plan.
|
||||
*/
|
||||
@Override
|
||||
public void execute(Admin admin) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PlanType getType() {
|
||||
return PlanType.NONE;
|
||||
}
|
||||
}
|
|
@ -20,8 +20,9 @@ package org.apache.hadoop.hbase.master.normalizer;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -41,6 +42,20 @@ public class MergeNormalizationPlan implements NormalizationPlan {
|
|||
this.secondRegion = secondRegion;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public long submit(MasterServices masterServices) throws IOException {
|
||||
LOG.info("Executing merging normalization plan: " + this);
|
||||
// Do not use force=true as corner cases can happen, non adjacent regions,
|
||||
// merge with a merged child region with no GC done yet, it is going to
|
||||
// cause all different issues.
|
||||
return masterServices
|
||||
.mergeRegions(new RegionInfo[] { firstRegion, secondRegion }, false, HConstants.NO_NONCE,
|
||||
HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PlanType getType() {
|
||||
return PlanType.MERGE;
|
||||
|
@ -62,20 +77,4 @@ public class MergeNormalizationPlan implements NormalizationPlan {
|
|||
'}';
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void execute(Admin admin) {
|
||||
LOG.info("Executing merging normalization plan: " + this);
|
||||
try {
|
||||
// Do not use force=true as corner cases can happen, non adjacent regions,
|
||||
// merge with a merged child region with no GC done yet, it is going to
|
||||
// cause all different issues.
|
||||
admin.mergeRegionsAsync(firstRegion.getEncodedNameAsBytes(),
|
||||
secondRegion.getEncodedNameAsBytes(), false);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Error during region merge: ", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Interface for normalization plan.
|
||||
|
@ -33,10 +34,13 @@ public interface NormalizationPlan {
|
|||
}
|
||||
|
||||
/**
|
||||
* Executes normalization plan on cluster (does actual splitting/merging work).
|
||||
* @param admin instance of Admin
|
||||
* Submits normalization plan on cluster (does actual splitting/merging work) and
|
||||
* returns proc Id to caller.
|
||||
* @param masterServices instance of {@link MasterServices}
|
||||
* @return Proc Id for the submitted task
|
||||
* @throws IOException If plan submission to Admin fails
|
||||
*/
|
||||
void execute(Admin admin);
|
||||
long submit(MasterServices masterServices) throws IOException;
|
||||
|
||||
/**
|
||||
* @return the type of this plan
|
||||
|
|
|
@ -18,9 +18,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -40,6 +42,14 @@ public class SplitNormalizationPlan implements NormalizationPlan {
|
|||
this.splitPoint = splitPoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public long submit(MasterServices masterServices) throws IOException {
|
||||
return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PlanType getType() {
|
||||
return PlanType.SPLIT;
|
||||
|
@ -69,16 +79,4 @@ public class SplitNormalizationPlan implements NormalizationPlan {
|
|||
'}';
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void execute(Admin admin) {
|
||||
LOG.info("Executing splitting normalization plan: " + this);
|
||||
try {
|
||||
admin.splitRegionAsync(regionInfo.getRegionName()).get();
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Error during region split: ", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -130,15 +130,23 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
public void testHonorsNormalizerTableSetting() throws Exception {
|
||||
final TableName tn1 = TableName.valueOf(name.getMethodName() + "1");
|
||||
final TableName tn2 = TableName.valueOf(name.getMethodName() + "2");
|
||||
final TableName tn3 = TableName.valueOf(name.getMethodName() + "3");
|
||||
|
||||
try {
|
||||
final int tn1RegionCount = createTableBegsSplit(tn1, true);
|
||||
final int tn2RegionCount = createTableBegsSplit(tn2, false);
|
||||
final int tn1RegionCount = createTableBegsSplit(tn1, true, false);
|
||||
final int tn2RegionCount = createTableBegsSplit(tn2, false, false);
|
||||
final int tn3RegionCount = createTableBegsSplit(tn3, true, true);
|
||||
|
||||
assertFalse(admin.normalizerSwitch(true));
|
||||
assertTrue(admin.normalize());
|
||||
waitForTableSplit(tn1, tn1RegionCount + 1);
|
||||
|
||||
// confirm that tn1 has (tn1RegionCount + 1) number of regions.
|
||||
// tn2 has tn2RegionCount number of regions because normalizer has not been enabled on it.
|
||||
// tn3 has tn3RegionCount number of regions because two plans are run:
|
||||
// 1. split one region to two
|
||||
// 2. merge two regions into one
|
||||
// and hence, total number of regions for tn3 remains same
|
||||
assertEquals(
|
||||
tn1 + " should have split.",
|
||||
tn1RegionCount + 1,
|
||||
|
@ -147,9 +155,11 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
tn2 + " should not have split.",
|
||||
tn2RegionCount,
|
||||
MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tn2));
|
||||
waitForTableRegionCount(tn3, tn3RegionCount);
|
||||
} finally {
|
||||
dropIfExists(tn1);
|
||||
dropIfExists(tn2);
|
||||
dropIfExists(tn3);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -170,7 +180,7 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
? buildTableNameForQuotaTest(name.getMethodName())
|
||||
: TableName.valueOf(name.getMethodName());
|
||||
|
||||
final int currentRegionCount = createTableBegsSplit(tableName, true);
|
||||
final int currentRegionCount = createTableBegsSplit(tableName, true, false);
|
||||
final long existingSkippedSplitCount = master.getRegionNormalizer()
|
||||
.getSkippedCount(PlanType.SPLIT);
|
||||
assertFalse(admin.normalizerSwitch(true));
|
||||
|
@ -200,7 +210,7 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
final int currentRegionCount = createTableBegsMerge(tableName);
|
||||
assertFalse(admin.normalizerSwitch(true));
|
||||
assertTrue(admin.normalize());
|
||||
waitforTableMerge(tableName, currentRegionCount - 1);
|
||||
waitForTableMerge(tableName, currentRegionCount - 1);
|
||||
assertEquals(
|
||||
tableName + " should have merged.",
|
||||
currentRegionCount - 1,
|
||||
|
@ -233,6 +243,22 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
});
|
||||
}
|
||||
|
||||
private static void waitForTableRegionCount(final TableName tableName,
|
||||
final int targetRegionCount) throws IOException {
|
||||
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(5), new ExplainingPredicate<IOException>() {
|
||||
@Override
|
||||
public String explainFailure() {
|
||||
return "expected " + targetRegionCount + " number of regions for table " + tableName;
|
||||
}
|
||||
@Override
|
||||
public boolean evaluate() throws IOException {
|
||||
final int currentRegionCount =
|
||||
MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName);
|
||||
return currentRegionCount == targetRegionCount;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static void waitForTableSplit(final TableName tableName, final int targetRegionCount)
|
||||
throws IOException {
|
||||
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(5), new ExplainingPredicate<IOException>() {
|
||||
|
@ -247,7 +273,7 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
});
|
||||
}
|
||||
|
||||
private static void waitforTableMerge(final TableName tableName, final int targetRegionCount)
|
||||
private static void waitForTableMerge(final TableName tableName, final int targetRegionCount)
|
||||
throws IOException {
|
||||
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(5), new ExplainingPredicate<IOException>() {
|
||||
@Override public String explainFailure() {
|
||||
|
@ -319,14 +345,16 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
* <li>split threshold: 2.4 * 2 = 4.8</li>
|
||||
* </ul>
|
||||
*/
|
||||
private static int createTableBegsSplit(final TableName tableName, final boolean balancerEnabled)
|
||||
private static int createTableBegsSplit(final TableName tableName,
|
||||
final boolean normalizerEnabled, final boolean isMergeEnabled)
|
||||
throws IOException {
|
||||
final List<HRegion> generatedRegions = generateTestData(tableName, 1, 1, 2, 3, 5);
|
||||
assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
|
||||
admin.flush(tableName);
|
||||
|
||||
final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName))
|
||||
.setNormalizationEnabled(balancerEnabled)
|
||||
.setNormalizationEnabled(normalizerEnabled)
|
||||
.setMergeEnabled(isMergeEnabled)
|
||||
.build();
|
||||
admin.modifyTable(td);
|
||||
|
||||
|
|
Loading…
Reference in New Issue