From 936bb82908fd0525273222cd50d23c91d11d8f72 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 10 Sep 2019 11:14:17 +0800 Subject: [PATCH] Revert "HBASE-22695 Store the rsgroup of a table in table configuration (#426)" This reverts commit eab7d3d502be9a2ac58852c6743f3c7536bbf0c3. --- .../apache/hadoop/hbase/HTableDescriptor.java | 6 - .../hadoop/hbase/client/TableDescriptor.java | 8 - .../hbase/client/TableDescriptorBuilder.java | 19 -- .../hadoop/hbase/rsgroup/RSGroupInfo.java | 42 +-- .../apache/hadoop/hbase/master/HMaster.java | 4 +- .../hadoop/hbase/master/LoadBalancer.java | 49 ++- .../master/assignment/AssignmentManager.java | 6 +- .../hadoop/hbase/rsgroup/RSGroupAdmin.java | 23 ++ .../hbase/rsgroup/RSGroupAdminClient.java | 13 +- .../hbase/rsgroup/RSGroupAdminEndpoint.java | 182 +++++----- .../hbase/rsgroup/RSGroupAdminServer.java | 311 +++++++++++++----- .../rsgroup/RSGroupAdminServiceImpl.java | 111 ++----- .../rsgroup/RSGroupBasedLoadBalancer.java | 97 +++--- .../hbase/rsgroup/RSGroupInfoManager.java | 23 ++ .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 115 ++++++- .../hadoop/hbase/rsgroup/RSGroupUtil.java | 113 ------- .../hbase/master/TestRegionPlacement2.java | 6 +- .../balancer/RSGroupableBalancerTestBase.java | 84 +++-- .../TestRSGroupBasedLoadBalancer.java | 42 +-- ...rWithStochasticLoadBalancerAsInternal.java | 4 +- .../hbase/rsgroup/TestRSGroupsAdmin1.java | 1 + .../hbase/rsgroup/TestRSGroupsAdmin2.java | 104 +++++- .../hbase/rsgroup/TestRSGroupsBalance.java | 20 +- .../hbase/rsgroup/TestRSGroupsBase.java | 8 +- .../rsgroup/TestRSGroupsOfflineMode.java | 6 +- .../rsgroup/VerifyingRSGroupAdminClient.java | 67 ++-- 26 files changed, 840 insertions(+), 624 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 188bed64d59..8866eba94fc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -988,9 +987,4 @@ public class HTableDescriptor implements TableDescriptor, Comparable getRegionServerGroup() { - return delegatee.getRegionServerGroup(); - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index a4523872c9c..fc5e69e88c4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Stream; import org.apache.hadoop.hbase.HConstants; @@ -184,13 +183,6 @@ public interface TableDescriptor { @Deprecated String getOwnerString(); - /** - * Get the region server group this table belongs to. The regions of this table will be placed - * only on the region servers within this group. If not present, will be placed on - * {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo#DEFAULT_GROUP}. - */ - Optional getRegionServerGroup(); - /** * Getter for accessing the metadata associated with the key. * diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 09ee0c53557..037a7f860cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -189,9 +188,6 @@ public class TableDescriptorBuilder { private static final Bytes PRIORITY_KEY = new Bytes(Bytes.toBytes(PRIORITY)); - private static final Bytes RSGROUP_KEY = - new Bytes(Bytes.toBytes(RSGroupInfo.TABLE_DESC_PROP_GROUP)); - /** * Relative priority of the table used for rpc scheduling */ @@ -541,11 +537,6 @@ public class TableDescriptorBuilder { return this; } - public TableDescriptorBuilder setRegionServerGroup(String group) { - desc.setValue(RSGROUP_KEY, new Bytes(Bytes.toBytes(group))); - return this; - } - public TableDescriptor build() { return new ModifyableTableDescriptor(desc); } @@ -1586,16 +1577,6 @@ public class TableDescriptorBuilder { public int getColumnFamilyCount() { return families.size(); } - - @Override - public Optional getRegionServerGroup() { - Bytes value = values.get(RSGROUP_KEY); - if (value != null) { - return Optional.of(Bytes.toString(value.get(), value.getOffset(), value.getLength())); - } else { - return Optional.empty(); - } - } } private static Optional toCoprocessorDescriptor(String spec) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java index ad55d1f2a46..25e827de052 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java @@ -34,38 +34,21 @@ import org.apache.yetus.audience.InterfaceAudience; public class RSGroupInfo { public static final String DEFAULT_GROUP = "default"; public static final String NAMESPACE_DESC_PROP_GROUP = "hbase.rsgroup.name"; - public static final String TABLE_DESC_PROP_GROUP = "hbase.rsgroup.name"; private final String name; // Keep servers in a sorted set so has an expected ordering when displayed. private final SortedSet
servers; // Keep tables sorted too. - /** - * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in - * the configuration of a table so this will be removed. - */ - @Deprecated private final SortedSet tables; public RSGroupInfo(String name) { this(name, new TreeSet
(), new TreeSet()); } - RSGroupInfo(String name, SortedSet
servers) { - this.name = name; - this.servers = servers == null ? new TreeSet<>() : new TreeSet<>(servers); - this.tables = new TreeSet<>(); - } - - /** - * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information for a table will be - * stored in the configuration of a table so this will be removed. - */ - @Deprecated RSGroupInfo(String name, SortedSet
servers, SortedSet tables) { this.name = name; this.servers = (servers == null) ? new TreeSet<>() : new TreeSet<>(servers); - this.tables = (tables == null) ? new TreeSet<>() : new TreeSet<>(tables); + this.tables = (tables == null) ? new TreeSet<>() : new TreeSet<>(tables); } public RSGroupInfo(RSGroupInfo src) { @@ -117,46 +100,23 @@ public class RSGroupInfo { /** * Get set of tables that are members of the group. - * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in - * the configuration of a table so this will be removed. */ - @Deprecated public SortedSet getTables() { return tables; } - /** - * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in - * the configuration of a table so this will be removed. - */ - @Deprecated public void addTable(TableName table) { tables.add(table); } - /** - * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in - * the configuration of a table so this will be removed. - */ - @Deprecated public void addAllTables(Collection arg) { tables.addAll(arg); } - /** - * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in - * the configuration of a table so this will be removed. - */ - @Deprecated public boolean containsTable(TableName table) { return tables.contains(table); } - /** - * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in - * the configuration of a table so this will be removed. - */ - @Deprecated public boolean removeTable(TableName table) { return tables.remove(table); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 79ed2607e07..bb2aadbf74d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1963,7 +1963,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Replace with an async implementation from which you can get // a success/failure result. @VisibleForTesting - public void move(final byte[] encodedRegionName, byte[] destServerName) throws IOException { + public void move(final byte[] encodedRegionName, byte[] destServerName) throws HBaseIOException { RegionState regionState = assignmentManager.getRegionStates(). getRegionState(Bytes.toString(encodedRegionName)); @@ -3557,7 +3557,7 @@ public class HMaster extends HRegionServer implements MasterServices { * @param servers Region servers to decommission. */ public void decommissionRegionServers(final List servers, final boolean offload) - throws IOException { + throws HBaseIOException { List serversAdded = new ArrayList<>(servers.size()); // Place the decommission marker first. String parentZnode = getZooKeeper().getZNodePaths().drainingZNode; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java index 0fc544a6aec..816636f8ae0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java @@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.master; import edu.umd.cs.findbugs.annotations.Nullable; -import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; @@ -65,72 +65,95 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse ServerName BOGUS_SERVER_NAME = ServerName.valueOf("localhost,1,1"); /** - * Set the current cluster status. This allows a LoadBalancer to map host name to a server + * Set the current cluster status. This allows a LoadBalancer to map host name to a server + * @param st */ void setClusterMetrics(ClusterMetrics st); /** * Pass RegionStates and allow balancer to set the current cluster load. + * @param ClusterLoad */ void setClusterLoad(Map>> ClusterLoad); /** * Set the master service. + * @param masterServices */ void setMasterServices(MasterServices masterServices); /** * Perform the major balance operation + * @param tableName + * @param clusterState * @return List of plans */ - List balanceCluster(TableName tableName, - Map> clusterState) throws IOException; + List balanceCluster(TableName tableName, Map> clusterState) throws HBaseIOException; /** * Perform the major balance operation + * @param clusterState * @return List of plans */ - List balanceCluster(Map> clusterState) - throws IOException; + List balanceCluster(Map> clusterState) throws HBaseIOException; /** * Perform a Round Robin assignment of regions. + * @param regions + * @param servers * @return Map of servername to regioninfos */ - Map> roundRobinAssignment(List regions, - List servers) throws IOException; + Map> roundRobinAssignment( + List regions, + List servers + ) throws HBaseIOException; /** * Assign regions to the previously hosting region server + * @param regions + * @param servers * @return List of plans */ @Nullable - Map> retainAssignment(Map regions, - List servers) throws IOException; + Map> retainAssignment( + Map regions, + List servers + ) throws HBaseIOException; /** * Get a random region server from the list * @param regionInfo Region for which this selection is being done. + * @param servers + * @return Servername */ - ServerName randomAssignment(RegionInfo regionInfo, List servers) throws IOException; + ServerName randomAssignment( + RegionInfo regionInfo, List servers + ) throws HBaseIOException; /** * Initialize the load balancer. Must be called after setters. + * @throws HBaseIOException */ - void initialize() throws IOException; + void initialize() throws HBaseIOException; /** * Marks the region as online at balancer. + * @param regionInfo + * @param sn */ void regionOnline(RegionInfo regionInfo, ServerName sn); /** * Marks the region as offline at balancer. + * @param regionInfo */ void regionOffline(RegionInfo regionInfo); - /** + /* * Notification that config has changed + * @param conf */ @Override void onConfigurationChange(Configuration conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 24ad0d9098e..a231facfb79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -683,7 +683,7 @@ public class AssignmentManager { this.master.getServerManager().createDestinationServersList(serversToExclude)); // Return mid-method! return createAssignProcedures(assignments); - } catch (IOException hioe) { + } catch (HBaseIOException hioe) { LOG.warn("Failed roundRobinAssignment", hioe); } // If an error above, fall-through to this simpler assign. Last resort. @@ -1986,7 +1986,7 @@ public class AssignmentManager { } try { acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); - } catch (IOException e) { + } catch (HBaseIOException e) { LOG.warn("unable to retain assignment", e); addToPendingAssignment(regions, retainMap.keySet()); } @@ -2001,7 +2001,7 @@ public class AssignmentManager { } try { acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); - } catch (IOException e) { + } catch (HBaseIOException e) { LOG.warn("unable to round-robin assignment", e); addToPendingAssignment(regions, hris); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java index 344d0b38536..9ea996be1cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.rsgroup; import java.io.IOException; import java.util.List; import java.util.Set; + +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; @@ -33,11 +35,22 @@ public interface RSGroupAdmin { */ RSGroupInfo getRSGroupInfo(String groupName) throws IOException; + /** + * Gets {@code RSGroupInfo} for the given table's group. + */ + RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException; + /** * Move given set of servers to the specified target RegionServer group. */ void moveServers(Set
servers, String targetGroup) throws IOException; + /** + * Move given set of tables to the specified target RegionServer group. + * This will unassign all of a table's region so it can be reassigned to the correct group. + */ + void moveTables(Set tables, String targetGroup) throws IOException; + /** * Creates a new RegionServer group with the given name. */ @@ -66,6 +79,16 @@ public interface RSGroupAdmin { */ RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException; + /** + * Move given set of servers and tables to the specified target RegionServer group. + * @param servers set of servers to move + * @param tables set of tables to move + * @param targetGroup the target group name + * @throws IOException if moving the server and tables fail + */ + void moveServersAndTables(Set
servers, Set tables, + String targetGroup) throws IOException; + /** * Remove decommissioned servers from rsgroup. * 1. Sometimes we may find the server aborted due to some hardware failure and we must offline diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java index 07f0efdadbb..e7ab7f23e80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServe import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; /** @@ -63,17 +62,12 @@ public class RSGroupAdminClient implements RSGroupAdmin { stub = RSGroupAdminService.newBlockingStub(admin.coprocessorService()); } - // for writing UTs - @VisibleForTesting - protected RSGroupAdminClient() { - } - @Override public RSGroupInfo getRSGroupInfo(String groupName) throws IOException { try { GetRSGroupInfoResponse resp = stub.getRSGroupInfo(null, - GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build()); - if (resp.hasRSGroupInfo()) { + GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build()); + if(resp.hasRSGroupInfo()) { return ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()); } return null; @@ -82,6 +76,7 @@ public class RSGroupAdminClient implements RSGroupAdmin { } } + @Override public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException { GetRSGroupInfoOfTableRequest request = GetRSGroupInfoOfTableRequest.newBuilder().setTableName( ProtobufUtil.toProtoTableName(tableName)).build(); @@ -116,6 +111,7 @@ public class RSGroupAdminClient implements RSGroupAdmin { } } + @Override public void moveTables(Set tables, String targetGroup) throws IOException { MoveTablesRequest.Builder builder = MoveTablesRequest.newBuilder().setTargetGroup(targetGroup); for(TableName tableName: tables) { @@ -196,6 +192,7 @@ public class RSGroupAdminClient implements RSGroupAdmin { } } + @Override public void moveServersAndTables(Set
servers, Set tables, String targetGroup) throws IOException { MoveServersAndTablesRequest.Builder builder = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java index 3c4530f0da9..2d5af04c389 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java @@ -27,10 +27,13 @@ import java.util.stream.Collectors; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; @@ -44,16 +47,21 @@ import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; // TODO: Encapsulate MasterObserver functions into separate subclass. @CoreCoprocessor @InterfaceAudience.Private public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { + static final Logger LOG = LoggerFactory.getLogger(RSGroupAdminEndpoint.class); + + private MasterServices master; // Only instance of RSGroupInfoManager. RSGroup aware load balancers ask for this instance on // their setup. - private MasterServices master; private RSGroupInfoManager groupInfoManager; private RSGroupAdminServer groupAdminServer; private RSGroupAdminServiceImpl groupAdminService = new RSGroupAdminServiceImpl(); @@ -102,10 +110,109 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { return groupAdminService; } + private void assignTableToGroup(TableDescriptor desc) throws IOException { + String groupName = + master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()) + .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); + if (groupName == null) { + groupName = RSGroupInfo.DEFAULT_GROUP; + } + RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); + if (rsGroupInfo == null) { + throw new ConstraintException( + "Default RSGroup (" + groupName + ") for this table's namespace does not exist."); + } + if (!rsGroupInfo.containsTable(desc.getTableName())) { + LOG.debug("Pre-moving table " + desc.getTableName() + " to RSGroup " + groupName); + groupAdminServer.moveTables(Sets.newHashSet(desc.getTableName()), groupName); + } + } + ///////////////////////////////////////////////////////////////////////////// // MasterObserver overrides ///////////////////////////////////////////////////////////////////////////// + private boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException { + String groupName; + try { + groupName = master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()) + .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); + if (groupName == null) { + groupName = RSGroupInfo.DEFAULT_GROUP; + } + } catch (MasterNotRunningException | PleaseHoldException e) { + LOG.info("Master has not initialized yet; temporarily using default RSGroup '" + + RSGroupInfo.DEFAULT_GROUP + "' for deploy of system table"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } + + RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); + if (rsGroupInfo == null) { + throw new ConstraintException( + "Default RSGroup (" + groupName + ") for this table's " + "namespace does not exist."); + } + + for (ServerName onlineServer : master.getServerManager().createDestinationServersList()) { + if (rsGroupInfo.getServers().contains(onlineServer.getAddress())) { + return true; + } + } + return false; + } + + @Override + public void preCreateTableAction(final ObserverContext ctx, + final TableDescriptor desc, final RegionInfo[] regions) throws IOException { + if (!desc.getTableName().isSystemTable() && !rsgroupHasServersOnline(desc)) { + throw new HBaseIOException("No online servers in the rsgroup, which table " + + desc.getTableName().getNameAsString() + " belongs to"); + } + } + + // Assign table to default RSGroup. + @Override + public void postCreateTable(ObserverContext ctx, + TableDescriptor desc, RegionInfo[] regions) throws IOException { + assignTableToGroup(desc); + } + + // Remove table from its RSGroup. + @Override + public void postDeleteTable(ObserverContext ctx, + TableName tableName) throws IOException { + try { + RSGroupInfo group = groupAdminServer.getRSGroupInfoOfTable(tableName); + if (group != null) { + LOG.debug(String.format("Removing deleted table '%s' from rsgroup '%s'", tableName, + group.getName())); + groupAdminServer.moveTables(Sets.newHashSet(tableName), null); + } + } catch (IOException ex) { + LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex); + } + } + + @Override + public void preCreateNamespace(ObserverContext ctx, + NamespaceDescriptor ns) throws IOException { + String group = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); + if (group != null && groupAdminServer.getRSGroupInfo(group) == null) { + throw new ConstraintException("Region server group " + group + " does not exit"); + } + } + + @Override + public void preModifyNamespace(ObserverContext ctx, + NamespaceDescriptor currentNsDesc, NamespaceDescriptor newNsDesc) throws IOException { + preCreateNamespace(ctx, newNsDesc); + } + + @Override + public void preCloneSnapshot(ObserverContext ctx, + SnapshotDescription snapshot, TableDescriptor desc) throws IOException { + assignTableToGroup(desc); + } + @Override public void postClearDeadServers(ObserverContext ctx, List servers, List notClearedServers) throws IOException { @@ -116,77 +223,4 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { groupAdminServer.removeServers(clearedServer); } } - - private void checkGroupExists(Optional optGroupName) throws IOException { - if (optGroupName.isPresent()) { - String groupName = optGroupName.get(); - if (groupAdminServer.getRSGroupInfo(groupName) == null) { - throw new ConstraintException("Region server group " + groupName + " does not exit"); - } - } - } - - private boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException { - RSGroupInfo rsGroupInfo; - Optional optGroupName = desc.getRegionServerGroup(); - if (optGroupName.isPresent()) { - String groupName = optGroupName.get(); - if (groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { - // do not check for default group - return true; - } - rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); - if (rsGroupInfo == null) { - throw new ConstraintException( - "RSGroup " + groupName + " for table " + desc.getTableName() + " does not exist"); - } - } else { - NamespaceDescriptor nd = - master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()); - String groupNameOfNs = nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); - if (groupNameOfNs == null || groupNameOfNs.equals(RSGroupInfo.DEFAULT_GROUP)) { - // do not check for default group - return true; - } - rsGroupInfo = groupAdminServer.getRSGroupInfo(groupNameOfNs); - if (rsGroupInfo == null) { - throw new ConstraintException("RSGroup " + groupNameOfNs + " for table " + - desc.getTableName() + "(inherit from namespace) does not exist"); - } - } - return master.getServerManager().createDestinationServersList().stream() - .anyMatch(onlineServer -> rsGroupInfo.containsServer(onlineServer.getAddress())); - } - - @Override - public void preCreateTableAction(ObserverContext ctx, - TableDescriptor desc, RegionInfo[] regions) throws IOException { - checkGroupExists(desc.getRegionServerGroup()); - if (!desc.getTableName().isSystemTable() && !rsgroupHasServersOnline(desc)) { - throw new HBaseIOException("No online servers in the rsgroup for " + desc); - } - } - - @Override - public TableDescriptor preModifyTable(ObserverContext ctx, - TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor) - throws IOException { - checkGroupExists(newDescriptor.getRegionServerGroup()); - return MasterObserver.super.preModifyTable(ctx, tableName, currentDescriptor, newDescriptor); - } - - @Override - public void preCreateNamespace(ObserverContext ctx, - NamespaceDescriptor ns) throws IOException { - checkGroupExists( - Optional.ofNullable(ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP))); - } - - @Override - public void preModifyNamespace(ObserverContext ctx, - NamespaceDescriptor currentNsDescriptor, NamespaceDescriptor newNsDescriptor) - throws IOException { - checkGroupExists(Optional - .ofNullable(newNsDescriptor.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP))); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index 59950e10de9..f3ef4fb96d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -26,16 +26,15 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.function.Function; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; @@ -43,6 +42,7 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; @@ -85,6 +85,14 @@ public class RSGroupAdminServer implements RSGroupAdmin { return rsGroupInfoManager.getRSGroup(groupName); } + @Override + public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException { + // We are reading across two Maps in the below with out synchronizing across + // them; should be safe most of the time. + String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName); + return groupName == null? null: rsGroupInfoManager.getRSGroup(groupName); + } + private void checkOnlineServersOnly(Set
servers) throws ConstraintException { // This uglyness is because we only have Address, not ServerName. // Online servers are keyed by ServerName. @@ -151,24 +159,104 @@ public class RSGroupAdminServer implements RSGroupAdmin { } /** - * Move every region from servers which are currently located on these servers, but should not be - * located there. + * Check servers and tables. + * + * @param servers servers to move + * @param tables tables to move + * @param targetGroupName target group name + * @throws IOException if nulls or if servers and tables not belong to the same group + */ + private void checkServersAndTables(Set
servers, Set tables, + String targetGroupName) throws IOException { + // Presume first server's source group. Later ensure all servers are from this group. + Address firstServer = servers.iterator().next(); + RSGroupInfo tmpSrcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer); + if (tmpSrcGrp == null) { + // Be careful. This exception message is tested for in TestRSGroupsBase... + throw new ConstraintException("Source RSGroup for server " + firstServer + + " does not exist."); + } + RSGroupInfo srcGrp = new RSGroupInfo(tmpSrcGrp); + + // Only move online servers + checkOnlineServersOnly(servers); + + // Ensure all servers are of same rsgroup. + for (Address server: servers) { + String tmpGroup = rsGroupInfoManager.getRSGroupOfServer(server).getName(); + if (!tmpGroup.equals(srcGrp.getName())) { + throw new ConstraintException("Move server request should only come from one source " + + "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup); + } + } + + // Ensure all tables and servers are of same rsgroup. + for (TableName table : tables) { + String tmpGroup = rsGroupInfoManager.getRSGroupOfTable(table); + if (!tmpGroup.equals(srcGrp.getName())) { + throw new ConstraintException("Move table request should only come from one source " + + "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup); + } + } + + if (srcGrp.getServers().size() <= servers.size() && srcGrp.getTables().size() > tables.size()) { + throw new ConstraintException("Cannot leave a RSGroup " + srcGrp.getName() + + " that contains tables without servers to host them."); + } + } + + /** + * Move every region from servers which are currently located on these servers, + * but should not be located there. + * * @param servers the servers that will move to new group * @param targetGroupName the target group name * @throws IOException if moving the server and tables fail */ private void moveServerRegionsFromGroup(Set
servers, String targetGroupName) throws IOException { - moveRegionsBetweenGroups(servers, targetGroupName, rs -> getRegions(rs), info -> { - try { - String groupName = RSGroupUtil.getRSGroupInfo(master, rsGroupInfoManager, info.getTable()) - .map(RSGroupInfo::getName).orElse(RSGroupInfo.DEFAULT_GROUP); - return groupName.equals(targetGroupName); - } catch (IOException e) { - LOG.warn("Failed to test group for region {} and target group {}", info, targetGroupName); - return false; - } - }, rs -> rs.getHostname()); + moveRegionsBetweenGroups(servers, targetGroupName, + rs -> getRegions(rs), + info -> { + try { + RSGroupInfo group = getRSGroupInfo(targetGroupName); + return group.containsTable(info.getTable()); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + }, + rs -> rs.getHostname()); + } + + /** + * Moves regions of tables which are not on target group servers. + * + * @param tables the tables that will move to new group + * @param targetGroupName the target group name + * @throws IOException if moving the region fails + */ + private void moveTableRegionsToGroup(Set tables, String targetGroupName) + throws IOException { + moveRegionsBetweenGroups(tables, targetGroupName, + table -> { + if (master.getAssignmentManager().isTableDisabled(table)) { + return new ArrayList<>(); + } + return master.getAssignmentManager().getRegionStates().getRegionsOfTable(table); + }, + info -> { + try { + RSGroupInfo group = getRSGroupInfo(targetGroupName); + ServerName sn = + master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(info); + return group.containsServer(sn.getAddress()); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + }, + table -> table.getNameWithNamespaceInclAsString()); } private void moveRegionsBetweenGroups(Set regionsOwners, String targetGroupName, @@ -233,6 +321,9 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + value="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE", + justification="Ignoring complaint because don't know what it is complaining about") @Override public void moveServers(Set
servers, String targetGroupName) throws IOException { if (servers == null) { @@ -273,16 +364,9 @@ public class RSGroupAdminServer implements RSGroupAdmin { "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup); } } - if (srcGrp.getServers().size() <= servers.size()) { - // check if there are still tables reference this group - for (TableDescriptor td : master.getTableDescriptors().getAll().values()) { - Optional optGroupName = td.getRegionServerGroup(); - if (optGroupName.isPresent() && optGroupName.get().equals(srcGrp.getName())) { - throw new ConstraintException( - "Cannot leave a RSGroup " + srcGrp.getName() + " that contains tables('" + - td.getTableName() + "' at least) without servers to host them."); - } - } + if (srcGrp.getServers().size() <= servers.size() && srcGrp.getTables().size() > 0) { + throw new ConstraintException("Cannot leave a RSGroup " + srcGrp.getName() + + " that contains tables without servers to host them."); } // MovedServers may be < passed in 'servers'. @@ -293,6 +377,38 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } + @Override + public void moveTables(Set tables, String targetGroup) throws IOException { + if (tables == null) { + throw new ConstraintException("The list of servers cannot be null."); + } + if (tables.size() < 1) { + LOG.debug("moveTables() passed an empty set. Ignoring."); + return; + } + + // Hold a lock on the manager instance while moving servers to prevent + // another writer changing our state while we are working. + synchronized (rsGroupInfoManager) { + if(targetGroup != null) { + RSGroupInfo destGroup = rsGroupInfoManager.getRSGroup(targetGroup); + if(destGroup == null) { + throw new ConstraintException("Target " + targetGroup + " RSGroup does not exist."); + } + if(destGroup.getServers().size() < 1) { + throw new ConstraintException("Target RSGroup must have at least one server."); + } + } + rsGroupInfoManager.moveTables(tables, targetGroup); + + // targetGroup is null when a table is being deleted. In this case no further + // action is required. + if (targetGroup != null) { + moveTableRegionsToGroup(tables, targetGroup); + } + } + } + @Override public void addRSGroup(String name) throws IOException { rsGroupInfoManager.addRSGroup(new RSGroupInfo(name)); @@ -307,18 +423,17 @@ public class RSGroupAdminServer implements RSGroupAdmin { if (rsGroupInfo == null) { throw new ConstraintException("RSGroup " + name + " does not exist"); } + int tableCount = rsGroupInfo.getTables().size(); + if (tableCount > 0) { + throw new ConstraintException("RSGroup " + name + " has " + tableCount + + " tables; you must remove these tables from the rsgroup before " + + "the rsgroup can be removed."); + } int serverCount = rsGroupInfo.getServers().size(); if (serverCount > 0) { throw new ConstraintException("RSGroup " + name + " has " + serverCount + - " servers; you must remove these servers from the RSGroup before" + - " the RSGroup can be removed."); - } - for (TableDescriptor td : master.getTableDescriptors().getAll().values()) { - if (td.getRegionServerGroup().map(name::equals).orElse(false)) { - throw new ConstraintException("RSGroup " + name + " is already referenced by " + - td.getTableName() + "; you must remove all the tables from the rsgroup before " + - "the rsgroup can be removed."); - } + " servers; you must remove these servers from the RSGroup before" + + "the RSGroup can be removed."); } for (NamespaceDescriptor ns : master.getClusterSchema().getNamespaces()) { String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); @@ -343,29 +458,27 @@ public class RSGroupAdminServer implements RSGroupAdmin { } if (getRSGroupInfo(groupName) == null) { - throw new ConstraintException("RSGroup does not exist: " + groupName); + throw new ConstraintException("RSGroup does not exist: "+groupName); } // Only allow one balance run at at time. Map groupRIT = rsGroupGetRegionsInTransition(groupName); if (groupRIT.size() > 0) { LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(), - StringUtils.abbreviate( - master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(), - 256)); + StringUtils.abbreviate( + master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(), + 256)); return false; } if (serverManager.areDeadServersInProgress()) { LOG.debug("Not running balancer because processing dead regionserver(s): {}", - serverManager.getDeadServers()); + serverManager.getDeadServers()); return false; } - // We balance per group instead of per table + //We balance per group instead of per table List plans = new ArrayList<>(); - Map>> assignmentsByTable = - getRSGroupAssignmentsByTable(groupName); - for (Map.Entry>> tableMap : assignmentsByTable - .entrySet()) { + for(Map.Entry>> tableMap: + getRSGroupAssignmentsByTable(groupName).entrySet()) { LOG.info("Creating partial plan for table {} : {}", tableMap.getKey(), tableMap.getValue()); List partialPlans = balancer.balanceCluster(tableMap.getValue()); LOG.info("Partial plan for table {} : {}", tableMap.getKey(), partialPlans); @@ -394,66 +507,100 @@ public class RSGroupAdminServer implements RSGroupAdmin { } @Override - public void removeServers(Set
servers) throws IOException { + public void moveServersAndTables(Set
servers, Set tables, String targetGroup) + throws IOException { if (servers == null || servers.isEmpty()) { - throw new ConstraintException("The set of servers to remove cannot be null or empty."); + throw new ConstraintException("The list of servers to move cannot be null or empty."); } - // Hold a lock on the manager instance while moving servers to prevent + if (tables == null || tables.isEmpty()) { + throw new ConstraintException("The list of tables to move cannot be null or empty."); + } + + //check target group + getAndCheckRSGroupInfo(targetGroup); + + // Hold a lock on the manager instance while moving servers and tables to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - // check the set of servers - checkForDeadOrOnlineServers(servers); - rsGroupInfoManager.removeServers(servers); - LOG.info("Remove decommissioned servers {} from RSGroup done", servers); + //check servers and tables status + checkServersAndTables(servers, tables, targetGroup); + + //Move servers and tables to a new group. + String srcGroup = getRSGroupOfServer(servers.iterator().next()).getName(); + rsGroupInfoManager.moveServersAndTables(servers, tables, srcGroup, targetGroup); + + //move regions on these servers which do not belong to group tables + moveServerRegionsFromGroup(servers, targetGroup); + //move regions of these tables which are not on group servers + moveTableRegionsToGroup(tables, targetGroup); } + LOG.info("Move servers and tables done. Severs: {}, Tables: {} => {}", servers, tables, + targetGroup); } - private boolean isTableInGroup(TableName tableName, String groupName, - Set tablesInGroupCache) throws IOException { - if (tablesInGroupCache.contains(tableName)) { - return true; + @Override + public void removeServers(Set
servers) throws IOException { + { + if (servers == null || servers.isEmpty()) { + throw new ConstraintException("The set of servers to remove cannot be null or empty."); + } + // Hold a lock on the manager instance while moving servers to prevent + // another writer changing our state while we are working. + synchronized (rsGroupInfoManager) { + //check the set of servers + checkForDeadOrOnlineServers(servers); + rsGroupInfoManager.removeServers(servers); + LOG.info("Remove decommissioned servers {} from RSGroup done", servers); + } } - if (RSGroupUtil.getRSGroupInfo(master, rsGroupInfoManager, tableName).map(RSGroupInfo::getName) - .orElse(RSGroupInfo.DEFAULT_GROUP).equals(groupName)) { - tablesInGroupCache.add(tableName); - return true; - } - return false; } private Map rsGroupGetRegionsInTransition(String groupName) - throws IOException { + throws IOException { Map rit = Maps.newTreeMap(); - Set tablesInGroupCache = new HashSet<>(); - for (RegionStateNode regionNode : master.getAssignmentManager().getRegionsInTransition()) { - TableName tn = regionNode.getTable(); - if (isTableInGroup(tn, groupName, tablesInGroupCache)) { - rit.put(regionNode.getRegionInfo().getEncodedName(), regionNode.toRegionState()); + AssignmentManager am = master.getAssignmentManager(); + for(TableName tableName : getRSGroupInfo(groupName).getTables()) { + for(RegionInfo regionInfo: am.getRegionStates().getRegionsOfTable(tableName)) { + RegionState state = am.getRegionStates().getRegionTransitionState(regionInfo); + if(state != null) { + rit.put(regionInfo.getEncodedName(), state); + } } } return rit; } private Map>> - getRSGroupAssignmentsByTable(String groupName) throws IOException { + getRSGroupAssignmentsByTable(String groupName) throws IOException { Map>> result = Maps.newHashMap(); - Set tablesInGroupCache = new HashSet<>(); - for (Map.Entry entry : master.getAssignmentManager().getRegionStates() - .getRegionAssignments().entrySet()) { - RegionInfo region = entry.getKey(); - TableName tn = region.getTable(); - ServerName server = entry.getValue(); - if (isTableInGroup(tn, groupName, tablesInGroupCache)) { - result.computeIfAbsent(tn, k -> new HashMap<>()) - .computeIfAbsent(server, k -> new ArrayList<>()).add(region); + RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); + Map>> assignments = Maps.newHashMap(); + for(Map.Entry entry: + master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()) { + TableName currTable = entry.getKey().getTable(); + ServerName currServer = entry.getValue(); + RegionInfo currRegion = entry.getKey(); + if (rsGroupInfo.getTables().contains(currTable)) { + assignments.putIfAbsent(currTable, new HashMap<>()); + assignments.get(currTable).putIfAbsent(currServer, new ArrayList<>()); + assignments.get(currTable).get(currServer).add(currRegion); } } - RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); - for (ServerName serverName : master.getServerManager().getOnlineServers().keySet()) { - if (rsGroupInfo.containsServer(serverName.getAddress())) { - for (Map> map : result.values()) { - map.computeIfAbsent(serverName, k -> Collections.emptyList()); - } + + Map> serverMap = Maps.newHashMap(); + for(ServerName serverName: master.getServerManager().getOnlineServers().keySet()) { + if(rsGroupInfo.getServers().contains(serverName.getAddress())) { + serverMap.put(serverName, Collections.emptyList()); + } + } + + // add all tables that are members of the group + for(TableName tableName : rsGroupInfo.getTables()) { + if(assignments.containsKey(tableName)) { + result.put(tableName, new HashMap<>()); + result.get(tableName).putAll(serverMap); + result.get(tableName).putAll(assignments.get(tableName)); + LOG.debug("Adding assignments for {}: {}", tableName, assignments.get(tableName)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java index 6bc45194ded..918a4fead8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java @@ -20,24 +20,14 @@ package org.apache.hadoop.hbase.rsgroup; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; import org.apache.hadoop.hbase.net.Address; -import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos; @@ -67,8 +57,6 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.Permission.Action; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; @@ -80,8 +68,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets; */ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { - private static final Logger LOG = LoggerFactory.getLogger(RSGroupAdminServiceImpl.class); - private MasterServices master; private RSGroupAdminServer groupAdminServer; @@ -121,17 +107,12 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { return userProvider.getCurrent(); } - // for backward compatible - private RSGroupInfo fillTables(RSGroupInfo rsGroupInfo) throws IOException { - return RSGroupUtil.fillTables(rsGroupInfo, master.getTableDescriptors().getAll().values()); - } - @Override public void getRSGroupInfo(RpcController controller, GetRSGroupInfoRequest request, RpcCallback done) { GetRSGroupInfoResponse.Builder builder = GetRSGroupInfoResponse.newBuilder(); String groupName = request.getRSGroupName(); - LOG.info( + RSGroupAdminEndpoint.LOG.info( master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, group=" + groupName); try { if (master.getMasterCoprocessorHost() != null) { @@ -140,7 +121,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { checkPermission("getRSGroupInfo"); RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); if (rsGroupInfo != null) { - builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(rsGroupInfo))); + builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(rsGroupInfo)); } if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postGetRSGroupInfo(groupName); @@ -156,24 +137,17 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { RpcCallback done) { GetRSGroupInfoOfTableResponse.Builder builder = GetRSGroupInfoOfTableResponse.newBuilder(); TableName tableName = ProtobufUtil.toTableName(request.getTableName()); - LOG.info( + RSGroupAdminEndpoint.LOG.info( master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, table=" + tableName); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preGetRSGroupInfoOfTable(tableName); } checkPermission("getRSGroupInfoOfTable"); - Optional optGroup = - RSGroupUtil.getRSGroupInfo(master, groupAdminServer, tableName); - if (optGroup.isPresent()) { - builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(optGroup.get()))); - } else { - if (master.getTableStateManager().isTablePresent(tableName)) { - RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP); - builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(rsGroupInfo))); - } + RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupInfoOfTable(tableName); + if (RSGroupInfo != null) { + builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(RSGroupInfo)); } - if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postGetRSGroupInfoOfTable(tableName); } @@ -191,8 +165,8 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { for (HBaseProtos.ServerName el : request.getServersList()) { hostPorts.add(Address.fromParts(el.getHostName(), el.getPort())); } - LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + " to rsgroup " + - request.getTargetGroup()); + RSGroupAdminEndpoint.LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + + " to rsgroup " + request.getTargetGroup()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preMoveServers(hostPorts, request.getTargetGroup()); @@ -208,27 +182,6 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { done.run(builder.build()); } - private void moveTablesAndWait(Set tables, String targetGroup) throws IOException { - List procIds = new ArrayList(); - for (TableName tableName : tables) { - TableDescriptor oldTd = master.getTableDescriptors().get(tableName); - if (oldTd == null) { - continue; - } - TableDescriptor newTd = - TableDescriptorBuilder.newBuilder(oldTd).setRegionServerGroup(targetGroup).build(); - procIds.add(master.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE)); - } - for (long procId : procIds) { - Procedure proc = master.getMasterProcedureExecutor().getProcedure(procId); - if (proc == null) { - continue; - } - ProcedureSyncWait.waitForProcedureToCompleteIOE(master.getMasterProcedureExecutor(), proc, - Long.MAX_VALUE); - } - } - @Override public void moveTables(RpcController controller, MoveTablesRequest request, RpcCallback done) { @@ -237,14 +190,14 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { for (HBaseProtos.TableName tableName : request.getTableNameList()) { tables.add(ProtobufUtil.toTableName(tableName)); } - LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables + " to rsgroup " + - request.getTargetGroup()); + RSGroupAdminEndpoint.LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables + + " to rsgroup " + request.getTargetGroup()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preMoveTables(tables, request.getTargetGroup()); } checkPermission("moveTables"); - moveTablesAndWait(tables, request.getTargetGroup()); + groupAdminServer.moveTables(tables, request.getTargetGroup()); if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postMoveTables(tables, request.getTargetGroup()); } @@ -258,7 +211,8 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { public void addRSGroup(RpcController controller, AddRSGroupRequest request, RpcCallback done) { AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder(); - LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName()); + RSGroupAdminEndpoint.LOG + .info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preAddRSGroup(request.getRSGroupName()); @@ -278,7 +232,8 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { public void removeRSGroup(RpcController controller, RemoveRSGroupRequest request, RpcCallback done) { RemoveRSGroupResponse.Builder builder = RemoveRSGroupResponse.newBuilder(); - LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName()); + RSGroupAdminEndpoint.LOG + .info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preRemoveRSGroup(request.getRSGroupName()); @@ -298,7 +253,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { public void balanceRSGroup(RpcController controller, BalanceRSGroupRequest request, RpcCallback done) { BalanceRSGroupResponse.Builder builder = BalanceRSGroupResponse.newBuilder(); - LOG.info( + RSGroupAdminEndpoint.LOG.info( master.getClientIdAuditPrefix() + " balance rsgroup, group=" + request.getRSGroupName()); try { if (master.getMasterCoprocessorHost() != null) { @@ -321,28 +276,14 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { public void listRSGroupInfos(RpcController controller, ListRSGroupInfosRequest request, RpcCallback done) { ListRSGroupInfosResponse.Builder builder = ListRSGroupInfosResponse.newBuilder(); - LOG.info(master.getClientIdAuditPrefix() + " list rsgroup"); + RSGroupAdminEndpoint.LOG.info(master.getClientIdAuditPrefix() + " list rsgroup"); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preListRSGroups(); } checkPermission("listRSGroup"); - List rsGroupInfos = groupAdminServer.listRSGroups().stream() - .map(RSGroupInfo::new).collect(Collectors.toList()); - Map name2Info = new HashMap<>(); - for (RSGroupInfo rsGroupInfo : rsGroupInfos) { - name2Info.put(rsGroupInfo.getName(), rsGroupInfo); - } - for (TableDescriptor td : master.getTableDescriptors().getAll().values()) { - String groupName = td.getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP); - RSGroupInfo rsGroupInfo = name2Info.get(groupName); - if (rsGroupInfo != null) { - rsGroupInfo.addTable(td.getTableName()); - } - } - for (RSGroupInfo rsGroupInfo : rsGroupInfos) { - // TODO: this can be done at once outside this loop, do not need to scan all every time. - builder.addRSGroupInfo(ProtobufUtil.toProtoGroupInfo(rsGroupInfo)); + for (RSGroupInfo RSGroupInfo : groupAdminServer.listRSGroups()) { + builder.addRSGroupInfo(ProtobufUtil.toProtoGroupInfo(RSGroupInfo)); } if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postListRSGroups(); @@ -359,7 +300,8 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { GetRSGroupInfoOfServerResponse.Builder builder = GetRSGroupInfoOfServerResponse.newBuilder(); Address hp = Address.fromParts(request.getServer().getHostName(), request.getServer().getPort()); - LOG.info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, server=" + hp); + RSGroupAdminEndpoint.LOG + .info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, server=" + hp); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preGetRSGroupInfoOfServer(hp); @@ -367,7 +309,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { checkPermission("getRSGroupInfoOfServer"); RSGroupInfo info = groupAdminServer.getRSGroupOfServer(hp); if (info != null) { - builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(info))); + builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(info)); } if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postGetRSGroupInfoOfServer(hp); @@ -390,16 +332,15 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { for (HBaseProtos.TableName tableName : request.getTableNameList()) { tables.add(ProtobufUtil.toTableName(tableName)); } - LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + " and tables " + - tables + " to rsgroup" + request.getTargetGroup()); + RSGroupAdminEndpoint.LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + + " and tables " + tables + " to rsgroup" + request.getTargetGroup()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preMoveServersAndTables(hostPorts, tables, request.getTargetGroup()); } checkPermission("moveServersAndTables"); - groupAdminServer.moveServers(hostPorts, request.getTargetGroup()); - moveTablesAndWait(tables, request.getTargetGroup()); + groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup()); if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postMoveServersAndTables(hostPorts, tables, request.getTargetGroup()); @@ -418,7 +359,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { for (HBaseProtos.ServerName el : request.getServersList()) { servers.add(Address.fromParts(el.getHostName(), el.getPort())); } - LOG.info( + RSGroupAdminEndpoint.LOG.info( master.getClientIdAuditPrefix() + " remove decommissioned servers from rsgroup: " + servers); try { if (master.getMasterCoprocessorHost() != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java index cb514c139ea..f585a851b4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HBaseIOException; @@ -110,13 +111,13 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { @Override public List balanceCluster(TableName tableName, Map> - clusterState) throws IOException { + clusterState) throws HBaseIOException { return balanceCluster(clusterState); } @Override public List balanceCluster(Map> clusterState) - throws IOException { + throws HBaseIOException { if (!isOnline()) { throw new ConstraintException( RSGroupInfoManager.class.getSimpleName() + " is not online, unable to perform balance"); @@ -168,7 +169,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { @Override public Map> roundRobinAssignment( - List regions, List servers) throws IOException { + List regions, List servers) throws HBaseIOException { Map> assignments = Maps.newHashMap(); ListMultimap regionMap = ArrayListMultimap.create(); ListMultimap serverMap = ArrayListMultimap.create(); @@ -200,12 +201,13 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { Map> assignments = new TreeMap<>(); ListMultimap groupToRegion = ArrayListMultimap.create(); Set misplacedRegions = getMisplacedRegions(regions); - RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); for (RegionInfo region : regions.keySet()) { if (!misplacedRegions.contains(region)) { - String groupName = - RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable()) - .orElse(defaultInfo).getName(); + String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); + if (groupName == null) { + LOG.debug("Group not found for table " + region.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } groupToRegion.put(groupName, region); } } @@ -233,11 +235,15 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { } for (RegionInfo region : misplacedRegions) { - RSGroupInfo info = - RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable()) - .orElse(defaultInfo); + String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); + if (groupName == null) { + LOG.debug("Group not found for table " + region.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } + RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName); List candidateList = filterOfflineServers(info, servers); - ServerName server = this.internalBalancer.randomAssignment(region, candidateList); + ServerName server = this.internalBalancer.randomAssignment(region, + candidateList); if (server != null) { assignments.computeIfAbsent(server, s -> new ArrayList<>()).add(region); } else { @@ -253,7 +259,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { @Override public ServerName randomAssignment(RegionInfo region, - List servers) throws IOException { + List servers) throws HBaseIOException { ListMultimap regionMap = LinkedListMultimap.create(); ListMultimap serverMap = LinkedListMultimap.create(); generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap); @@ -261,15 +267,18 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { return this.internalBalancer.randomAssignment(region, filteredServers); } - private void generateGroupMaps(List regions, List servers, - ListMultimap regionMap, ListMultimap serverMap) - throws HBaseIOException { + private void generateGroupMaps( + List regions, + List servers, + ListMultimap regionMap, + ListMultimap serverMap) throws HBaseIOException { try { - RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); for (RegionInfo region : regions) { - String groupName = - RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable()) - .orElse(defaultInfo).getName(); + String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); + if (groupName == null) { + LOG.debug("Group not found for table " + region.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } regionMap.put(groupName, region); } for (String groupKey : regionMap.keySet()) { @@ -321,26 +330,32 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { } @VisibleForTesting - public Set getMisplacedRegions(Map regions) - throws IOException { + public Set getMisplacedRegions( + Map regions) throws IOException { Set misplacedRegions = new HashSet<>(); - RSGroupInfo defaultGroupInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); - for (Map.Entry region : regions.entrySet()) { + for(Map.Entry region : regions.entrySet()) { RegionInfo regionInfo = region.getKey(); ServerName assignedServer = region.getValue(); + String groupName = rsGroupInfoManager.getRSGroupOfTable(regionInfo.getTable()); + if (groupName == null) { + LOG.debug("Group not found for table " + regionInfo.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } + RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName); if (assignedServer == null) { LOG.debug("There is no assigned server for {}", region); continue; } - RSGroupInfo info = - RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, regionInfo.getTable()) - .orElse(defaultGroupInfo); - if (!info.containsServer(assignedServer.getAddress())) { - RSGroupInfo otherInfo = rsGroupInfoManager.getRSGroupOfServer(assignedServer.getAddress()); - LOG.debug( - "Found misplaced region: {} on server: {} found in group: {} outside of group: {}", - regionInfo.getRegionNameAsString(), assignedServer, - otherInfo != null ? otherInfo.getName() : "UNKNOWN", info.getName()); + RSGroupInfo otherInfo = rsGroupInfoManager.getRSGroupOfServer(assignedServer.getAddress()); + if (info == null && otherInfo == null) { + LOG.warn("Couldn't obtain rs group information for {} on {}", region, assignedServer); + continue; + } + if ((info == null || !info.containsServer(assignedServer.getAddress()))) { + LOG.debug("Found misplaced region: " + regionInfo.getRegionNameAsString() + + " on server: " + assignedServer + + " found in group: " + otherInfo + + " outside of group: " + (info == null ? "UNKNOWN" : info.getName())); misplacedRegions.add(regionInfo); } } @@ -348,11 +363,11 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { } private Pair>, List> correctAssignments( - Map> existingAssignments) throws IOException { + Map> existingAssignments) throws HBaseIOException{ // To return Map> correctAssignments = new TreeMap<>(); List regionPlansForMisplacedRegions = new ArrayList<>(); - RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); + for (Map.Entry> assignments : existingAssignments.entrySet()){ ServerName currentHostServer = assignments.getKey(); correctAssignments.put(currentHostServer, new LinkedList<>()); @@ -360,11 +375,15 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { for (RegionInfo region : regions) { RSGroupInfo targetRSGInfo = null; try { - targetRSGInfo = - RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable()) - .orElse(defaultInfo); + String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); + if (groupName == null) { + LOG.debug("Group not found for table " + region.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } + targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName); } catch (IOException exp) { - LOG.debug("RSGroup information null for region of table " + region.getTable(), exp); + LOG.debug("RSGroup information null for region of table " + region.getTable(), + exp); } if (targetRSGInfo == null || !targetRSGInfo.containsServer(currentHostServer.getAddress())) { // region is mis-placed @@ -381,7 +400,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { } @Override - public void initialize() throws IOException { + public void initialize() throws HBaseIOException { try { if (rsGroupInfoManager == null) { List cps = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java index 28f7c1f3e90..70aeabfee71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.rsgroup; import java.io.IOException; import java.util.List; import java.util.Set; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; @@ -62,6 +63,18 @@ public interface RSGroupInfoManager { */ RSGroupInfo getRSGroup(String groupName) throws IOException; + /** + * Get the group membership of a table + */ + String getRSGroupOfTable(TableName tableName) throws IOException; + + /** + * Set the group membership of a set of tables + * @param tableNames set of tables to move + * @param groupName name of group of tables to move to + */ + void moveTables(Set tableNames, String groupName) throws IOException; + /** * List the existing {@code RSGroupInfo}s. */ @@ -78,6 +91,16 @@ public interface RSGroupInfoManager { */ boolean isOnline(); + /** + * Move servers and tables to a new group. + * @param servers list of servers, must be part of the same group + * @param tables set of tables to move + * @param srcGroup groupName being moved from + * @param dstGroup groupName being moved to + */ + void moveServersAndTables(Set
servers, Set tables, String srcGroup, + String dstGroup) throws IOException; + /** * Remove decommissioned servers from rsgroup * @param servers set of servers to remove diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index 37f3ce63f56..8aa752021e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -23,8 +23,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; @@ -141,6 +143,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { // There two Maps are immutable and wholesale replaced on each modification // so are safe to access concurrently. See class comment. private volatile Map rsGroupMap = Collections.emptyMap(); + private volatile Map tableMap = Collections.emptyMap(); private final MasterServices masterServices; private final AsyncClusterConnection conn; @@ -257,6 +260,44 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { return rsGroupMap.get(groupName); } + @Override + public String getRSGroupOfTable(TableName tableName) { + return tableMap.get(tableName); + } + + @Override + public synchronized void moveTables(Set tableNames, String groupName) + throws IOException { + // Check if rsGroupMap contains the destination rsgroup + if (groupName != null && !rsGroupMap.containsKey(groupName)) { + throw new DoNotRetryIOException("Group " + groupName + " does not exist"); + } + + // Make a copy of rsGroupMap to update + Map newGroupMap = Maps.newHashMap(rsGroupMap); + + // Remove tables from their original rsgroups + // and update the copy of rsGroupMap + for (TableName tableName : tableNames) { + if (tableMap.containsKey(tableName)) { + RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName))); + src.removeTable(tableName); + newGroupMap.put(src.getName(), src); + } + } + + // Add tables to the destination rsgroup + // and update the copy of rsGroupMap + if (groupName != null) { + RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName)); + dstGroup.addAllTables(tableNames); + newGroupMap.put(dstGroup.getName(), dstGroup); + } + + // Flush according to the updated copy of rsGroupMap + flushConfig(newGroupMap); + } + @Override public synchronized void removeRSGroup(String groupName) throws IOException { if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { @@ -270,7 +311,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { @Override public List listRSGroups() { - return Lists.newArrayList(rsGroupMap.values()); + return Lists.newLinkedList(rsGroupMap.values()); } @Override @@ -278,6 +319,31 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { return rsGroupStartupWorker.isOnline(); } + @Override + public void moveServersAndTables(Set
servers, Set tables, String srcGroup, + String dstGroup) throws IOException { + // get server's group + RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup); + RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup); + + // move servers + for (Address el : servers) { + srcGroupInfo.removeServer(el); + dstGroupInfo.addServer(el); + } + // move tables + for (TableName tableName : tables) { + srcGroupInfo.removeTable(tableName); + dstGroupInfo.addTable(tableName); + } + + // flush changed groupinfo + Map newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo); + newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo); + flushConfig(newGroupMap); + } + @Override public synchronized void removeServers(Set
servers) throws IOException { Map rsGroupInfos = new HashMap(); @@ -359,7 +425,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { * startup of the manager. */ private synchronized void refresh(boolean forceOnline) throws IOException { - List groupList = new ArrayList<>(); + List groupList = new LinkedList<>(); // Overwrite anything read from zk, group table is source of truth // if online read from GROUP table @@ -371,20 +437,37 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { groupList.addAll(retrieveGroupListFromZookeeper()); } + // refresh default group, prune + NavigableSet orphanTables = new TreeSet<>(); + for (String entry : masterServices.getTableDescriptors().getAll().keySet()) { + orphanTables.add(TableName.valueOf(entry)); + } + for (RSGroupInfo group : groupList) { + if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { + orphanTables.removeAll(group.getTables()); + } + } + // This is added to the last of the list so it overwrites the 'default' rsgroup loaded // from region group table or zk - groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers())); + groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables)); // populate the data HashMap newGroupMap = Maps.newHashMap(); + HashMap newTableMap = Maps.newHashMap(); for (RSGroupInfo group : groupList) { newGroupMap.put(group.getName(), group); + for (TableName table : group.getTables()) { + newTableMap.put(table, group.getName()); + } } - resetRSGroupMap(newGroupMap); + resetRSGroupAndTableMaps(newGroupMap, newTableMap); updateCacheOfRSGroups(rsGroupMap.keySet()); } - private void flushConfigTable(Map groupMap) throws IOException { + private synchronized Map flushConfigTable(Map groupMap) + throws IOException { + Map newTableMap = Maps.newHashMap(); List mutations = Lists.newArrayList(); // populate deletes @@ -401,11 +484,15 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { Put p = new Put(Bytes.toBytes(RSGroupInfo.getName())); p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray()); mutations.add(p); + for (TableName entry : RSGroupInfo.getTables()) { + newTableMap.put(entry, RSGroupInfo.getName()); + } } if (mutations.size() > 0) { multiMutate(mutations); } + return newTableMap; } private synchronized void flushConfig() throws IOException { @@ -413,6 +500,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { } private synchronized void flushConfig(Map newGroupMap) throws IOException { + Map newTableMap; + // For offline mode persistence is still unavailable // We're refreshing in-memory state but only for servers in default group if (!isOnline()) { @@ -427,7 +516,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ || !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables()) - /* compare tables in default group */) { + /* compare tables in default group */) { throw new IOException("Only servers in default group can be updated during offline mode"); } @@ -444,11 +533,11 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { return; } - /* For online mode, persist to hbase:rsgroup and Zookeeper */ - flushConfigTable(newGroupMap); + /* For online mode, persist to Zookeeper */ + newTableMap = flushConfigTable(newGroupMap); // Make changes visible after having been persisted to the source of truth - resetRSGroupMap(newGroupMap); + resetRSGroupAndTableMaps(newGroupMap, newTableMap); try { String groupBasePath = @@ -486,9 +575,11 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { /** * Make changes visible. Caller must be synchronized on 'this'. */ - private void resetRSGroupMap(Map newRSGroupMap) { + private void resetRSGroupAndTableMaps(Map newRSGroupMap, + Map newTableMap) { // Make maps Immutable. this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap); + this.tableMap = Collections.unmodifiableMap(newTableMap); } /** @@ -506,7 +597,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { return masterServices.getServerManager().getOnlineServersList(); } LOG.debug("Reading online RS from zookeeper"); - List servers = new ArrayList<>(); + List servers = new LinkedList<>(); try { for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) { servers.add(ServerName.parseServerName(el)); @@ -542,7 +633,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { // the rsGroupMap then writing it out. private synchronized void updateDefaultServers(SortedSet
servers) throws IOException { RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); - RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers); + RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables()); HashMap newGroupMap = Maps.newHashMap(rsGroupMap); newGroupMap.put(newInfo.getName(), newInfo); flushConfig(newGroupMap); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java deleted file mode 100644 index a08d236129e..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. - */ -package org.apache.hadoop.hbase.rsgroup; - -import java.io.IOException; -import java.util.Collection; -import java.util.Optional; -import java.util.function.Predicate; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.master.ClusterSchema; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Helper class for RSGroup implementation - */ -@InterfaceAudience.Private -final class RSGroupUtil { - - private static final Logger LOG = LoggerFactory.getLogger(RSGroupUtil.class); - - private RSGroupUtil() { - } - - @FunctionalInterface - private interface GetRSGroup { - RSGroupInfo get(String groupName) throws IOException; - } - - private static Optional getRSGroupInfo(MasterServices master, GetRSGroup getter, - TableName tableName) throws IOException { - TableDescriptor td = master.getTableDescriptors().get(tableName); - if (td == null) { - return Optional.empty(); - } - Optional optGroupNameOfTable = td.getRegionServerGroup(); - if (optGroupNameOfTable.isPresent()) { - RSGroupInfo group = getter.get(optGroupNameOfTable.get()); - if (group != null) { - return Optional.of(group); - } - } - ClusterSchema clusterSchema = master.getClusterSchema(); - if (clusterSchema == null) { - if (TableName.isMetaTableName(tableName)) { - LOG.info("Can not get the namespace rs group config for meta table, since the" + - " meta table is not online yet, will use default group to assign meta first"); - } else { - LOG.warn("ClusterSchema is null, can only use default rsgroup, should not happen?"); - } - return Optional.empty(); - } - NamespaceDescriptor nd = clusterSchema.getNamespace(tableName.getNamespaceAsString()); - String groupNameOfNs = nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); - if (groupNameOfNs == null) { - return Optional.empty(); - } - return Optional.ofNullable(getter.get(groupNameOfNs)); - } - - /** - * Will try to get the rsgroup from {@link TableDescriptor} first, and then try to get the rsgroup - * from the {@link NamespaceDescriptor}. If still not present, return empty. - */ - static Optional getRSGroupInfo(MasterServices master, RSGroupInfoManager manager, - TableName tableName) throws IOException { - return getRSGroupInfo(master, manager::getRSGroup, tableName); - } - - /** - * Will try to get the rsgroup from {@link TableDescriptor} first, and then try to get the rsgroup - * from the {@link NamespaceDescriptor}. If still not present, return empty. - */ - static Optional getRSGroupInfo(MasterServices master, RSGroupAdmin admin, - TableName tableName) throws IOException { - return getRSGroupInfo(master, admin::getRSGroupInfo, tableName); - } - - /** - * Fill the tables field for {@link RSGroupInfo}, for backward compatibility. - */ - @SuppressWarnings("deprecation") - static RSGroupInfo fillTables(RSGroupInfo rsGroupInfo, Collection tds) { - RSGroupInfo newRsGroupInfo = new RSGroupInfo(rsGroupInfo); - Predicate filter; - if (rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { - filter = td -> { - Optional optGroupName = td.getRegionServerGroup(); - return !optGroupName.isPresent() || optGroupName.get().equals(RSGroupInfo.DEFAULT_GROUP); - }; - } else { - filter = td -> { - Optional optGroupName = td.getRegionServerGroup(); - return optGroupName.isPresent() && optGroupName.get().equals(newRsGroupInfo.getName()); - }; - } - tds.stream().filter(filter).map(TableDescriptor::getTableName) - .forEach(newRsGroupInfo::addTable); - return newRsGroupInfo; - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement2.java index 47337f9f7c1..6dc371149a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement2.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.master; import static org.junit.Assert.assertTrue; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -27,6 +26,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -82,7 +82,7 @@ public class TestRegionPlacement2 { } @Test - public void testFavoredNodesPresentForRoundRobinAssignment() throws IOException { + public void testFavoredNodesPresentForRoundRobinAssignment() throws HBaseIOException { LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(TEST_UTIL.getConfiguration()); balancer.setMasterServices(TEST_UTIL.getMiniHBaseCluster().getMaster()); balancer.initialize(); @@ -143,7 +143,7 @@ public class TestRegionPlacement2 { } @Test - public void testFavoredNodesPresentForRandomAssignment() throws IOException { + public void testFavoredNodesPresentForRandomAssignment() throws HBaseIOException { LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(TEST_UTIL.getConfiguration()); balancer.setMasterServices(TEST_UTIL.getMiniHBaseCluster().getMaster()); balancer.initialize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java index 4c00bcfcd0f..570bb3abb3e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java @@ -28,7 +28,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -61,13 +60,17 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; public class RSGroupableBalancerTestBase { static SecureRandom rand = new SecureRandom(); - static String[] groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4" }; + static String[] groups = new String[] {RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4"}; static TableName table0 = TableName.valueOf("dt0"); - static TableName[] tables = new TableName[] { TableName.valueOf("dt1"), TableName.valueOf("dt2"), - TableName.valueOf("dt3"), TableName.valueOf("dt4") }; + static TableName[] tables = + new TableName[] { TableName.valueOf("dt1"), + TableName.valueOf("dt2"), + TableName.valueOf("dt3"), + TableName.valueOf("dt4")}; static List servers; static Map groupMap; - static Map tableDescs; + static Map tableMap = new HashMap<>(); + static List tableDescs; int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 }; static int regionId = 0; @@ -110,19 +113,20 @@ public class RSGroupableBalancerTestBase { /** * All regions have an assignment. */ - protected void assertImmediateAssignment(List regions, List servers, - Map assignments) throws IOException { + protected void assertImmediateAssignment(List regions, + List servers, + Map assignments) + throws IOException { for (RegionInfo region : regions) { assertTrue(assignments.containsKey(region)); ServerName server = assignments.get(region); TableName tableName = region.getTable(); - String groupName = - tableDescs.get(tableName).getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP); + String groupName = getMockedGroupInfoManager().getRSGroupOfTable(tableName); assertTrue(StringUtils.isNotEmpty(groupName)); RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); assertTrue("Region is not correctly assigned to group servers.", - gInfo.containsServer(server.getAddress())); + gInfo.containsServer(server.getAddress())); } } @@ -165,13 +169,16 @@ public class RSGroupableBalancerTestBase { ServerName oldAssignedServer = existing.get(r); TableName tableName = r.getTable(); String groupName = - tableDescs.get(tableName).getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP); + getMockedGroupInfoManager().getRSGroupOfTable(tableName); assertTrue(StringUtils.isNotEmpty(groupName)); - RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); - assertTrue("Region is not correctly assigned to group servers.", - gInfo.containsServer(currentServer.getAddress())); - if (oldAssignedServer != null && - onlineHostNames.contains(oldAssignedServer.getHostname())) { + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( + groupName); + assertTrue( + "Region is not correctly assigned to group servers.", + gInfo.containsServer(currentServer.getAddress())); + if (oldAssignedServer != null + && onlineHostNames.contains(oldAssignedServer + .getHostname())) { // this region was previously assigned somewhere, and that // host is still around, then the host must have been is a // different group. @@ -351,12 +358,13 @@ public class RSGroupableBalancerTestBase { /** * Construct group info, with each group having at least one server. + * * @param servers the servers * @param groups the groups * @return the map */ - protected static Map constructGroupInfo(List servers, - String[] groups) { + protected static Map constructGroupInfo( + List servers, String[] groups) { assertTrue(servers != null); assertTrue(servers.size() >= groups.length); int index = 0; @@ -369,7 +377,8 @@ public class RSGroupableBalancerTestBase { } while (index < servers.size()) { int grpIndex = rand.nextInt(groups.length); - groupMap.get(groups[grpIndex]).addServer(servers.get(index).getAddress()); + groupMap.get(groups[grpIndex]).addServer( + servers.get(index).getAddress()); index++; } return groupMap; @@ -380,28 +389,29 @@ public class RSGroupableBalancerTestBase { * @param hasBogusTable there is a table that does not determine the group * @return the list of table descriptors */ - protected static Map constructTableDesc(boolean hasBogusTable) { - Map tds = new HashMap<>(); + protected static List constructTableDesc(boolean hasBogusTable) { + List tds = Lists.newArrayList(); int index = rand.nextInt(groups.length); for (int i = 0; i < tables.length; i++) { + TableDescriptor htd = TableDescriptorBuilder.newBuilder(tables[i]).build(); int grpIndex = (i + index) % groups.length; String groupName = groups[grpIndex]; - TableDescriptor htd = - TableDescriptorBuilder.newBuilder(tables[i]).setRegionServerGroup(groupName).build(); - tds.put(htd.getTableName(), htd); + tableMap.put(tables[i], groupName); + tds.add(htd); } if (hasBogusTable) { - tds.put(table0, TableDescriptorBuilder.newBuilder(table0).setRegionServerGroup("").build()); + tableMap.put(table0, ""); + tds.add(TableDescriptorBuilder.newBuilder(table0).build()); } return tds; } protected static MasterServices getMockedMaster() throws IOException { TableDescriptors tds = Mockito.mock(TableDescriptors.class); - Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(tables[0])); - Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(tables[1])); - Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(tables[2])); - Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(tables[3])); + Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0)); + Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1)); + Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2)); + Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3)); MasterServices services = Mockito.mock(HMaster.class); Mockito.when(services.getTableDescriptors()).thenReturn(tds); AssignmentManager am = Mockito.mock(AssignmentManager.class); @@ -420,6 +430,13 @@ public class RSGroupableBalancerTestBase { Mockito.when(gm.listRSGroups()).thenReturn( Lists.newLinkedList(groupMap.values())); Mockito.when(gm.isOnline()).thenReturn(true); + Mockito.when(gm.getRSGroupOfTable(Mockito.any())) + .thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return tableMap.get(invocation.getArgument(0)); + } + }); return gm; } @@ -427,16 +444,15 @@ public class RSGroupableBalancerTestBase { TableName tableName = null; RSGroupInfoManager gm = getMockedGroupInfoManager(); RSGroupInfo groupOfServer = null; - for (RSGroupInfo gInfo : gm.listRSGroups()) { - if (gInfo.containsServer(sn.getAddress())) { + for(RSGroupInfo gInfo : gm.listRSGroups()){ + if(gInfo.containsServer(sn.getAddress())){ groupOfServer = gInfo; break; } } - for (TableDescriptor desc : tableDescs.values()) { - Optional optGroupName = desc.getRegionServerGroup(); - if (optGroupName.isPresent() && optGroupName.get().endsWith(groupOfServer.getName())) { + for(TableDescriptor desc : tableDescs){ + if(gm.getRSGroupOfTable(desc.getTableName()).endsWith(groupOfServer.getName())){ tableName = desc.getTableName(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java index b2ea28b47cb..b60ca7ea299 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java @@ -98,30 +98,33 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase { /** * Tests the bulk assignment used during cluster startup. - *

- * Round-robin. Should yield a balanced cluster so same invariant as the load balancer holds, all - * servers holding either floor(avg) or ceiling(avg). + * + * Round-robin. Should yield a balanced cluster so same invariant as the + * load balancer holds, all servers holding either floor(avg) or + * ceiling(avg). */ @Test public void testBulkAssignment() throws Exception { List regions = randomRegions(25); - Map> assignments = - loadBalancer.roundRobinAssignment(regions, servers); - // test empty region/servers scenario - // this should not throw an NPE + Map> assignments = loadBalancer + .roundRobinAssignment(regions, servers); + //test empty region/servers scenario + //this should not throw an NPE loadBalancer.roundRobinAssignment(regions, Collections.emptyList()); - // test regular scenario + //test regular scenario assertTrue(assignments.keySet().size() == servers.size()); for (ServerName sn : assignments.keySet()) { List regionAssigned = assignments.get(sn); for (RegionInfo region : regionAssigned) { TableName tableName = region.getTable(); String groupName = - tableDescs.get(tableName).getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP); + getMockedGroupInfoManager().getRSGroupOfTable(tableName); assertTrue(StringUtils.isNotEmpty(groupName)); - RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); - assertTrue("Region is not correctly assigned to group servers.", - gInfo.containsServer(sn.getAddress())); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( + groupName); + assertTrue( + "Region is not correctly assigned to group servers.", + gInfo.containsServer(sn.getAddress())); } } ArrayListMultimap loadMap = convertToGroupBasedMap(assignments); @@ -172,25 +175,24 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase { onlineServers.addAll(servers); List regions = randomRegions(25); int bogusRegion = 0; - for (RegionInfo region : regions) { - String group = tableDescs.get(region.getTable()).getRegionServerGroup() - .orElse(RSGroupInfo.DEFAULT_GROUP); - if ("dg3".equals(group) || "dg4".equals(group)) { + for(RegionInfo region : regions){ + String group = tableMap.get(region.getTable()); + if("dg3".equals(group) || "dg4".equals(group)){ bogusRegion++; } } Set

offlineServers = new HashSet
(); offlineServers.addAll(groupMap.get("dg3").getServers()); offlineServers.addAll(groupMap.get("dg4").getServers()); - for (Iterator it = onlineServers.iterator(); it.hasNext();) { + for(Iterator it = onlineServers.iterator(); it.hasNext();){ ServerName server = it.next(); Address address = server.getAddress(); - if (offlineServers.contains(address)) { + if(offlineServers.contains(address)){ it.remove(); } } - Map> assignments = - loadBalancer.roundRobinAssignment(regions, onlineServers); + Map> assignments = loadBalancer + .roundRobinAssignment(regions, onlineServers); assertEquals(bogusRegion, assignments.get(LoadBalancer.BOGUS_SERVER_NAME).size()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java index a4ae636a9a8..e588a7e198b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -33,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.RegionMetrics; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerName; @@ -98,7 +98,7 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal * Test HBASE-20791 */ @Test - public void testBalanceCluster() throws IOException { + public void testBalanceCluster() throws HBaseIOException { // mock cluster State Map> clusterState = new HashMap>(); ServerName serverA = servers.get(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin1.java index 74714586301..27511e30794 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin1.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin1.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.NamespaceDescriptor; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java index 407737a2b1a..6553a85c938 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.rsgroup.RSGroupAdminServer.DEFAULT_MAX_RET import static org.apache.hadoop.hbase.util.Threads.sleep; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -407,9 +408,7 @@ public class TestRSGroupsAdmin2 extends TestRSGroupsBase { assertTrue(newGroupTables.contains(tableName)); // verify that all region still assgin on targetServer - // TODO: uncomment after we reimplement moveServersAndTables, now the implementation is - // moveServers first and then moveTables, so the region will be moved to other region servers. - // Assert.assertEquals(5, getTableServerRegionMap().get(tableName).get(targetServer).size()); + Assert.assertEquals(5, getTableServerRegionMap().get(tableName).get(targetServer).size()); assertTrue(observer.preMoveServersAndTables); assertTrue(observer.postMoveServersAndTables); @@ -504,6 +503,61 @@ public class TestRSGroupsAdmin2 extends TestRSGroupsBase { }); } + @Test + public void testFailedMoveBeforeRetryExhaustedWhenMoveTable() throws Exception { + final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1); + Pair gotPair = createTableWithRegionSplitting(newGroup, + 5); + + // move table to group + Thread t2 = new Thread(() -> { + LOG.info("thread2 start running, to move regions"); + try { + rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName()); + } catch (IOException e) { + LOG.error("move server error", e); + } + }); + t2.start(); + + // start thread to recover region state + final ServerName ss = gotPair.getFirst(); + final RegionStateNode rsn = gotPair.getSecond(); + AtomicBoolean changed = new AtomicBoolean(false); + + Thread t1 = recoverRegionStateThread(ss, server -> { + List regions = master.getAssignmentManager().getRegionsOnServer(ss); + List tableRegions = new ArrayList<>(); + for (RegionInfo regionInfo : regions) { + if (regionInfo.getTable().equals(tableName)) { + tableRegions.add(regionInfo); + } + } + return tableRegions; + }, rsn, changed); + t1.start(); + + t1.join(); + t2.join(); + + TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { + @Override + public boolean evaluate() { + if (changed.get()) { + boolean serverHasTableRegions = false; + for (RegionInfo regionInfo : master.getAssignmentManager().getRegionsOnServer(ss)) { + if (regionInfo.getTable().equals(tableName)) { + serverHasTableRegions = true; + break; + } + } + return !serverHasTableRegions && !rsn.getRegionLocation().equals(ss); + } + return false; + } + }); + } + private Thread recoverRegionStateThread(T owner, Function> getRegions, RegionStateNode rsn, AtomicBoolean changed){ return new Thread(() -> { @@ -596,6 +650,50 @@ public class TestRSGroupsAdmin2 extends TestRSGroupsBase { return new Pair<>(srcServer, rsn); } + @Test + public void testFailedMoveTablesAndRepair() throws Exception{ + // This UT calls moveTables() twice to test the idempotency of it. + // The first time, movement fails because a region is made in SPLITTING state + // which will not be moved. + // The second time, the region state is OPEN and check if all + // regions on target group servers after the call. + final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1); + Iterator iterator = newGroup.getServers().iterator(); + Address newGroupServer1 = (Address) iterator.next(); + + // create table + // randomly set a region state to SPLITTING to make move abort + Pair gotPair = createTableWithRegionSplitting(newGroup, + new Random().nextInt(8) + 4); + RegionStateNode rsn = gotPair.getSecond(); + + // move table to newGroup and check regions + try { + rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName()); + fail("should get IOException when retry exhausted but there still exists failed moved " + + "regions"); + }catch (Exception e){ + assertTrue(e.getMessage().contains( + gotPair.getSecond().getRegionInfo().getRegionNameAsString())); + } + for(RegionInfo regionInfo : master.getAssignmentManager().getAssignedRegions()){ + if (regionInfo.getTable().equals(tableName) && regionInfo.equals(rsn.getRegionInfo())) { + assertNotEquals(master.getAssignmentManager().getRegionStates() + .getRegionServerOfRegion(regionInfo).getAddress(), newGroupServer1); + } + } + + // retry move table to newGroup and check if all regions are corrected + rsn.setState(RegionState.State.OPEN); + rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName()); + for(RegionInfo regionInfo : master.getAssignmentManager().getAssignedRegions()){ + if (regionInfo.getTable().equals(tableName)) { + assertEquals(master.getAssignmentManager().getRegionStates() + .getRegionServerOfRegion(regionInfo).getAddress(), newGroupServer1); + } + } + } + @Test public void testFailedMoveServersAndRepair() throws Exception{ // This UT calls moveServers() twice to test the idempotency of it. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBalance.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBalance.java index 8d10850a073..67f5c7ee757 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBalance.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBalance.java @@ -45,6 +45,8 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; + @Category({ MediumTests.class }) public class TestRSGroupsBalance extends TestRSGroupsBase { @@ -151,21 +153,19 @@ public class TestRSGroupsBalance extends TestRSGroupsBase { @Test public void testMisplacedRegions() throws Exception { - String namespace = tablePrefix + "_" + name.getMethodName(); - TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build()); - final TableName tableName = - TableName.valueOf(namespace, tablePrefix + "_" + name.getMethodName()); - LOG.info(name.getMethodName()); + final TableName tableName = TableName.valueOf(tablePrefix + "_testMisplacedRegions"); + LOG.info("testMisplacedRegions"); - final RSGroupInfo rsGroupInfo = addGroup(name.getMethodName(), 1); + final RSGroupInfo RSGroupInfo = addGroup("testMisplacedRegions", 1); TEST_UTIL.createMultiRegionTable(tableName, new byte[] { 'f' }, 15); TEST_UTIL.waitUntilAllRegionsAssigned(tableName); - TEST_UTIL.getAdmin().modifyNamespace(NamespaceDescriptor.create(namespace) - .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, rsGroupInfo.getName()).build()); + + rsGroupAdminEndpoint.getGroupInfoManager().moveTables(Sets.newHashSet(tableName), + RSGroupInfo.getName()); admin.balancerSwitch(true, true); - assertTrue(rsGroupAdmin.balanceRSGroup(rsGroupInfo.getName())); + assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName())); admin.balancerSwitch(false, true); assertTrue(observer.preBalanceRSGroupCalled); assertTrue(observer.postBalanceRSGroupCalled); @@ -174,7 +174,7 @@ public class TestRSGroupsBalance extends TestRSGroupsBase { @Override public boolean evaluate() throws Exception { ServerName serverName = - ServerName.valueOf(rsGroupInfo.getServers().iterator().next().toString(), 1); + ServerName.valueOf(RSGroupInfo.getServers().iterator().next().toString(), 1); return admin.getConnection().getAdmin().getRegions(serverName).size() == 15; } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java index 464410f5cbe..c5520cf11f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java @@ -76,7 +76,7 @@ public abstract class TestRSGroupsBase { protected static HBaseTestingUtility TEST_UTIL; protected static Admin admin; protected static HBaseCluster cluster; - protected static RSGroupAdminClient rsGroupAdmin; + protected static RSGroupAdmin rsGroupAdmin; protected static HMaster master; protected boolean INIT = false; protected static RSGroupAdminEndpoint rsGroupAdminEndpoint; @@ -190,8 +190,8 @@ public abstract class TestRSGroupsBase { RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP); rsGroupAdmin.addRSGroup(groupName); Set
set = new HashSet<>(); - for (Address server : defaultInfo.getServers()) { - if (set.size() == serverCount) { + for(Address server: defaultInfo.getServers()) { + if(set.size() == serverCount) { break; } set.add(server); @@ -224,7 +224,7 @@ public abstract class TestRSGroupsBase { } public void deleteGroups() throws IOException { - RSGroupAdminClient groupAdmin = new RSGroupAdminClient(TEST_UTIL.getConnection()); + RSGroupAdmin groupAdmin = new RSGroupAdminClient(TEST_UTIL.getConnection()); for(RSGroupInfo group: groupAdmin.listRSGroups()) { if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { groupAdmin.moveTables(group.getTables(), RSGroupInfo.DEFAULT_GROUP); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java index d3577f24e73..60887e4219c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.rsgroup; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -112,7 +113,7 @@ public class TestRSGroupsOfflineMode { final HRegionServer groupRS = ((MiniHBaseCluster) cluster).getRegionServer(1); final HRegionServer failoverRS = ((MiniHBaseCluster) cluster).getRegionServer(2); String newGroup = "my_group"; - RSGroupAdminClient groupAdmin = new RSGroupAdminClient(TEST_UTIL.getConnection()); + RSGroupAdmin groupAdmin = new RSGroupAdminClient(TEST_UTIL.getConnection()); groupAdmin.addRSGroup(newGroup); if (master.getAssignmentManager().getRegionStates().getRegionAssignments() .containsValue(failoverRS.getServerName())) { @@ -167,6 +168,9 @@ public class TestRSGroupsOfflineMode { .getMasterCoprocessorHost().findCoprocessor(RSGroupAdminEndpoint.class).getGroupInfoManager(); // Make sure balancer is in offline mode, since this is what we're testing. assertFalse(groupMgr.isOnline()); + // Verify the group affiliation that's loaded from ZK instead of tables. + assertEquals(newGroup, groupMgr.getRSGroupOfTable(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME)); + assertEquals(RSGroupInfo.DEFAULT_GROUP, groupMgr.getRSGroupOfTable(failoverTable)); // Kill final regionserver to see the failover happens for all tables except GROUP table since // it's group does not have any online RS. killRS.stop("die"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java index a8cd277e055..fcaf1a79121 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java @@ -17,26 +17,17 @@ */ package org.apache.hadoop.hbase.rsgroup; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -46,20 +37,22 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; +import org.junit.Assert; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; @InterfaceAudience.Private -public class VerifyingRSGroupAdminClient extends RSGroupAdminClient { - private Connection conn; +public class VerifyingRSGroupAdminClient implements RSGroupAdmin { + private Table table; private ZKWatcher zkw; - private RSGroupAdminClient wrapped; + private RSGroupAdmin wrapped; - public VerifyingRSGroupAdminClient(RSGroupAdminClient RSGroupAdmin, Configuration conf) + public VerifyingRSGroupAdminClient(RSGroupAdmin RSGroupAdmin, Configuration conf) throws IOException { wrapped = RSGroupAdmin; - conn = ConnectionFactory.createConnection(conf); + table = ConnectionFactory.createConnection(conf) + .getTable(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME); zkw = new ZKWatcher(conf, this.getClass().getSimpleName(), null); } @@ -128,41 +121,31 @@ public class VerifyingRSGroupAdminClient extends RSGroupAdminClient { public void verify() throws IOException { Map groupMap = Maps.newHashMap(); Set zList = Sets.newHashSet(); - List tds = new ArrayList<>(); - try (Admin admin = conn.getAdmin()) { - tds.addAll(admin.listTableDescriptors()); - tds.addAll(admin.listTableDescriptorsByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME)); + + for (Result result : table.getScanner(new Scan())) { + RSGroupProtos.RSGroupInfo proto = + RSGroupProtos.RSGroupInfo.parseFrom( + result.getValue( + RSGroupInfoManagerImpl.META_FAMILY_BYTES, + RSGroupInfoManagerImpl.META_QUALIFIER_BYTES)); + groupMap.put(proto.getName(), ProtobufUtil.toGroupInfo(proto)); } - try (Table table = conn.getTable(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME); - ResultScanner scanner = table.getScanner(new Scan())) { - for (;;) { - Result result = scanner.next(); - if (result == null) { - break; - } - RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(result.getValue( - RSGroupInfoManagerImpl.META_FAMILY_BYTES, RSGroupInfoManagerImpl.META_QUALIFIER_BYTES)); - RSGroupInfo rsGroupInfo = ProtobufUtil.toGroupInfo(proto); - groupMap.put(proto.getName(), RSGroupUtil.fillTables(rsGroupInfo, tds)); - } - } - assertEquals(Sets.newHashSet(groupMap.values()), Sets.newHashSet(wrapped.listRSGroups())); + Assert.assertEquals(Sets.newHashSet(groupMap.values()), + Sets.newHashSet(wrapped.listRSGroups())); try { String groupBasePath = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup"); - for (String znode : ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) { + for(String znode: ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) { byte[] data = ZKUtil.getData(zkw, ZNodePaths.joinZNode(groupBasePath, znode)); - if (data.length > 0) { + if(data.length > 0) { ProtobufUtil.expectPBMagicPrefix(data); - ByteArrayInputStream bis = - new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length); - RSGroupInfo rsGroupInfo = - ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)); - zList.add(RSGroupUtil.fillTables(rsGroupInfo, tds)); + ByteArrayInputStream bis = new ByteArrayInputStream( + data, ProtobufUtil.lengthOfPBMagic(), data.length); + zList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); } } - assertEquals(zList.size(), groupMap.size()); - for (RSGroupInfo rsGroupInfo : zList) { - assertTrue(groupMap.get(rsGroupInfo.getName()).equals(rsGroupInfo)); + Assert.assertEquals(zList.size(), groupMap.size()); + for(RSGroupInfo RSGroupInfo : zList) { + Assert.assertTrue(groupMap.get(RSGroupInfo.getName()).equals(RSGroupInfo)); } } catch (KeeperException e) { throw new IOException("ZK verification failed", e);