From 88adfa327845e8703aec96a87b4210b7216cd8eb Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 15 Jan 2019 11:43:41 +0800 Subject: [PATCH] HBASE-21719 Rewrite RegionPlacementMaintainer to use AsyncClusterConnection Signed-off-by: Michael Stack --- .../master/RegionPlacementMaintainer.java | 235 +++++++++--------- 1 file changed, 118 insertions(+), 117 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java index faf5e4a2686..fda0a9cf302 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.master; +import java.io.Closeable; import java.io.IOException; import java.text.DecimalFormat; import java.util.ArrayList; @@ -39,29 +38,30 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper; import org.apache.hadoop.hbase.favored.FavoredNodesPlan; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.MunkresAssignment; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; @@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor */ @InterfaceAudience.Private // TODO: Remove? Unused. Partially implemented only. -public class RegionPlacementMaintainer { +public class RegionPlacementMaintainer implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class .getName()); //The cost of a placement that should never be assigned. @@ -96,9 +96,9 @@ public class RegionPlacementMaintainer { private final boolean enforceMinAssignmentMove; private RackManager rackManager; private Set targetTableSet; - private final Connection connection; + private AsyncClusterConnection connection; - public RegionPlacementMaintainer(Configuration conf) { + public RegionPlacementMaintainer(Configuration conf) throws IOException { this(conf, true, true); } @@ -109,11 +109,6 @@ public class RegionPlacementMaintainer { this.enforceMinAssignmentMove = enforceMinAssignmentMove; this.targetTableSet = new HashSet<>(); this.rackManager = new RackManager(conf); - try { - this.connection = ConnectionFactory.createConnection(this.conf); - } catch (IOException e) { - throw new RuntimeException(e); - } } private static void printHelp(Options opt) { @@ -124,6 +119,14 @@ public class RegionPlacementMaintainer { " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt); } + private AsyncClusterConnection getConnection() throws IOException { + if (connection == null) { + connection = + ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent()); + } + return connection; + } + public void setTargetTableName(String[] tableNames) { if (tableNames != null) { for (String table : tableNames) @@ -133,10 +136,8 @@ public class RegionPlacementMaintainer { /** * @return the new RegionAssignmentSnapshot - * @throws IOException */ - public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() - throws IOException { + public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException { SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot = new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf)); currentAssignmentShapshot.initialize(); @@ -145,9 +146,6 @@ public class RegionPlacementMaintainer { /** * Verify the region placement is consistent with the assignment plan - * @param isDetailMode - * @return reports - * @throws IOException */ public List verifyRegionPlacement(boolean isDetailMode) throws IOException { @@ -206,10 +204,9 @@ public class RegionPlacementMaintainer { // Get the all the region servers List servers = new ArrayList<>(); - try (Admin admin = this.connection.getAdmin()) { - servers.addAll(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) + servers.addAll( + FutureUtils.get(getConnection().getAdmin().getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))) .getLiveServerMetrics().keySet()); - } LOG.info("Start to generate assignment plan for " + numRegions + " regions from table " + tableName + " with " + @@ -492,6 +489,11 @@ public class RegionPlacementMaintainer { return plan; } + @Override + public void close() throws IOException { + Closeables.close(connection, true); + } + /** * Some algorithms for solving the assignment problem may traverse workers or * jobs in linear order which may result in skewing the assignments of the @@ -690,19 +692,17 @@ public class RegionPlacementMaintainer { } if (singleServerPlan != null) { // Update the current region server with its updated favored nodes - BlockingInterface currentRegionServer = - ((ClusterConnection)this.connection).getAdmin(entry.getKey()); + AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey()); UpdateFavoredNodesRequest request = - RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos); - + RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos); UpdateFavoredNodesResponse updateFavoredNodesResponse = - currentRegionServer.updateFavoredNodes(null, request); + FutureUtils.get(rsAdmin.updateFavoredNodes(request)); LOG.info("Region server " + - ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() + - " has updated " + updateFavoredNodesResponse.getResponse() + " / " + - singleServerPlan.getAssignmentMap().size() + - " regions with the assignment plan"); - succeededNum ++; + FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest())) + .getServerInfo() + + " has updated " + updateFavoredNodesResponse.getResponse() + " / " + + singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan"); + succeededNum++; } } catch (Exception e) { failedUpdateMap.put(entry.getKey(), e); @@ -719,7 +719,7 @@ public class RegionPlacementMaintainer { " region servers with its corresponding favored nodes"); for (Map.Entry entry : failedUpdateMap.entrySet() ) { - LOG.error("Failed to update " + entry.getKey().getHostAndPort() + + LOG.error("Failed to update " + entry.getKey().getAddress() + " because of " + entry.getValue().getMessage()); } } @@ -1019,93 +1019,94 @@ public class RegionPlacementMaintainer { } // Create the region placement obj - RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality, - enforceMinAssignmentMove); + try (RegionPlacementMaintainer rp = + new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) { - if (cmd.hasOption("d") || cmd.hasOption("verification-details")) { - verificationDetails = true; - } - - if (cmd.hasOption("tables")) { - String tableNameListStr = cmd.getOptionValue("tables"); - String[] tableNames = StringUtils.split(tableNameListStr, ","); - rp.setTargetTableName(tableNames); - } - - if (cmd.hasOption("munkres")) { - USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true; - } - - // Read all the modes - if (cmd.hasOption("v") || cmd.hasOption("verify")) { - // Verify the region placement. - rp.verifyRegionPlacement(verificationDetails); - } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) { - // Generate the assignment plan only without updating the hbase:meta and RS - FavoredNodesPlan plan = rp.getNewAssignmentPlan(); - printAssignmentPlan(plan); - } else if (cmd.hasOption("w") || cmd.hasOption("write")) { - // Generate the new assignment plan - FavoredNodesPlan plan = rp.getNewAssignmentPlan(); - // Print the new assignment plan - printAssignmentPlan(plan); - // Write the new assignment plan to META - rp.updateAssignmentPlanToMeta(plan); - } else if (cmd.hasOption("u") || cmd.hasOption("update")) { - // Generate the new assignment plan - FavoredNodesPlan plan = rp.getNewAssignmentPlan(); - // Print the new assignment plan - printAssignmentPlan(plan); - // Update the assignment to hbase:meta and Region Servers - rp.updateAssignmentPlan(plan); - } else if (cmd.hasOption("diff")) { - FavoredNodesPlan newPlan = rp.getNewAssignmentPlan(); - Map> locality = FSUtils - .getRegionDegreeLocalityMappingFromFS(conf); - Map movesPerTable = rp.getRegionsMovement(newPlan); - rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan); - System.out.println("Do you want to update the assignment plan? [y/n]"); - Scanner s = new Scanner(System.in); - String input = s.nextLine().trim(); - if (input.equals("y")) { - System.out.println("Updating assignment plan..."); - rp.updateAssignmentPlan(newPlan); - } - s.close(); - } else if (cmd.hasOption("ld")) { - Map> locality = FSUtils - .getRegionDegreeLocalityMappingFromFS(conf); - rp.printLocalityAndDispersionForCurrentPlan(locality); - } else if (cmd.hasOption("p") || cmd.hasOption("print")) { - FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan(); - printAssignmentPlan(plan); - } else if (cmd.hasOption("overwrite")) { - if (!cmd.hasOption("f") || !cmd.hasOption("r")) { - throw new IllegalArgumentException("Please specify: " + - " -update -r regionName -f server1:port,server2:port,server3:port"); + if (cmd.hasOption("d") || cmd.hasOption("verification-details")) { + verificationDetails = true; } - String regionName = cmd.getOptionValue("r"); - String favoredNodesStr = cmd.getOptionValue("f"); - LOG.info("Going to update the region " + regionName + " with the new favored nodes " + - favoredNodesStr); - List favoredNodes = null; - RegionInfo regionInfo = - rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName); - if (regionInfo == null) { - LOG.error("Cannot find the region " + regionName + " from the META"); - } else { - try { - favoredNodes = getFavoredNodeList(favoredNodesStr); - } catch (IllegalArgumentException e) { - LOG.error("Cannot parse the invalid favored nodes because " + e); + if (cmd.hasOption("tables")) { + String tableNameListStr = cmd.getOptionValue("tables"); + String[] tableNames = StringUtils.split(tableNameListStr, ","); + rp.setTargetTableName(tableNames); + } + + if (cmd.hasOption("munkres")) { + USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true; + } + + // Read all the modes + if (cmd.hasOption("v") || cmd.hasOption("verify")) { + // Verify the region placement. + rp.verifyRegionPlacement(verificationDetails); + } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) { + // Generate the assignment plan only without updating the hbase:meta and RS + FavoredNodesPlan plan = rp.getNewAssignmentPlan(); + printAssignmentPlan(plan); + } else if (cmd.hasOption("w") || cmd.hasOption("write")) { + // Generate the new assignment plan + FavoredNodesPlan plan = rp.getNewAssignmentPlan(); + // Print the new assignment plan + printAssignmentPlan(plan); + // Write the new assignment plan to META + rp.updateAssignmentPlanToMeta(plan); + } else if (cmd.hasOption("u") || cmd.hasOption("update")) { + // Generate the new assignment plan + FavoredNodesPlan plan = rp.getNewAssignmentPlan(); + // Print the new assignment plan + printAssignmentPlan(plan); + // Update the assignment to hbase:meta and Region Servers + rp.updateAssignmentPlan(plan); + } else if (cmd.hasOption("diff")) { + FavoredNodesPlan newPlan = rp.getNewAssignmentPlan(); + Map> locality = + FSUtils.getRegionDegreeLocalityMappingFromFS(conf); + Map movesPerTable = rp.getRegionsMovement(newPlan); + rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan); + System.out.println("Do you want to update the assignment plan? [y/n]"); + Scanner s = new Scanner(System.in); + String input = s.nextLine().trim(); + if (input.equals("y")) { + System.out.println("Updating assignment plan..."); + rp.updateAssignmentPlan(newPlan); } - FavoredNodesPlan newPlan = new FavoredNodesPlan(); - newPlan.updateFavoredNodesMap(regionInfo, favoredNodes); - rp.updateAssignmentPlan(newPlan); + s.close(); + } else if (cmd.hasOption("ld")) { + Map> locality = + FSUtils.getRegionDegreeLocalityMappingFromFS(conf); + rp.printLocalityAndDispersionForCurrentPlan(locality); + } else if (cmd.hasOption("p") || cmd.hasOption("print")) { + FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan(); + printAssignmentPlan(plan); + } else if (cmd.hasOption("overwrite")) { + if (!cmd.hasOption("f") || !cmd.hasOption("r")) { + throw new IllegalArgumentException("Please specify: " + + " -update -r regionName -f server1:port,server2:port,server3:port"); + } + + String regionName = cmd.getOptionValue("r"); + String favoredNodesStr = cmd.getOptionValue("f"); + LOG.info("Going to update the region " + regionName + " with the new favored nodes " + + favoredNodesStr); + List favoredNodes = null; + RegionInfo regionInfo = + rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName); + if (regionInfo == null) { + LOG.error("Cannot find the region " + regionName + " from the META"); + } else { + try { + favoredNodes = getFavoredNodeList(favoredNodesStr); + } catch (IllegalArgumentException e) { + LOG.error("Cannot parse the invalid favored nodes because " + e); + } + FavoredNodesPlan newPlan = new FavoredNodesPlan(); + newPlan.updateFavoredNodesMap(regionInfo, favoredNodes); + rp.updateAssignmentPlan(newPlan); + } + } else { + printHelp(opt); } - } else { - printHelp(opt); } } catch (ParseException e) { printHelp(opt);