HBASE-24588 : Submit task for NormalizationPlan (#1933)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
Viraj Jasani 2020-06-26 22:08:52 +05:30 committed by GitHub
parent 0197438564
commit 71af97cd05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 84 additions and 127 deletions

View File

@ -833,6 +833,9 @@ public interface Admin extends Abortable, Closeable {
/** /**
* Invoke region normalizer. Can NOT run for various reasons. Check logs. * Invoke region normalizer. Can NOT run for various reasons. Check logs.
* This is a non-blocking invocation to region normalizer. If return value is true, it means
* the request was submitted successfully. We need to check logs for the details of which regions
* were split/merged.
* *
* @return <code>true</code> if region normalizer ran, <code>false</code> otherwise. * @return <code>true</code> if region normalizer ran, <code>false</code> otherwise.
* @throws IOException if a remote or network exception occurs * @throws IOException if a remote or network exception occurs

View File

@ -1298,7 +1298,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return; return;
} }
MergeTableRegionsRequest request = null; final MergeTableRegionsRequest request;
try { try {
request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
forcible, ng.getNonceGroup(), ng.newNonce()); forcible, ng.getNonceGroup(), ng.newNonce());
@ -1308,8 +1308,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} }
addListener( addListener(
this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request, this.procedureCall(tableName, request,
(s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(), MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
new MergeTableRegionProcedureBiConsumer(tableName)), new MergeTableRegionProcedureBiConsumer(tableName)),
(ret, err2) -> { (ret, err2) -> {
if (err2 != null) { if (err2 != null) {
@ -1485,7 +1485,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
TableName tableName = hri.getTable(); TableName tableName = hri.getTable();
SplitTableRegionRequest request = null; final SplitTableRegionRequest request;
try { try {
request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
ng.newNonce()); ng.newNonce());
@ -1495,8 +1495,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} }
addListener( addListener(
this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName, this.procedureCall(tableName,
request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(), request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
new SplitTableRegionProcedureBiConsumer(tableName)), new SplitTableRegionProcedureBiConsumer(tableName)),
(ret, err2) -> { (ret, err2) -> {
if (err2 != null) { if (err2 != null) {

View File

@ -1934,19 +1934,20 @@ public class HMaster extends HRegionServer implements MasterServices {
} }
try { try {
final List<TableName> allEnabledTables = new ArrayList<>( final List<TableName> allEnabledTables =
tableStateManager.getTablesInStates(TableState.State.ENABLED)); new ArrayList<>(tableStateManager.getTablesInStates(TableState.State.ENABLED));
Collections.shuffle(allEnabledTables); Collections.shuffle(allEnabledTables);
try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) { final List<Long> submittedPlanProcIds = new ArrayList<>();
for (TableName table : allEnabledTables) { for (TableName table : allEnabledTables) {
if (table.isSystemTable()) { if (table.isSystemTable()) {
continue; continue;
} }
final TableDescriptor tblDesc = getTableDescriptors().get(table); final TableDescriptor tblDesc = getTableDescriptors().get(table);
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) { if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
LOG.debug("Skipping table {} because normalization is disabled in its" LOG.debug(
+ " table properties.", table); "Skipping table {} because normalization is disabled in its" + " table properties.",
table);
continue; continue;
} }
@ -1961,16 +1962,22 @@ public class HMaster extends HRegionServer implements MasterServices {
continue; continue;
} }
// as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate- // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
// limiting of merge requests due to this serial loop. // submit task , so there's no artificial rate-
// limiting of merge/split requests due to this serial loop.
for (NormalizationPlan plan : plans) { for (NormalizationPlan plan : plans) {
plan.execute(admin); long procId = plan.submit(this);
submittedPlanProcIds.add(procId);
if (plan.getType() == PlanType.SPLIT) { if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++; splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) { } else if (plan.getType() == PlanType.MERGE) {
mergePlanCount++; mergePlanCount++;
} }
} }
int totalPlansSubmitted = submittedPlanProcIds.size();
if (totalPlansSubmitted > 0 && LOG.isDebugEnabled()) {
LOG.debug("Normalizer plans submitted. Total plans count: {} , procID list: {}",
totalPlansSubmitted, submittedPlanProcIds);
} }
} }
} finally { } finally {
@ -2013,8 +2020,8 @@ public class HMaster extends HRegionServer implements MasterServices {
" failed because merge switch is off"); " failed because merge switch is off");
} }
final String mergeRegionsStr = Arrays.stream(regionsToMerge).map(r -> r.getEncodedName()). final String mergeRegionsStr = Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName)
collect(Collectors.joining(", ")); .collect(Collectors.joining(", "));
return MasterProcedureUtil.submitProcedure(new NonceProcedureRunnable(this, ng, nonce) { return MasterProcedureUtil.submitProcedure(new NonceProcedureRunnable(this, ng, nonce) {
@Override @Override
protected void run() throws IOException { protected void run() throws IOException {

View File

@ -1,54 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
/**
* Plan which signifies that no normalization is required,
* or normalization of this table isn't allowed, this is singleton.
*/
@InterfaceAudience.Private
public final class EmptyNormalizationPlan implements NormalizationPlan {
private static final EmptyNormalizationPlan instance = new EmptyNormalizationPlan();
private EmptyNormalizationPlan() {
}
/**
* @return singleton instance
*/
public static EmptyNormalizationPlan getInstance(){
return instance;
}
/**
* No-op for empty plan.
*/
@Override
public void execute(Admin admin) {
}
@Override
public PlanType getType() {
return PlanType.NONE;
}
}

View File

@ -20,8 +20,9 @@ package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -41,6 +42,20 @@ public class MergeNormalizationPlan implements NormalizationPlan {
this.secondRegion = secondRegion; this.secondRegion = secondRegion;
} }
/**
* {@inheritDoc}
*/
@Override
public long submit(MasterServices masterServices) throws IOException {
LOG.info("Executing merging normalization plan: " + this);
// Do not use force=true as corner cases can happen, non adjacent regions,
// merge with a merged child region with no GC done yet, it is going to
// cause all different issues.
return masterServices
.mergeRegions(new RegionInfo[] { firstRegion, secondRegion }, false, HConstants.NO_NONCE,
HConstants.NO_NONCE);
}
@Override @Override
public PlanType getType() { public PlanType getType() {
return PlanType.MERGE; return PlanType.MERGE;
@ -62,20 +77,4 @@ public class MergeNormalizationPlan implements NormalizationPlan {
'}'; '}';
} }
/**
* {@inheritDoc}
*/
@Override
public void execute(Admin admin) {
LOG.info("Executing merging normalization plan: " + this);
try {
// Do not use force=true as corner cases can happen, non adjacent regions,
// merge with a merged child region with no GC done yet, it is going to
// cause all different issues.
admin.mergeRegionsAsync(firstRegion.getEncodedNameAsBytes(),
secondRegion.getEncodedNameAsBytes(), false);
} catch (IOException ex) {
LOG.error("Error during region merge: ", ex);
}
}
} }

View File

@ -18,8 +18,9 @@
*/ */
package org.apache.hadoop.hbase.master.normalizer; package org.apache.hadoop.hbase.master.normalizer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin; import java.io.IOException;
/** /**
* Interface for normalization plan. * Interface for normalization plan.
@ -33,10 +34,13 @@ public interface NormalizationPlan {
} }
/** /**
* Executes normalization plan on cluster (does actual splitting/merging work). * Submits normalization plan on cluster (does actual splitting/merging work) and
* @param admin instance of Admin * returns proc Id to caller.
* @param masterServices instance of {@link MasterServices}
* @return Proc Id for the submitted task
* @throws IOException If plan submission to Admin fails
*/ */
void execute(Admin admin); long submit(MasterServices masterServices) throws IOException;
/** /**
* @return the type of this plan * @return the type of this plan

View File

@ -18,9 +18,11 @@
*/ */
package org.apache.hadoop.hbase.master.normalizer; package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -40,6 +42,14 @@ public class SplitNormalizationPlan implements NormalizationPlan {
this.splitPoint = splitPoint; this.splitPoint = splitPoint;
} }
/**
* {@inheritDoc}
*/
@Override
public long submit(MasterServices masterServices) throws IOException {
return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
@Override @Override
public PlanType getType() { public PlanType getType() {
return PlanType.SPLIT; return PlanType.SPLIT;
@ -69,16 +79,4 @@ public class SplitNormalizationPlan implements NormalizationPlan {
'}'; '}';
} }
/**
* {@inheritDoc}
*/
@Override
public void execute(Admin admin) {
LOG.info("Executing splitting normalization plan: " + this);
try {
admin.splitRegionAsync(regionInfo.getRegionName()).get();
} catch (Exception ex) {
LOG.error("Error during region split: ", ex);
}
}
} }