HBASE-21719 Rewrite RegionPlacementMaintainer to use AsyncClusterConnection

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2019-01-15 11:43:41 +08:00 committed by zhangduo
parent a5bcf7d6c7
commit 88adfa3278
1 changed files with 118 additions and 117 deletions

View File

@ -1,5 +1,4 @@
/** /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -16,9 +15,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.master; package org.apache.hadoop.hbase.master;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import java.util.ArrayList; import java.util.ArrayList;
@ -39,29 +38,30 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper; import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan; import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.MunkresAssignment; import org.apache.hadoop.hbase.util.MunkresAssignment;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
// TODO: Remove? Unused. Partially implemented only. // TODO: Remove? Unused. Partially implemented only.
public class RegionPlacementMaintainer { public class RegionPlacementMaintainer implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class
.getName()); .getName());
//The cost of a placement that should never be assigned. //The cost of a placement that should never be assigned.
@ -96,9 +96,9 @@ public class RegionPlacementMaintainer {
private final boolean enforceMinAssignmentMove; private final boolean enforceMinAssignmentMove;
private RackManager rackManager; private RackManager rackManager;
private Set<TableName> targetTableSet; private Set<TableName> targetTableSet;
private final Connection connection; private AsyncClusterConnection connection;
public RegionPlacementMaintainer(Configuration conf) { public RegionPlacementMaintainer(Configuration conf) throws IOException {
this(conf, true, true); this(conf, true, true);
} }
@ -109,11 +109,6 @@ public class RegionPlacementMaintainer {
this.enforceMinAssignmentMove = enforceMinAssignmentMove; this.enforceMinAssignmentMove = enforceMinAssignmentMove;
this.targetTableSet = new HashSet<>(); this.targetTableSet = new HashSet<>();
this.rackManager = new RackManager(conf); this.rackManager = new RackManager(conf);
try {
this.connection = ConnectionFactory.createConnection(this.conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
private static void printHelp(Options opt) { private static void printHelp(Options opt) {
@ -124,6 +119,14 @@ public class RegionPlacementMaintainer {
" [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt); " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
} }
private AsyncClusterConnection getConnection() throws IOException {
if (connection == null) {
connection =
ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent());
}
return connection;
}
public void setTargetTableName(String[] tableNames) { public void setTargetTableName(String[] tableNames) {
if (tableNames != null) { if (tableNames != null) {
for (String table : tableNames) for (String table : tableNames)
@ -133,10 +136,8 @@ public class RegionPlacementMaintainer {
/** /**
* @return the new RegionAssignmentSnapshot * @return the new RegionAssignmentSnapshot
* @throws IOException
*/ */
public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
throws IOException {
SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot = SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf)); new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
currentAssignmentShapshot.initialize(); currentAssignmentShapshot.initialize();
@ -145,9 +146,6 @@ public class RegionPlacementMaintainer {
/** /**
* Verify the region placement is consistent with the assignment plan * Verify the region placement is consistent with the assignment plan
* @param isDetailMode
* @return reports
* @throws IOException
*/ */
public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode) public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
throws IOException { throws IOException {
@ -206,10 +204,9 @@ public class RegionPlacementMaintainer {
// Get the all the region servers // Get the all the region servers
List<ServerName> servers = new ArrayList<>(); List<ServerName> servers = new ArrayList<>();
try (Admin admin = this.connection.getAdmin()) { servers.addAll(
servers.addAll(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) FutureUtils.get(getConnection().getAdmin().getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)))
.getLiveServerMetrics().keySet()); .getLiveServerMetrics().keySet());
}
LOG.info("Start to generate assignment plan for " + numRegions + LOG.info("Start to generate assignment plan for " + numRegions +
" regions from table " + tableName + " with " + " regions from table " + tableName + " with " +
@ -492,6 +489,11 @@ public class RegionPlacementMaintainer {
return plan; return plan;
} }
@Override
public void close() throws IOException {
Closeables.close(connection, true);
}
/** /**
* Some algorithms for solving the assignment problem may traverse workers or * Some algorithms for solving the assignment problem may traverse workers or
* jobs in linear order which may result in skewing the assignments of the * jobs in linear order which may result in skewing the assignments of the
@ -690,19 +692,17 @@ public class RegionPlacementMaintainer {
} }
if (singleServerPlan != null) { if (singleServerPlan != null) {
// Update the current region server with its updated favored nodes // Update the current region server with its updated favored nodes
BlockingInterface currentRegionServer = AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey());
((ClusterConnection)this.connection).getAdmin(entry.getKey());
UpdateFavoredNodesRequest request = UpdateFavoredNodesRequest request =
RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos); RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
UpdateFavoredNodesResponse updateFavoredNodesResponse = UpdateFavoredNodesResponse updateFavoredNodesResponse =
currentRegionServer.updateFavoredNodes(null, request); FutureUtils.get(rsAdmin.updateFavoredNodes(request));
LOG.info("Region server " + LOG.info("Region server " +
ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() + FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest()))
.getServerInfo() +
" has updated " + updateFavoredNodesResponse.getResponse() + " / " + " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
singleServerPlan.getAssignmentMap().size() + singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan");
" regions with the assignment plan"); succeededNum++;
succeededNum ++;
} }
} catch (Exception e) { } catch (Exception e) {
failedUpdateMap.put(entry.getKey(), e); failedUpdateMap.put(entry.getKey(), e);
@ -719,7 +719,7 @@ public class RegionPlacementMaintainer {
" region servers with its corresponding favored nodes"); " region servers with its corresponding favored nodes");
for (Map.Entry<ServerName, Exception> entry : for (Map.Entry<ServerName, Exception> entry :
failedUpdateMap.entrySet() ) { failedUpdateMap.entrySet() ) {
LOG.error("Failed to update " + entry.getKey().getHostAndPort() + LOG.error("Failed to update " + entry.getKey().getAddress() +
" because of " + entry.getValue().getMessage()); " because of " + entry.getValue().getMessage());
} }
} }
@ -1019,8 +1019,8 @@ public class RegionPlacementMaintainer {
} }
// Create the region placement obj // Create the region placement obj
RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality, try (RegionPlacementMaintainer rp =
enforceMinAssignmentMove); new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {
if (cmd.hasOption("d") || cmd.hasOption("verification-details")) { if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
verificationDetails = true; verificationDetails = true;
@ -1060,8 +1060,8 @@ public class RegionPlacementMaintainer {
rp.updateAssignmentPlan(plan); rp.updateAssignmentPlan(plan);
} else if (cmd.hasOption("diff")) { } else if (cmd.hasOption("diff")) {
FavoredNodesPlan newPlan = rp.getNewAssignmentPlan(); FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
Map<String, Map<String, Float>> locality = FSUtils Map<String, Map<String, Float>> locality =
.getRegionDegreeLocalityMappingFromFS(conf); FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan); Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan); rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
System.out.println("Do you want to update the assignment plan? [y/n]"); System.out.println("Do you want to update the assignment plan? [y/n]");
@ -1073,8 +1073,8 @@ public class RegionPlacementMaintainer {
} }
s.close(); s.close();
} else if (cmd.hasOption("ld")) { } else if (cmd.hasOption("ld")) {
Map<String, Map<String, Float>> locality = FSUtils Map<String, Map<String, Float>> locality =
.getRegionDegreeLocalityMappingFromFS(conf); FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
rp.printLocalityAndDispersionForCurrentPlan(locality); rp.printLocalityAndDispersionForCurrentPlan(locality);
} else if (cmd.hasOption("p") || cmd.hasOption("print")) { } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan(); FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
@ -1107,6 +1107,7 @@ public class RegionPlacementMaintainer {
} else { } else {
printHelp(opt); printHelp(opt);
} }
}
} catch (ParseException e) { } catch (ParseException e) {
printHelp(opt); printHelp(opt);
} }