From ce64e7eb6e9a6c3954b8ab6b5441ca6e4f952b26 Mon Sep 17 00:00:00 2001 From: Apekshit Sharma Date: Thu, 16 Feb 2017 02:00:37 -0800 Subject: [PATCH] HBASE-17654 RSGroup refactoring. Changes contain: - Making rsGroupInfoManager non-static in RSGroupAdminEndpoint - Encapsulate RSGroupAdminService into an internal class in RSGroupAdminEndpoint (on need of inheritence). - Change two internal classes in RSGroupAdminServer to non-static (so outer classes' variables can be shared). - Rename RSGroupSerDe to RSGroupProtobufUtil('ProtobufUtil' is what we use in other places). Moved 2 functions to RSGroupManagerImpl because they are only used there. - Javadoc comments - Improving variable names - Maybe other misc refactoring Change-Id: I09f0f5aa413150390c91795b8a8fd5e6cdd6c416 --- .../hadoop/hbase/rsgroup/RSGroupInfo.java | 30 +- .../hbase/rsgroup/IntegrationTestRSGroup.java | 6 +- .../hadoop/hbase/rsgroup/RSGroupAdmin.java | 52 +-- .../hbase/rsgroup/RSGroupAdminClient.java | 77 ++-- .../hbase/rsgroup/RSGroupAdminEndpoint.java | 352 +++++++++-------- .../hbase/rsgroup/RSGroupAdminServer.java | 154 +++----- .../rsgroup/RSGroupBasedLoadBalancer.java | 34 +- .../hbase/rsgroup/RSGroupInfoManager.java | 53 +-- .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 357 +++++++++--------- .../hbase/rsgroup/RSGroupProtobufUtil.java | 60 +++ .../hadoop/hbase/rsgroup/RSGroupSerDe.java | 121 ------ .../hbase/rsgroup/RSGroupableBalancer.java | 5 +- .../TestRSGroupBasedLoadBalancer.java | 5 +- .../hadoop/hbase/rsgroup/TestRSGroups.java | 14 +- .../hbase/rsgroup/TestRSGroupsBase.java | 102 +++-- .../rsgroup/TestRSGroupsOfflineMode.java | 94 ++--- .../rsgroup/VerifyingRSGroupAdminClient.java | 8 +- 17 files changed, 658 insertions(+), 866 deletions(-) create mode 100644 hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java delete mode 100644 hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java index ed0aec1d446..35563c5796c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java @@ -34,15 +34,14 @@ import org.apache.hadoop.hbase.net.Address; @InterfaceAudience.Public @InterfaceStability.Evolving public class RSGroupInfo { - public static final String DEFAULT_GROUP = "default"; - public static final String NAMESPACEDESC_PROP_GROUP = "hbase.rsgroup.name"; + public static final String NAMESPACE_DESC_PROP_GROUP = "hbase.rsgroup.name"; - private String name; + private final String name; // Keep servers in a sorted set so has an expected ordering when displayed. - private SortedSet
servers; + private final SortedSet
servers; // Keep tables sorted too. - private SortedSet tables; + private final SortedSet tables; public RSGroupInfo(String name) { this(name, new TreeSet
(), new TreeSet()); @@ -50,7 +49,7 @@ public class RSGroupInfo { RSGroupInfo(String name, SortedSet
servers, SortedSet tables) { this.name = name; - this.servers = servers == null? new TreeSet
(): servers; + this.servers = servers == null? new TreeSet<>(): servers; this.servers.addAll(servers); this.tables = new TreeSet<>(tables); } @@ -61,26 +60,20 @@ public class RSGroupInfo { /** * Get group name. - * - * @return group name */ public String getName() { return name; } /** - * Adds the server to the group. - * - * @param hostPort the server + * Adds the given server to the group. */ public void addServer(Address hostPort){ servers.add(hostPort); } /** - * Adds a group of servers. - * - * @param hostPort the servers + * Adds the given servers to the group. */ public void addAllServers(Collection
hostPort){ servers.addAll(hostPort); @@ -96,25 +89,20 @@ public class RSGroupInfo { /** * Get list of servers. - * - * @return set of servers */ public Set
getServers() { return servers; } /** - * Remove a server from this group. - * - * @param hostPort HostPort of the server to remove + * Remove given server from the group. */ public boolean removeServer(Address hostPort) { return servers.remove(hostPort); } /** - * Set of tables that are members of this group - * @return set of tables + * Get set of tables that are members of the group. */ public SortedSet getTables() { return tables; diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/rsgroup/IntegrationTestRSGroup.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/rsgroup/IntegrationTestRSGroup.java index da94bd7661e..6b1f1a772a9 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/rsgroup/IntegrationTestRSGroup.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/rsgroup/IntegrationTestRSGroup.java @@ -29,20 +29,18 @@ import org.junit.BeforeClass; import org.junit.experimental.categories.Category; /** - * Runs all of the units tests defined in TestGroupBase - * as an integration test. + * Runs all of the units tests defined in TestGroupBase as an integration test. * Requires TestRSGroupBase.NUM_SLAVE_BASE servers to run. */ @Category(IntegrationTests.class) public class IntegrationTestRSGroup extends TestRSGroupsBase { - //Integration specific private final static Log LOG = LogFactory.getLog(IntegrationTestRSGroup.class); private static boolean initialized = false; @BeforeClass public void beforeMethod() throws Exception { if(!initialized) { - LOG.info("Setting up IntegrationTestGroup"); + LOG.info("Setting up IntegrationTestRSGroup"); LOG.info("Initializing cluster with " + NUM_SLAVES_BASE + " servers"); TEST_UTIL = new IntegrationTestingUtility(); ((IntegrationTestingUtility)TEST_UTIL).initializeCluster(NUM_SLAVES_BASE); diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java index 72b5f6e1ec5..20fdaa28034 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.rsgroup; -import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Set; @@ -32,76 +31,51 @@ import org.apache.hadoop.hbase.net.Address; @InterfaceAudience.Private public interface RSGroupAdmin { /** - * Gets the regionserver group information. - * - * @param groupName the group name - * @return An instance of RSGroupInfo + * Gets {@code RSGroupInfo} for given group name. */ RSGroupInfo getRSGroupInfo(String groupName) throws IOException; /** - * Gets the regionserver group info of table. - * - * @param tableName the table name - * @return An instance of RSGroupInfo. + * Gets {@code RSGroupInfo} for the given table's group. */ RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException; /** - * Move a set of serves to another group - * - * - * @param servers set of servers, must be in the form HOST:PORT - * @param targetGroup the target group - * @throws java.io.IOException Signals that an I/O exception has occurred. + * Move given set of servers to the specified target RegionServer group. */ - void moveServers(Set
servers, String targetGroup) - throws IOException; + void moveServers(Set
servers, String targetGroup) throws IOException; /** - * Move tables to a new group. + * Move given set of tables to the specified target RegionServer group. * This will unassign all of a table's region so it can be reassigned to the correct group. - * @param tables list of tables to move - * @param targetGroup target group - * @throws java.io.IOException on failure to move tables */ void moveTables(Set tables, String targetGroup) throws IOException; /** - * Add a new group - * @param name name of the group - * @throws java.io.IOException on failure to add group + * Creates a new RegionServer group with the given name. */ - void addRSGroup(String name) throws IOException; + void addRSGroup(String groupName) throws IOException; /** - * Remove a regionserver group - * @param name name of the group - * @throws java.io.IOException on failure to remove group + * Removes RegionServer group associated with the given name. */ - void removeRSGroup(String name) throws IOException; + void removeRSGroup(String groupName) throws IOException; /** - * Balance the regions in a group + * Balance regions in the given RegionServer group. * - * @param name the name of the group to balance - * @return boolean whether balance ran or not - * @throws java.io.IOException on unexpected failure to balance group + * @return boolean Whether balance ran or not */ - boolean balanceRSGroup(String name) throws IOException; + boolean balanceRSGroup(String groupName) throws IOException; /** - * Lists the existing groups. - * - * @return Collection of RSGroupInfo. + * Lists current set of RegionServer groups. */ List listRSGroups() throws IOException; /** * Retrieve the RSGroupInfo a server is affiliated to * @param hostPort HostPort to get RSGroupInfo for - * @return RSGroupInfo associated with the server - * @throws java.io.IOException on unexpected failure to retrieve GroupInfo */ RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException; } \ No newline at end of file diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java index 381fc600b0c..74e91fea981 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java @@ -28,36 +28,42 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; import com.google.common.collect.Sets; import com.google.protobuf.ServiceException; - /** * Client used for managing region server group information. */ @InterfaceAudience.Private class RSGroupAdminClient implements RSGroupAdmin { - private RSGroupAdminProtos.RSGroupAdminService.BlockingInterface stub; + private RSGroupAdminService.BlockingInterface stub; public RSGroupAdminClient(Connection conn) throws IOException { - stub = RSGroupAdminProtos.RSGroupAdminService.newBlockingStub( - conn.getAdmin().coprocessorService()); + stub = RSGroupAdminService.newBlockingStub(conn.getAdmin().coprocessorService()); } @Override public RSGroupInfo getRSGroupInfo(String groupName) throws IOException { try { - RSGroupAdminProtos.GetRSGroupInfoResponse resp = - stub.getRSGroupInfo(null, - RSGroupAdminProtos.GetRSGroupInfoRequest.newBuilder() - .setRSGroupName(groupName).build()); + GetRSGroupInfoResponse resp = stub.getRSGroupInfo(null, + GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build()); if(resp.hasRSGroupInfo()) { - return RSGroupSerDe.toGroupInfo(resp.getRSGroupInfo()); + return RSGroupProtobufUtil.toGroupInfo(resp.getRSGroupInfo()); } return null; } catch (ServiceException e) { @@ -67,14 +73,12 @@ class RSGroupAdminClient implements RSGroupAdmin { @Override public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException { - RSGroupAdminProtos.GetRSGroupInfoOfTableRequest request = - RSGroupAdminProtos.GetRSGroupInfoOfTableRequest.newBuilder() - .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - + GetRSGroupInfoOfTableRequest request = GetRSGroupInfoOfTableRequest.newBuilder().setTableName( + ProtobufUtil.toProtoTableName(tableName)).build(); try { GetRSGroupInfoOfTableResponse resp = stub.getRSGroupInfoOfTable(null, request); if (resp.hasRSGroupInfo()) { - return RSGroupSerDe.toGroupInfo(resp.getRSGroupInfo()); + return RSGroupProtobufUtil.toGroupInfo(resp.getRSGroupInfo()); } return null; } catch (ServiceException e) { @@ -91,11 +95,10 @@ class RSGroupAdminClient implements RSGroupAdmin { .setPort(el.getPort()) .build()); } - RSGroupAdminProtos.MoveServersRequest request = - RSGroupAdminProtos.MoveServersRequest.newBuilder() + MoveServersRequest request = MoveServersRequest.newBuilder() .setTargetGroup(targetGroup) - .addAllServers(hostPorts).build(); - + .addAllServers(hostPorts) + .build(); try { stub.moveServers(null, request); } catch (ServiceException e) { @@ -105,9 +108,7 @@ class RSGroupAdminClient implements RSGroupAdmin { @Override public void moveTables(Set tables, String targetGroup) throws IOException { - RSGroupAdminProtos.MoveTablesRequest.Builder builder = - RSGroupAdminProtos.MoveTablesRequest.newBuilder() - .setTargetGroup(targetGroup); + MoveTablesRequest.Builder builder = MoveTablesRequest.newBuilder().setTargetGroup(targetGroup); for(TableName tableName: tables) { builder.addTableName(ProtobufUtil.toProtoTableName(tableName)); } @@ -120,9 +121,7 @@ class RSGroupAdminClient implements RSGroupAdmin { @Override public void addRSGroup(String groupName) throws IOException { - RSGroupAdminProtos.AddRSGroupRequest request = - RSGroupAdminProtos.AddRSGroupRequest.newBuilder() - .setRSGroupName(groupName).build(); + AddRSGroupRequest request = AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(); try { stub.addRSGroup(null, request); } catch (ServiceException e) { @@ -132,9 +131,7 @@ class RSGroupAdminClient implements RSGroupAdmin { @Override public void removeRSGroup(String name) throws IOException { - RSGroupAdminProtos.RemoveRSGroupRequest request = - RSGroupAdminProtos.RemoveRSGroupRequest.newBuilder() - .setRSGroupName(name).build(); + RemoveRSGroupRequest request = RemoveRSGroupRequest.newBuilder().setRSGroupName(name).build(); try { stub.removeRSGroup(null, request); } catch (ServiceException e) { @@ -143,11 +140,9 @@ class RSGroupAdminClient implements RSGroupAdmin { } @Override - public boolean balanceRSGroup(String name) throws IOException { - RSGroupAdminProtos.BalanceRSGroupRequest request = - RSGroupAdminProtos.BalanceRSGroupRequest.newBuilder() - .setRSGroupName(name).build(); - + public boolean balanceRSGroup(String groupName) throws IOException { + BalanceRSGroupRequest request = BalanceRSGroupRequest.newBuilder() + .setRSGroupName(groupName).build(); try { return stub.balanceRSGroup(null, request).getBalanceRan(); } catch (ServiceException e) { @@ -158,12 +153,11 @@ class RSGroupAdminClient implements RSGroupAdmin { @Override public List listRSGroups() throws IOException { try { - List resp = - stub.listRSGroupInfos(null, - RSGroupAdminProtos.ListRSGroupInfosRequest.newBuilder().build()).getRSGroupInfoList(); - List result = new ArrayList(resp.size()); - for(RSGroupProtos.RSGroupInfo entry: resp) { - result.add(RSGroupSerDe.toGroupInfo(entry)); + List resp = stub.listRSGroupInfos(null, + ListRSGroupInfosRequest.getDefaultInstance()).getRSGroupInfoList(); + List result = new ArrayList<>(resp.size()); + for(RSGroupProtos.RSGroupInfo entry : resp) { + result.add(RSGroupProtobufUtil.toGroupInfo(entry)); } return result; } catch (ServiceException e) { @@ -173,8 +167,7 @@ class RSGroupAdminClient implements RSGroupAdmin { @Override public RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException { - RSGroupAdminProtos.GetRSGroupInfoOfServerRequest request = - RSGroupAdminProtos.GetRSGroupInfoOfServerRequest.newBuilder() + GetRSGroupInfoOfServerRequest request = GetRSGroupInfoOfServerRequest.newBuilder() .setServer(HBaseProtos.ServerName.newBuilder() .setHostName(hostPort.getHostname()) .setPort(hostPort.getPort()) @@ -183,7 +176,7 @@ class RSGroupAdminClient implements RSGroupAdmin { try { GetRSGroupInfoOfServerResponse resp = stub.getRSGroupInfoOfServer(null, request); if (resp.hasRSGroupInfo()) { - return RSGroupSerDe.toGroupInfo(resp.getRSGroupInfo()); + return RSGroupProtobufUtil.toGroupInfo(resp.getRSGroupInfo()); } return null; } catch (ServiceException e) { diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java index 3dc88170daf..b917716b285 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java @@ -20,22 +20,21 @@ package org.apache.hadoop.hbase.rsgroup; import java.io.IOException; import java.util.HashSet; -import java.util.List; import java.util.Set; -import org.apache.hadoop.hbase.Coprocessor; +import com.google.common.collect.Sets; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.MasterSwitchType; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; @@ -43,12 +42,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.locking.LockProcedure; -import org.apache.hadoop.hbase.master.locking.LockProcedure.LockType; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.net.Address; -import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos; @@ -71,228 +65,231 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesR import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService; import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; - -import com.google.common.collect.Sets; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - @InterfaceAudience.Private -public class RSGroupAdminEndpoint extends RSGroupAdminService implements CoprocessorService, - Coprocessor, MasterObserver { - private MasterServices master = null; +public class RSGroupAdminEndpoint implements MasterObserver, CoprocessorService { + private static final Log LOG = LogFactory.getLog(RSGroupAdminEndpoint.class); - // TODO: Static? Fix. - private static RSGroupInfoManager groupInfoManager; + private MasterServices master = null; + // Only instance of RSGroupInfoManager. RSGroup aware load balancers ask for this instance on + // their setup. + private RSGroupInfoManager groupInfoManager; private RSGroupAdminServer groupAdminServer; + private final RSGroupAdminService groupAdminService = new RSGroupAdminServiceImpl(); @Override public void start(CoprocessorEnvironment env) throws IOException { - MasterCoprocessorEnvironment menv = (MasterCoprocessorEnvironment)env; - master = menv.getMasterServices(); - setGroupInfoManager(new RSGroupInfoManagerImpl(master)); + master = ((MasterCoprocessorEnvironment)env).getMasterServices(); + groupInfoManager = RSGroupInfoManagerImpl.getInstance(master); groupAdminServer = new RSGroupAdminServer(master, groupInfoManager); Class clazz = master.getConfiguration().getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, null); if (!RSGroupableBalancer.class.isAssignableFrom(clazz)) { - throw new IOException("Configured balancer is not a GroupableBalancer"); + throw new IOException("Configured balancer does not support RegionServer groups."); } } - @Override - public void stop(CoprocessorEnvironment env) throws IOException { - } - @Override public Service getService() { - return this; - } - - private static void setStaticGroupInfoManager(RSGroupInfoManagerImpl groupInfoManager) { - RSGroupAdminEndpoint.groupInfoManager = groupInfoManager; - } - - private void setGroupInfoManager(RSGroupInfoManagerImpl groupInfoManager) throws IOException { - if (groupInfoManager == null) { - groupInfoManager = new RSGroupInfoManagerImpl(master); - groupInfoManager.init(); - } else if (!groupInfoManager.isInit()) { - groupInfoManager.init(); - } - setStaticGroupInfoManager(groupInfoManager); + return groupAdminService; } RSGroupInfoManager getGroupInfoManager() { return groupInfoManager; } - @Override - public void getRSGroupInfo(RpcController controller, - GetRSGroupInfoRequest request, - RpcCallback done) { - GetRSGroupInfoResponse.Builder builder = - GetRSGroupInfoResponse.newBuilder(); - String groupName = request.getRSGroupName(); - try { - RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); - if (rsGroupInfo != null) { - builder.setRSGroupInfo(RSGroupSerDe.toProtoGroupInfo(rsGroupInfo)); + /** + * Implementation of RSGroupAdminService defined in RSGroupAdmin.proto. + * This class calls {@link RSGroupAdminServer} for actual work, converts result to protocol + * buffer response, handles exceptions if any occurred and then calls the {@code RpcCallback} with + * the response. + * Since our CoprocessorHost asks the Coprocessor for a Service + * ({@link CoprocessorService#getService()}) instead of doing "coproc instanceOf Service" + * and requiring Coprocessor itself to be Service (something we do with our observers), + * we can use composition instead of inheritance here. That makes it easy to manage + * functionalities in concise classes (sometimes inner classes) instead of single class doing + * many different things. + */ + private class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { + @Override + public void getRSGroupInfo(RpcController controller, + GetRSGroupInfoRequest request, RpcCallback done) { + GetRSGroupInfoResponse.Builder builder = GetRSGroupInfoResponse.newBuilder(); + String groupName = request.getRSGroupName(); + try { + RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); + if (rsGroupInfo != null) { + builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(rsGroupInfo)); + } + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); } - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); + done.run(builder.build()); } - done.run(builder.build()); - } - @Override - public void getRSGroupInfoOfTable(RpcController controller, - GetRSGroupInfoOfTableRequest request, - RpcCallback done) { - GetRSGroupInfoOfTableResponse.Builder builder = - GetRSGroupInfoOfTableResponse.newBuilder(); - try { - TableName tableName = ProtobufUtil.toTableName(request.getTableName()); - RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupInfoOfTable(tableName); - if (RSGroupInfo != null) { - builder.setRSGroupInfo(RSGroupSerDe.toProtoGroupInfo(RSGroupInfo)); + @Override + public void getRSGroupInfoOfTable(RpcController controller, + GetRSGroupInfoOfTableRequest request, RpcCallback done) { + GetRSGroupInfoOfTableResponse.Builder builder = GetRSGroupInfoOfTableResponse.newBuilder(); + try { + TableName tableName = ProtobufUtil.toTableName(request.getTableName()); + RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupInfoOfTable(tableName); + if (RSGroupInfo != null) { + builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo)); + } + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); } - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); + done.run(builder.build()); } - done.run(builder.build()); - } - @Override - public void moveServers(RpcController controller, - MoveServersRequest request, - RpcCallback done) { - RSGroupAdminProtos.MoveServersResponse.Builder builder = - RSGroupAdminProtos.MoveServersResponse.newBuilder(); - try { - Set
hostPorts = Sets.newHashSet(); - for(HBaseProtos.ServerName el: request.getServersList()) { - hostPorts.add(Address.fromParts(el.getHostName(), el.getPort())); + @Override + public void moveServers(RpcController controller, MoveServersRequest request, + RpcCallback done) { + MoveServersResponse.Builder builder = MoveServersResponse.newBuilder(); + try { + Set
hostPorts = Sets.newHashSet(); + for (HBaseProtos.ServerName el : request.getServersList()) { + hostPorts.add(Address.fromParts(el.getHostName(), el.getPort())); + } + groupAdminServer.moveServers(hostPorts, request.getTargetGroup()); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); } - groupAdminServer.moveServers(hostPorts, request.getTargetGroup()); - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); + done.run(builder.build()); } - done.run(builder.build()); - } - @Override - public void moveTables(RpcController controller, - MoveTablesRequest request, - RpcCallback done) { - MoveTablesResponse.Builder builder = - MoveTablesResponse.newBuilder(); - try { - Set tables = new HashSet(request.getTableNameList().size()); - for(HBaseProtos.TableName tableName: request.getTableNameList()) { - tables.add(ProtobufUtil.toTableName(tableName)); + @Override + public void moveTables(RpcController controller, MoveTablesRequest request, + RpcCallback done) { + MoveTablesResponse.Builder builder = MoveTablesResponse.newBuilder(); + try { + Set tables = new HashSet<>(request.getTableNameList().size()); + for (HBaseProtos.TableName tableName : request.getTableNameList()) { + tables.add(ProtobufUtil.toTableName(tableName)); + } + groupAdminServer.moveTables(tables, request.getTargetGroup()); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); } - groupAdminServer.moveTables(tables, request.getTargetGroup()); - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); + done.run(builder.build()); } - done.run(builder.build()); - } - @Override - public void addRSGroup(RpcController controller, - AddRSGroupRequest request, - RpcCallback done) { - AddRSGroupResponse.Builder builder = - AddRSGroupResponse.newBuilder(); - try { - groupAdminServer.addRSGroup(request.getRSGroupName()); - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); + @Override + public void addRSGroup(RpcController controller, AddRSGroupRequest request, + RpcCallback done) { + AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder(); + try { + groupAdminServer.addRSGroup(request.getRSGroupName()); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } + done.run(builder.build()); } - done.run(builder.build()); - } - @Override - public void removeRSGroup(RpcController controller, - RemoveRSGroupRequest request, - RpcCallback done) { - RemoveRSGroupResponse.Builder builder = + @Override + public void removeRSGroup(RpcController controller, + RemoveRSGroupRequest request, RpcCallback done) { + RemoveRSGroupResponse.Builder builder = RemoveRSGroupResponse.newBuilder(); - try { - groupAdminServer.removeRSGroup(request.getRSGroupName()); - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); - } - done.run(builder.build()); - } - - @Override - public void balanceRSGroup(RpcController controller, - BalanceRSGroupRequest request, - RpcCallback done) { - BalanceRSGroupResponse.Builder builder = BalanceRSGroupResponse.newBuilder(); - try { - builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName())); - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); - builder.setBalanceRan(false); - } - done.run(builder.build()); - } - - @Override - public void listRSGroupInfos(RpcController controller, - ListRSGroupInfosRequest request, - RpcCallback done) { - ListRSGroupInfosResponse.Builder builder = - ListRSGroupInfosResponse.newBuilder(); - try { - for(RSGroupInfo RSGroupInfo : groupAdminServer.listRSGroups()) { - builder.addRSGroupInfo(RSGroupSerDe.toProtoGroupInfo(RSGroupInfo)); + try { + groupAdminServer.removeRSGroup(request.getRSGroupName()); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); } - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); + done.run(builder.build()); } - done.run(builder.build()); - } - @Override - public void getRSGroupInfoOfServer(RpcController controller, - GetRSGroupInfoOfServerRequest request, - RpcCallback done) { - GetRSGroupInfoOfServerResponse.Builder builder = GetRSGroupInfoOfServerResponse.newBuilder(); - try { - Address hp = - Address.fromParts(request.getServer().getHostName(), request.getServer().getPort()); - RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupOfServer(hp); - if (RSGroupInfo != null) { - builder.setRSGroupInfo(RSGroupSerDe.toProtoGroupInfo(RSGroupInfo)); + @Override + public void balanceRSGroup(RpcController controller, + BalanceRSGroupRequest request, RpcCallback done) { + BalanceRSGroupResponse.Builder builder = BalanceRSGroupResponse.newBuilder(); + try { + builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName())); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + builder.setBalanceRan(false); } - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); + done.run(builder.build()); + } + + @Override + public void listRSGroupInfos(RpcController controller, + ListRSGroupInfosRequest request, RpcCallback done) { + ListRSGroupInfosResponse.Builder builder = ListRSGroupInfosResponse.newBuilder(); + try { + for (RSGroupInfo RSGroupInfo : groupAdminServer.listRSGroups()) { + builder.addRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo)); + } + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } + done.run(builder.build()); + } + + @Override + public void getRSGroupInfoOfServer(RpcController controller, + GetRSGroupInfoOfServerRequest request, RpcCallback done) { + GetRSGroupInfoOfServerResponse.Builder builder = GetRSGroupInfoOfServerResponse.newBuilder(); + try { + Address hp = Address.fromParts(request.getServer().getHostName(), + request.getServer().getPort()); + RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupOfServer(hp); + if (RSGroupInfo != null) { + builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo)); + } + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } + done.run(builder.build()); } - done.run(builder.build()); } + ///////////////////////////////////////////////////////////////////////////// + // MasterObserver overrides + ///////////////////////////////////////////////////////////////////////////// + + // Assign table to default RSGroup. @Override public void preCreateTable(ObserverContext ctx, HTableDescriptor desc, HRegionInfo[] regions) throws IOException { - groupAdminServer.prepareRSGroupForTable(desc); + String groupName = + master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()) + .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); + if (groupName == null) { + groupName = RSGroupInfo.DEFAULT_GROUP; + } + RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); + if (rsGroupInfo == null) { + throw new ConstraintException("Default RSGroup (" + groupName + ") for this table's " + + "namespace does not exist."); + } + if (!rsGroupInfo.containsTable(desc.getTableName())) { + LOG.debug("Pre-moving table " + desc.getTableName() + " to RSGroup " + groupName); + groupAdminServer.moveTables(Sets.newHashSet(desc.getTableName()), groupName); + } } + // Remove table from its RSGroup. @Override public void postDeleteTable(ObserverContext ctx, TableName tableName) throws IOException { - groupAdminServer.cleanupRSGroupForTable(tableName); + try { + RSGroupInfo group = groupAdminServer.getRSGroupInfoOfTable(tableName); + if (group != null) { + LOG.debug(String.format("Removing deleted table '%s' from rsgroup '%s'", tableName, + group.getName())); + groupAdminServer.moveTables(Sets.newHashSet(tableName), null); + } + } catch (IOException ex) { + LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex); + } } @Override public void preCreateNamespace(ObserverContext ctx, NamespaceDescriptor ns) throws IOException { - String group = ns.getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP); + String group = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); if(group != null && groupAdminServer.getRSGroupInfo(group) == null) { throw new ConstraintException("Region server group "+group+" does not exit"); } @@ -303,4 +300,5 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService implements Coproce NamespaceDescriptor ns) throws IOException { preCreateNamespace(ctx, ns); } + ///////////////////////////////////////////////////////////////////////////// } diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index 5a00ddb9500..1f0be5ab5a9 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -32,7 +32,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -50,7 +49,6 @@ import org.apache.hadoop.hbase.net.Address; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; /** * Service to support Region Server Grouping (HBase-6721). @@ -62,23 +60,23 @@ public class RSGroupAdminServer implements RSGroupAdmin { private MasterServices master; private final RSGroupInfoManager rsGroupInfoManager; - public RSGroupAdminServer(MasterServices master, - RSGroupInfoManager RSGroupInfoManager) throws IOException { + public RSGroupAdminServer(MasterServices master, RSGroupInfoManager rsGroupInfoManager) + throws IOException { this.master = master; - this.rsGroupInfoManager = RSGroupInfoManager; + this.rsGroupInfoManager = rsGroupInfoManager; } @Override public RSGroupInfo getRSGroupInfo(String groupName) throws IOException { - return getRSGroupInfoManager().getRSGroup(groupName); + return rsGroupInfoManager.getRSGroup(groupName); } @Override public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException { // We are reading across two Maps in the below with out synchronizing across // them; should be safe most of the time. - String groupName = getRSGroupInfoManager().getRSGroupOfTable(tableName); - return groupName == null? null: getRSGroupInfoManager().getRSGroup(groupName); + String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName); + return groupName == null? null: rsGroupInfoManager.getRSGroup(groupName); } private void checkOnlineServersOnly(Set
servers) throws ConstraintException { @@ -99,18 +97,17 @@ public class RSGroupAdminServer implements RSGroupAdmin { /** * Check passed name. Fail if nulls or if corresponding RSGroupInfo not found. * @return The RSGroupInfo named name - * @throws IOException */ private RSGroupInfo getAndCheckRSGroupInfo(String name) throws IOException { if (StringUtils.isEmpty(name)) { throw new ConstraintException("RSGroup cannot be null."); } - RSGroupInfo rsgi = getRSGroupInfo(name); - if (rsgi == null) { + RSGroupInfo rsGroupInfo = getRSGroupInfo(name); + if (rsGroupInfo == null) { throw new ConstraintException("RSGroup does not exist: " + name); } - return rsgi; + return rsGroupInfo; } /** @@ -141,7 +138,8 @@ public class RSGroupAdminServer implements RSGroupAdmin { else regions.addFirst(hri); } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE", + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + value="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE", justification="Ignoring complaint because don't know what it is complaining about") @Override public void moveServers(Set
servers, String targetGroupName) @@ -155,19 +153,20 @@ public class RSGroupAdminServer implements RSGroupAdmin { return; } RSGroupInfo targetGrp = getAndCheckRSGroupInfo(targetGroupName); - RSGroupInfoManager manager = getRSGroupInfoManager(); + // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. - synchronized (manager) { + synchronized (rsGroupInfoManager) { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName); } // Presume first server's source group. Later ensure all servers are from this group. Address firstServer = servers.iterator().next(); - RSGroupInfo srcGrp = manager.getRSGroupOfServer(firstServer); + RSGroupInfo srcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer); if (srcGrp == null) { // Be careful. This exception message is tested for in TestRSGroupsBase... - throw new ConstraintException("Source RSGroup for server " + firstServer + " does not exist."); + throw new ConstraintException("Source RSGroup for server " + firstServer + + " does not exist."); } if (srcGrp.getName().equals(targetGroupName)) { throw new ConstraintException( "Target RSGroup " + targetGroupName + @@ -180,7 +179,7 @@ public class RSGroupAdminServer implements RSGroupAdmin { } // Ensure all servers are of same rsgroup. for (Address server: servers) { - String tmpGroup = manager.getRSGroupOfServer(server).getName(); + String tmpGroup = rsGroupInfoManager.getRSGroupOfServer(server).getName(); if (!tmpGroup.equals(srcGrp.getName())) { throw new ConstraintException("Move server request should only come from one source " + "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup); @@ -192,7 +191,8 @@ public class RSGroupAdminServer implements RSGroupAdmin { } // MovedServers may be < passed in 'servers'. - Set
movedServers = manager.moveServers(servers, srcGrp.getName(), targetGroupName); + Set
movedServers = rsGroupInfoManager.moveServers(servers, srcGrp.getName(), + targetGroupName); List
editableMovedServers = Lists.newArrayList(movedServers); boolean foundRegionsToUnassign; do { @@ -231,7 +231,7 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } try { - manager.wait(1000); + rsGroupInfoManager.wait(1000); } catch (InterruptedException e) { LOG.warn("Sleep interrupted", e); Thread.currentThread().interrupt(); @@ -247,22 +247,21 @@ public class RSGroupAdminServer implements RSGroupAdmin { @Override public void moveTables(Set tables, String targetGroup) throws IOException { if (tables == null) { - throw new ConstraintException( - "The list of servers cannot be null."); + throw new ConstraintException("The list of servers cannot be null."); } if (tables.size() < 1) { LOG.debug("moveTables() passed an empty set. Ignoring."); return; } - RSGroupInfoManager manager = getRSGroupInfoManager(); + // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. - synchronized (manager) { + synchronized (rsGroupInfoManager) { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup); } if(targetGroup != null) { - RSGroupInfo destGroup = manager.getRSGroup(targetGroup); + RSGroupInfo destGroup = rsGroupInfoManager.getRSGroup(targetGroup); if(destGroup == null) { throw new ConstraintException("Target " + targetGroup + " RSGroup does not exist."); } @@ -272,14 +271,14 @@ public class RSGroupAdminServer implements RSGroupAdmin { } for (TableName table : tables) { - String srcGroup = manager.getRSGroupOfTable(table); + String srcGroup = rsGroupInfoManager.getRSGroupOfTable(table); if(srcGroup != null && srcGroup.equals(targetGroup)) { throw new ConstraintException( "Source RSGroup " + srcGroup + " is same as target " + targetGroup + " RSGroup for table " + table); } } - manager.moveTables(tables, targetGroup); + rsGroupInfoManager.moveTables(tables, targetGroup); if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup); } @@ -308,7 +307,7 @@ public class RSGroupAdminServer implements RSGroupAdmin { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preAddRSGroup(name); } - getRSGroupInfoManager().addRSGroup(new RSGroupInfo(name)); + rsGroupInfoManager.addRSGroup(new RSGroupInfo(name)); if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postAddRSGroup(name); } @@ -316,37 +315,36 @@ public class RSGroupAdminServer implements RSGroupAdmin { @Override public void removeRSGroup(String name) throws IOException { - RSGroupInfoManager manager = getRSGroupInfoManager(); // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. - synchronized (manager) { + synchronized (rsGroupInfoManager) { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preRemoveRSGroup(name); } - RSGroupInfo rsgi = manager.getRSGroup(name); - if (rsgi == null) { + RSGroupInfo rsGroupInfo = rsGroupInfoManager.getRSGroup(name); + if (rsGroupInfo == null) { throw new ConstraintException("RSGroup " + name + " does not exist"); } - int tableCount = rsgi.getTables().size(); + int tableCount = rsGroupInfo.getTables().size(); if (tableCount > 0) { throw new ConstraintException("RSGroup " + name + " has " + tableCount + " tables; you must remove these tables from the rsgroup before " + "the rsgroup can be removed."); } - int serverCount = rsgi.getServers().size(); + int serverCount = rsGroupInfo.getServers().size(); if (serverCount > 0) { throw new ConstraintException("RSGroup " + name + " has " + serverCount + " servers; you must remove these servers from the RSGroup before" + "the RSGroup can be removed."); } for (NamespaceDescriptor ns: master.getClusterSchema().getNamespaces()) { - String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP); + String nsGroup = ns.getConfigurationValue(rsGroupInfo.NAMESPACE_DESC_PROP_GROUP); if (nsGroup != null && nsGroup.equals(name)) { throw new ConstraintException("RSGroup " + name + " is referenced by namespace: " + ns.getName()); } } - manager.removeRSGroup(name); + rsGroupInfoManager.removeRSGroup(name); if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postRemoveRSGroup(name); } @@ -370,9 +368,7 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Only allow one balance run at at time. Map groupRIT = rsGroupGetRegionsInTransition(groupName); if (groupRIT.size() > 0) { - LOG.debug("Not running balancer because " + - groupRIT.size() + - " region(s) in transition: " + + LOG.debug("Not running balancer because " + groupRIT.size() + " region(s) in transition: " + StringUtils.abbreviate( master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(), 256)); @@ -388,9 +384,10 @@ public class RSGroupAdminServer implements RSGroupAdmin { List plans = new ArrayList(); for(Map.Entry>> tableMap: getRSGroupAssignmentsByTable(groupName).entrySet()) { - LOG.info("Creating partial plan for table "+tableMap.getKey()+": "+tableMap.getValue()); + LOG.info("Creating partial plan for table " + tableMap.getKey() + ": " + + tableMap.getValue()); List partialPlans = balancer.balanceCluster(tableMap.getValue()); - LOG.info("Partial plan for table "+tableMap.getKey()+": "+partialPlans); + LOG.info("Partial plan for table " + tableMap.getKey() + ": " + partialPlans); if (partialPlans != null) { plans.addAll(partialPlans); } @@ -398,13 +395,13 @@ public class RSGroupAdminServer implements RSGroupAdmin { long startTime = System.currentTimeMillis(); balancerRan = plans != null; if (plans != null && !plans.isEmpty()) { - LOG.info("RSGroup balance "+groupName+" starting with plan count: "+plans.size()); + LOG.info("RSGroup balance " + groupName + " starting with plan count: " + plans.size()); for (RegionPlan plan: plans) { LOG.info("balance " + plan); assignmentManager.balance(plan); } - LOG.info("RSGroup balance "+groupName+" completed after "+ - (System.currentTimeMillis()-startTime)+" seconds"); + LOG.info("RSGroup balance " + groupName + " completed after " + + (System.currentTimeMillis()-startTime) + " seconds"); } if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan); @@ -415,27 +412,21 @@ public class RSGroupAdminServer implements RSGroupAdmin { @Override public List listRSGroups() throws IOException { - return getRSGroupInfoManager().listRSGroups(); + return rsGroupInfoManager.listRSGroups(); } @Override public RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException { - return getRSGroupInfoManager().getRSGroupOfServer(hostPort); - } - - private RSGroupInfoManager getRSGroupInfoManager() throws IOException { - return rsGroupInfoManager; + return rsGroupInfoManager.getRSGroupOfServer(hostPort); } private Map rsGroupGetRegionsInTransition(String groupName) throws IOException { Map rit = Maps.newTreeMap(); AssignmentManager am = master.getAssignmentManager(); - RSGroupInfo RSGroupInfo = getRSGroupInfo(groupName); - for(TableName tableName : RSGroupInfo.getTables()) { + for(TableName tableName : getRSGroupInfo(groupName).getTables()) { for(HRegionInfo regionInfo: am.getRegionStates().getRegionsOfTable(tableName)) { - RegionState state = - master.getAssignmentManager().getRegionStates().getRegionTransitionState(regionInfo); + RegionState state = am.getRegionStates().getRegionTransitionState(regionInfo); if(state != null) { rit.put(regionInfo.getEncodedName(), state); } @@ -447,72 +438,37 @@ public class RSGroupAdminServer implements RSGroupAdmin { private Map>> getRSGroupAssignmentsByTable(String groupName) throws IOException { Map>> result = Maps.newHashMap(); - RSGroupInfo RSGroupInfo = getRSGroupInfo(groupName); + RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); Map>> assignments = Maps.newHashMap(); for(Map.Entry entry: master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()) { TableName currTable = entry.getKey().getTable(); ServerName currServer = entry.getValue(); HRegionInfo currRegion = entry.getKey(); - if(RSGroupInfo.getTables().contains(currTable)) { - if(!assignments.containsKey(entry.getKey().getTable())) { - assignments.put(currTable, new HashMap>()); - } - if(!assignments.get(currTable).containsKey(currServer)) { - assignments.get(currTable).put(currServer, new ArrayList()); - } + if (rsGroupInfo.getTables().contains(currTable)) { + assignments.putIfAbsent(currTable, new HashMap<>()); + assignments.get(currTable).putIfAbsent(currServer, new ArrayList<>()); assignments.get(currTable).get(currServer).add(currRegion); } } Map> serverMap = Maps.newHashMap(); for(ServerName serverName: master.getServerManager().getOnlineServers().keySet()) { - if(RSGroupInfo.getServers().contains(serverName.getAddress())) { + if(rsGroupInfo.getServers().contains(serverName.getAddress())) { serverMap.put(serverName, Collections.emptyList()); } } - //add all tables that are members of the group - for(TableName tableName : RSGroupInfo.getTables()) { + // add all tables that are members of the group + for(TableName tableName : rsGroupInfo.getTables()) { if(assignments.containsKey(tableName)) { - result.put(tableName, new HashMap>()); + result.put(tableName, new HashMap<>()); result.get(tableName).putAll(serverMap); result.get(tableName).putAll(assignments.get(tableName)); - LOG.debug("Adding assignments for "+tableName+": "+assignments.get(tableName)); + LOG.debug("Adding assignments for " + tableName + ": " + assignments.get(tableName)); } } return result; } - - public void prepareRSGroupForTable(HTableDescriptor desc) throws IOException { - String groupName = - master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()) - .getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP); - if (groupName == null) { - groupName = RSGroupInfo.DEFAULT_GROUP; - } - RSGroupInfo RSGroupInfo = getRSGroupInfo(groupName); - if (RSGroupInfo == null) { - throw new ConstraintException("RSGroup " + groupName + " does not exist."); - } - if (!RSGroupInfo.containsTable(desc.getTableName())) { - LOG.debug("Pre-moving table " + desc.getTableName() + " to RSGroup " + groupName); - moveTables(Sets.newHashSet(desc.getTableName()), groupName); - } - } - - public void cleanupRSGroupForTable(TableName tableName) throws IOException { - try { - RSGroupInfo group = getRSGroupInfoOfTable(tableName); - if (group != null) { - LOG.debug("Removing deleted table from table rsgroup " + group.getName()); - moveTables(Sets.newHashSet(tableName), null); - } - } catch (ConstraintException ex) { - LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex); - } catch (IOException ex) { - LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex); - } - } -} \ No newline at end of file +} diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java index b70c2364cc7..b36fd21f366 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.rsgroup; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; @@ -38,7 +39,6 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseIOException; @@ -70,10 +70,7 @@ import org.apache.hadoop.util.ReflectionUtils; * */ @InterfaceAudience.Private -public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer { - /** Config for pluggable load balancers */ - public static final String HBASE_GROUP_LOADBALANCER_CLASS = "hbase.group.grouploadbalancer.class"; - +public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { private static final Log LOG = LogFactory.getLog(RSGroupBasedLoadBalancer.class); private Configuration config; @@ -82,16 +79,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc private volatile RSGroupInfoManager rsGroupInfoManager; private LoadBalancer internalBalancer; - //used during reflection by LoadBalancerFactory + /** + * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}. + */ @InterfaceAudience.Private public RSGroupBasedLoadBalancer() {} - //This constructor should only be used for unit testing - @InterfaceAudience.Private - public RSGroupBasedLoadBalancer(RSGroupInfoManager rsGroupInfoManager) { - this.rsGroupInfoManager = rsGroupInfoManager; - } - @Override public Configuration getConf() { return config; @@ -112,11 +105,6 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc this.masterServices = masterServices; } - @Override - public void setClusterLoad(Map>> clusterLoad){ - - } - @Override public List balanceCluster(TableName tableName, Map> clusterState) throws HBaseIOException { @@ -384,8 +372,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc } // Create the balancer - Class balancerKlass = config.getClass( - HBASE_GROUP_LOADBALANCER_CLASS, + Class balancerKlass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS, StochasticLoadBalancer.class, LoadBalancer.class); internalBalancer = ReflectionUtils.newInstance(balancerKlass, config); internalBalancer.setMasterServices(masterServices); @@ -399,6 +386,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc return this.rsGroupInfoManager.isOnline(); } + @Override + public void setClusterLoad(Map>> clusterLoad) { + } + @Override public void regionOnline(HRegionInfo regionInfo, ServerName sn) { } @@ -420,4 +411,9 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc public boolean isStopped() { return false; } + + @VisibleForTesting + public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) { + this.rsGroupInfoManager = rsGroupInfoManager; + } } diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java index 6d8c2c8ad70..88ea04b2fdb 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java @@ -37,28 +37,21 @@ import org.apache.hadoop.hbase.net.Address; @InterfaceAudience.Private public interface RSGroupInfoManager { //Assigned before user tables - public static final TableName RSGROUP_TABLE_NAME = + TableName RSGROUP_TABLE_NAME = TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup"); - public static final byte[] RSGROUP_TABLE_NAME_BYTES = RSGROUP_TABLE_NAME.toBytes(); - public static final String rsGroupZNode = "rsgroup"; - public static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m"); - public static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i"); - public static final byte[] ROW_KEY = {0}; - + byte[] RSGROUP_TABLE_NAME_BYTES = RSGROUP_TABLE_NAME.toBytes(); + String rsGroupZNode = "rsgroup"; + byte[] META_FAMILY_BYTES = Bytes.toBytes("m"); + byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i"); + byte[] ROW_KEY = {0}; /** - * Adds the group. - * - * @param rsGroupInfo the group name - * @throws java.io.IOException Signals that an I/O exception has occurred. + * Add given RSGroupInfo to existing list of group infos. */ void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException; /** * Remove a region server group. - * - * @param groupName the group name - * @throws java.io.IOException Signals that an I/O exception has occurred. */ void removeRSGroup(String groupName) throws IOException; @@ -68,32 +61,22 @@ public interface RSGroupInfoManager { * @param srcGroup groupName being moved from * @param dstGroup groupName being moved to * @return Set of servers moved (May be a subset of {@code servers}). - * @throws java.io.IOException on move failure */ - Set
moveServers(Set
servers, - String srcGroup, String dstGroup) throws IOException; + Set
moveServers(Set
servers, String srcGroup, String dstGroup) + throws IOException; /** * Gets the group info of server. - * - * @param hostPort the server - * @return An instance of RSGroupInfo */ - RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException; + RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException; /** - * Gets the group information. - * - * @param groupName the group name - * @return An instance of RSGroupInfo + * Gets {@code RSGroupInfo} for the given group name. */ RSGroupInfo getRSGroup(String groupName) throws IOException; /** * Get the group membership of a table - * @param tableName name of table to get group membership - * @return Group name of table - * @throws java.io.IOException on failure to retrive information */ String getRSGroupOfTable(TableName tableName) throws IOException; @@ -102,29 +85,21 @@ public interface RSGroupInfoManager { * * @param tableNames set of tables to move * @param groupName name of group of tables to move to - * @throws java.io.IOException on failure to move */ void moveTables(Set tableNames, String groupName) throws IOException; /** - * List the groups - * - * @return list of RSGroupInfo - * @throws java.io.IOException on failure + * List the existing {@code RSGroupInfo}s. */ List listRSGroups() throws IOException; /** - * Refresh/reload the group information from - * the persistent store - * - * @throws java.io.IOException on failure to refresh + * Refresh/reload the group information from the persistent store */ void refresh() throws IOException; /** - * Whether the manager is able to fully - * return group metadata + * Whether the manager is able to fully return group metadata * * @return whether the manager is in online mode */ diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index cd941865f2c..6d157cc2838 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hbase.rsgroup; -import static org.apache.hadoop.hbase.rsgroup.Utility.getOnlineServers; - +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -57,10 +57,12 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.ServerListener; @@ -92,11 +94,11 @@ import com.google.protobuf.ServiceException; * *

Concurrency

* RSGroup state is kept locally in Maps. There is a rsgroup name to cached - * RSGroupInfo Map at this.rsGroupMap and a Map of tables to the name of the - * rsgroup they belong too (in this.tableMap). These Maps are persisted to the + * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the + * rsgroup they belong too (in {@link #tableMap}). These Maps are persisted to the * hbase:rsgroup table (and cached in zk) on each modification. * - *

Mutations on state are synchronized but so reads can continue without having + *

Mutations on state are synchronized but reads can continue without having * to wait on an instance monitor, mutations do wholesale replace of the Maps on * update -- Copy-On-Write; the local Maps of state are read-only, just-in-case * (see flushConfig). @@ -109,7 +111,7 @@ import com.google.protobuf.ServiceException; * no other has access concurrently. Reads must be able to continue concurrently. */ @InterfaceAudience.Private -public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener { +class RSGroupInfoManagerImpl implements RSGroupInfoManager { private static final Log LOG = LogFactory.getLog(RSGroupInfoManagerImpl.class); /** Table descriptor for hbase:rsgroup catalog table */ @@ -132,43 +134,35 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene private volatile Map rsGroupMap = Collections.emptyMap(); private volatile Map tableMap = Collections.emptyMap(); - private final MasterServices master; + private final MasterServices masterServices; private Table rsGroupTable; private final ClusterConnection conn; private final ZooKeeperWatcher watcher; - private RSGroupStartupWorker rsGroupStartupWorker; + private final RSGroupStartupWorker rsGroupStartupWorker = new RSGroupStartupWorker(); // contains list of groups that were last flushed to persistent store - private Set prevRSGroups = new HashSet(); - private final RSGroupSerDe rsGroupSerDe = new RSGroupSerDe(); - private DefaultServerUpdater defaultServerUpdater; - private boolean init = false; + private Set prevRSGroups = new HashSet<>(); + private final ServerEventsListenerThread serverEventsListenerThread = + new ServerEventsListenerThread(); - public RSGroupInfoManagerImpl(MasterServices master) throws IOException { - this.master = master; - this.watcher = master.getZooKeeper(); - this.conn = master.getClusterConnection(); + private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException { + this.masterServices = masterServices; + this.watcher = masterServices.getZooKeeper(); + this.conn = masterServices.getClusterConnection(); } - public synchronized void init() throws IOException{ - if (this.init) return; - rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn); + private synchronized void init() throws IOException{ refresh(); rsGroupStartupWorker.start(); - defaultServerUpdater = new DefaultServerUpdater(this); - master.getServerManager().registerListener(this); - defaultServerUpdater.start(); - this.init = true; + serverEventsListenerThread.start(); + masterServices.getServerManager().registerListener(serverEventsListenerThread); } - synchronized boolean isInit() { - return init; + static RSGroupInfoManager getInstance(MasterServices master) throws IOException { + RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master); + instance.init(); + return instance; } - /** - * Adds the group. - * - * @param rsGroupInfo the group name - */ @Override public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException { checkGroupName(rsGroupInfo.getName()); @@ -182,29 +176,23 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene } private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException { - RSGroupInfo rsgi = null; - try { - rsgi = getRSGroup(groupName); - } catch (IOException ioe) { - // Will never happen - throw new DoNotRetryIOException(ioe); - } - if (rsgi == null) { + RSGroupInfo rsGroupInfo = getRSGroup(groupName); + if (rsGroupInfo == null) { throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist"); } - return rsgi; + return rsGroupInfo; } @Override - public synchronized Set

moveServers(Set
servers, String srcGroup, String dstGroup) - throws IOException { + public synchronized Set
moveServers(Set
servers, String srcGroup, + String dstGroup) throws IOException { RSGroupInfo src = getRSGroupInfo(srcGroup); RSGroupInfo dst = getRSGroupInfo(dstGroup); - // If destination is 'default' rsgroup, only add servers that are online. If not online, drop it. - // If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a rsgroup - // of dead servers that are to come back later). + // If destination is 'default' rsgroup, only add servers that are online. If not online, drop + // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a + // rsgroup of dead servers that are to come back later). Set
onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)? - getOnlineServers(this.master): null; + Utility.getOnlineServers(this.masterServices): null; for (Address el: servers) { src.removeServer(el); if (onlineServers != null) { @@ -224,45 +212,29 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene return dst.getServers(); } - /** - * Gets the group info of server. - * - * @param hostPort the server - * @return An instance of GroupInfo. - */ @Override - public RSGroupInfo getRSGroupOfServer(Address hostPort) - throws IOException { + public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException { for (RSGroupInfo info: rsGroupMap.values()) { - if (info.containsServer(hostPort)) { + if (info.containsServer(serverHostPort)) { return info; } } return null; } - /** - * Gets the group information. - * - * @param groupName - * the group name - * @return An instance of GroupInfo - */ @Override - public RSGroupInfo getRSGroup(String groupName) throws IOException { - return this.rsGroupMap.get(groupName); + public RSGroupInfo getRSGroup(String groupName) { + return rsGroupMap.get(groupName); } - - @Override - public String getRSGroupOfTable(TableName tableName) throws IOException { + public String getRSGroupOfTable(TableName tableName) { return tableMap.get(tableName); } @Override - public synchronized void moveTables( - Set tableNames, String groupName) throws IOException { + public synchronized void moveTables(Set tableNames, String groupName) + throws IOException { if (groupName != null && !rsGroupMap.containsKey(groupName)) { throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group"); } @@ -280,21 +252,14 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene newGroupMap.put(dst.getName(), dst); } } - flushConfig(newGroupMap); } - - /** - * Delete a region server group. - * - * @param groupName the group name - * @throws java.io.IOException Signals that an I/O exception has occurred. - */ @Override public synchronized void removeRSGroup(String groupName) throws IOException { if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { - throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group"); + throw new DoNotRetryIOException("Group " + groupName + " does not exist or is a reserved " + + "group"); } Map newGroupMap = Maps.newHashMap(rsGroupMap); newGroupMap.remove(groupName); @@ -302,7 +267,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene } @Override - public List listRSGroups() throws IOException { + public List listRSGroups() { return Lists.newLinkedList(rsGroupMap.values()); } @@ -311,6 +276,41 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene return rsGroupStartupWorker.isOnline(); } + + List retrieveGroupListFromGroupTable() throws IOException { + List rsGroupInfoList = Lists.newArrayList(); + for (Result result : rsGroupTable.getScanner(new Scan())) { + RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom( + result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES)); + rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto)); + } + return rsGroupInfoList; + } + + List retrieveGroupListFromZookeeper() throws IOException { + String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode); + List RSGroupInfoList = Lists.newArrayList(); + //Overwrite any info stored by table, this takes precedence + try { + if(ZKUtil.checkExists(watcher, groupBasePath) != -1) { + for(String znode: ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) { + byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode)); + if(data.length > 0) { + ProtobufUtil.expectPBMagicPrefix(data); + ByteArrayInputStream bis = new ByteArrayInputStream( + data, ProtobufUtil.lengthOfPBMagic(), data.length); + RSGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo( + RSGroupProtos.RSGroupInfo.parseFrom(bis))); + } + } + LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size()); + } + } catch (KeeperException|DeserializationException|InterruptedException e) { + throw new IOException("Failed to read rsGroupZNode",e); + } + return RSGroupInfoList; + } + @Override public void refresh() throws IOException { refresh(false); @@ -319,11 +319,9 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene /** * Read rsgroup info from the source of truth, the hbase:rsgroup table. * Update zk cache. Called on startup of the manager. - * @param forceOnline - * @throws IOException */ private synchronized void refresh(boolean forceOnline) throws IOException { - List groupList = new LinkedList(); + List groupList = new LinkedList<>(); // Overwrite anything read from zk, group table is source of truth // if online read from GROUP table @@ -332,29 +330,25 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene if (rsGroupTable == null) { rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME); } - groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable)); + groupList.addAll(retrieveGroupListFromGroupTable()); } else { LOG.debug("Refreshing in Offline mode."); - String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode); - groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath)); + groupList.addAll(retrieveGroupListFromZookeeper()); } // refresh default group, prune - NavigableSet orphanTables = new TreeSet(); - for(String entry: master.getTableDescriptors().getAll().keySet()) { + NavigableSet orphanTables = new TreeSet<>(); + for(String entry: masterServices.getTableDescriptors().getAll().keySet()) { orphanTables.add(TableName.valueOf(entry)); } - List specialTables; - if(!master.isInitialized()) { - specialTables = new ArrayList(4); - specialTables.add(AccessControlLists.ACL_TABLE_NAME); - specialTables.add(TableName.META_TABLE_NAME); - specialTables.add(TableName.NAMESPACE_TABLE_NAME); - specialTables.add(RSGROUP_TABLE_NAME); + final List specialTables; + if(!masterServices.isInitialized()) { + specialTables = Arrays.asList(AccessControlLists.ACL_TABLE_NAME, TableName.META_TABLE_NAME, + TableName.NAMESPACE_TABLE_NAME, RSGROUP_TABLE_NAME); } else { specialTables = - master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR); + masterServices.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR); } for (TableName table : specialTables) { @@ -380,26 +374,26 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene newTableMap.put(table, group.getName()); } } - installNewMaps(newGroupMap, newTableMap); + resetRSGroupAndTableMaps(newGroupMap, newTableMap); updateCacheOfRSGroups(rsGroupMap.keySet()); } - private synchronized Map flushConfigTable(Map newGroupMap) + private synchronized Map flushConfigTable(Map groupMap) throws IOException { Map newTableMap = Maps.newHashMap(); List mutations = Lists.newArrayList(); // populate deletes for(String groupName : prevRSGroups) { - if(!newGroupMap.containsKey(groupName)) { + if(!groupMap.containsKey(groupName)) { Delete d = new Delete(Bytes.toBytes(groupName)); mutations.add(d); } } // populate puts - for(RSGroupInfo RSGroupInfo : newGroupMap.values()) { - RSGroupProtos.RSGroupInfo proto = RSGroupSerDe.toProtoGroupInfo(RSGroupInfo); + for(RSGroupInfo RSGroupInfo : groupMap.values()) { + RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo); Put p = new Put(Bytes.toBytes(RSGroupInfo.getName())); p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray()); mutations.add(p); @@ -441,13 +435,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene newTableMap = flushConfigTable(newGroupMap); // Make changes visible after having been persisted to the source of truth - installNewMaps(newGroupMap, newTableMap); + resetRSGroupAndTableMaps(newGroupMap, newTableMap); try { String groupBasePath = ZKUtil.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode); ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC); - List zkOps = new ArrayList(newGroupMap.size()); + List zkOps = new ArrayList<>(newGroupMap.size()); for(String groupName : prevRSGroups) { if(!newGroupMap.containsKey(groupName)) { String znode = ZKUtil.joinZNode(groupBasePath, groupName); @@ -458,7 +452,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene for (RSGroupInfo RSGroupInfo : newGroupMap.values()) { String znode = ZKUtil.joinZNode(groupBasePath, RSGroupInfo.getName()); - RSGroupProtos.RSGroupInfo proto = RSGroupSerDe.toProtoGroupInfo(RSGroupInfo); + RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo); LOG.debug("Updating znode: "+znode); ZKUtil.createAndFailSilent(watcher, znode); zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); @@ -470,7 +464,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene ZKUtil.multiOrSequential(watcher, zkOps, false); } catch (KeeperException e) { LOG.error("Failed to write to rsGroupZNode", e); - master.abort("Failed to write to rsGroupZNode", e); + masterServices.abort("Failed to write to rsGroupZNode", e); throw new IOException("Failed to write to rsGroupZNode",e); } updateCacheOfRSGroups(newGroupMap.keySet()); @@ -480,7 +474,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene * Make changes visible. * Caller must be synchronized on 'this'. */ - private void installNewMaps(Map newRSGroupMap, + private void resetRSGroupAndTableMaps(Map newRSGroupMap, Map newTableMap) { // Make maps Immutable. this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap); @@ -499,22 +493,22 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene // Called by getDefaultServers. Presume it has lock in place. private List getOnlineRS() throws IOException { - if (master != null) { - return master.getServerManager().getOnlineServersList(); + if (masterServices != null) { + return masterServices.getServerManager().getOnlineServersList(); } + LOG.debug("Reading online RS from zookeeper"); + List servers = new LinkedList<>(); try { - LOG.debug("Reading online RS from zookeeper"); - List servers = new LinkedList(); for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.znodePaths.rsZNode)) { servers.add(ServerName.parseServerName(el)); } - return servers; } catch (KeeperException e) { throw new IOException("Failed to retrieve server list from zookeeper", e); } + return servers; } - // Called by DefaultServerUpdater. Presume it has lock on this manager when it runs. + // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs. private SortedSet
getDefaultServers() throws IOException { SortedSet
defaultServers = Sets.newTreeSet(); for (ServerName serverName : getOnlineRS()) { @@ -535,7 +529,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene return defaultServers; } - // Called by DefaultServerUpdater. Synchronize on this because redoing + // Called by ServerEventsListenerThread. Synchronize on this because redoing // the rsGroupMap then writing it out. private synchronized void updateDefaultServers(SortedSet
servers) throws IOException { RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); @@ -545,42 +539,47 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene flushConfig(newGroupMap); } - @Override - public void serverAdded(ServerName serverName) { - // #serverChanged is internally synchronized - defaultServerUpdater.serverChanged(); - } - - @Override - public void serverRemoved(ServerName serverName) { - // #serverChanged is internally synchronized - defaultServerUpdater.serverChanged(); - } - - // TODO: Why do we need this extra thread? Why can't we just go - // fetch at balance time or admin time? - private static class DefaultServerUpdater extends Thread { - private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class); - private final RSGroupInfoManagerImpl mgr; + /** + * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known + * servers. Notifications about server changes are received by registering {@link ServerListener}. + * As a listener, we need to return immediately, so the real work of updating the servers is + * done asynchronously in this thread. + */ + private class ServerEventsListenerThread extends Thread implements ServerListener { + private final Log LOG = LogFactory.getLog(ServerEventsListenerThread.class); private boolean changed = false; - public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) { - super("RSGroup.ServerUpdater"); + ServerEventsListenerThread() { setDaemon(true); - this.mgr = mgr; + } + + @Override + public void serverAdded(ServerName serverName) { + serverChanged(); + } + + @Override + public void serverRemoved(ServerName serverName) { + serverChanged(); + } + + private synchronized void serverChanged() { + changed = true; + this.notify(); } @Override public void run() { + setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName()); SortedSet
prevDefaultServers = new TreeSet<>(); - while(isMasterRunning(this.mgr.master)) { + while(isMasterRunning(masterServices)) { try { LOG.info("Updating default servers."); - SortedSet
servers = mgr.getDefaultServers(); + SortedSet
servers = RSGroupInfoManagerImpl.this.getDefaultServers(); if (!servers.equals(prevDefaultServers)) { - mgr.updateDefaultServers(servers); + RSGroupInfoManagerImpl.this.updateDefaultServers(servers); prevDefaultServers = servers; - LOG.info("Updated with servers: " + servers.size()); + LOG.info("Updated with servers: "+servers.size()); } try { synchronized (this) { @@ -597,46 +596,31 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene } } } - - public void serverChanged() { - synchronized (this) { - changed = true; - this.notify(); - } - } } - private static class RSGroupStartupWorker extends Thread { - private static final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class); + private class RSGroupStartupWorker extends Thread { + private final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class); private volatile boolean online = false; - private final MasterServices masterServices; - private final RSGroupInfoManagerImpl groupInfoManager; - private final ClusterConnection conn; - public RSGroupStartupWorker(RSGroupInfoManagerImpl groupInfoManager, - MasterServices masterServices, - ClusterConnection conn) { - this.masterServices = masterServices; - this.groupInfoManager = groupInfoManager; - this.conn = conn; - setName(RSGroupStartupWorker.class.getName()+"-"+masterServices.getServerName()); + RSGroupStartupWorker() { setDaemon(true); } @Override public void run() { + setName(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName()); if (waitForGroupTableOnline()) { LOG.info("GroupBasedLoadBalancer is now online"); } } - public boolean waitForGroupTableOnline() { - final List foundRegions = new LinkedList(); - final List assignedRegions = new LinkedList(); + private boolean waitForGroupTableOnline() { + final List foundRegions = new LinkedList<>(); + final List assignedRegions = new LinkedList<>(); final AtomicBoolean found = new AtomicBoolean(false); final TableStateManager tsm = masterServices.getTableStateManager(); boolean createSent = false; - while (!found.get() && isMasterRunning(this.masterServices)) { + while (!found.get() && isMasterRunning(masterServices)) { foundRegions.clear(); assignedRegions.clear(); found.set(true); @@ -703,7 +687,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene MetaTableAccessor.fullScanRegions(conn, visitor); // if no regions in meta then we have to create the table if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) { - groupInfoManager.createRSGroupTable(masterServices); + createRSGroupTable(); createSent = true; } LOG.info("RSGroup table=" + RSGROUP_TABLE_NAME + " isOnline=" + found.get() @@ -717,10 +701,10 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene } if (found.get()) { LOG.debug("With group table online, refreshing cached information."); - groupInfoManager.refresh(true); + RSGroupInfoManagerImpl.this.refresh(true); online = true; //flush any inconsistencies between ZK and HTable - groupInfoManager.flushConfig(); + RSGroupInfoManagerImpl.this.flushConfig(); } } catch (RuntimeException e) { throw e; @@ -737,6 +721,30 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene return found.get(); } + private void createRSGroupTable() throws IOException { + Long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC); + // wait for region to be online + int tries = 600; + while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) + && masterServices.getMasterProcedureExecutor().isRunning() + && tries > 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new IOException("Wait interrupted ", e); + } + tries--; + } + if(tries <= 0) { + throw new IOException("Failed to create group table in a given time."); + } else { + ProcedureInfo result = masterServices.getMasterProcedureExecutor().getResult(procId); + if (result != null && result.isFailed()) { + throw new IOException("Failed to create group table. " + result.getExceptionFullMessage()); + } + } + } + public boolean isOnline() { return online; } @@ -746,32 +754,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene return !masterServices.isAborted() && !masterServices.isStopped(); } - private void createRSGroupTable(MasterServices masterServices) throws IOException { - Long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC); - // wait for region to be online - int tries = 600; - while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) - && masterServices.getMasterProcedureExecutor().isRunning() - && tries > 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new IOException("Wait interrupted", e); - } - tries--; - } - if (tries <= 0) { - throw new IOException("Failed to create group table in a given time."); - } else { - ProcedureInfo result = masterServices.getMasterProcedureExecutor().getResult(procId); - if (result != null && result.isFailed()) { - throw new IOException("Failed to create group table. " + result.getExceptionFullMessage()); - } - } - } - - private void multiMutate(List mutations) - throws IOException { + private void multiMutate(List mutations) throws IOException { CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY); MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder = MultiRowMutationProtos.MutateRowsRequest.newBuilder(); diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java new file mode 100644 index 00000000000..24f397691f5 --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.rsgroup; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; + +@InterfaceAudience.Private +class RSGroupProtobufUtil { + static RSGroupInfo toGroupInfo(RSGroupProtos.RSGroupInfo proto) { + RSGroupInfo RSGroupInfo = new RSGroupInfo(proto.getName()); + for(HBaseProtos.ServerName el: proto.getServersList()) { + RSGroupInfo.addServer(Address.fromParts(el.getHostName(), el.getPort())); + } + for(HBaseProtos.TableName pTableName: proto.getTablesList()) { + RSGroupInfo.addTable(ProtobufUtil.toTableName(pTableName)); + } + return RSGroupInfo; + } + + static RSGroupProtos.RSGroupInfo toProtoGroupInfo(RSGroupInfo pojo) { + List tables = new ArrayList<>(pojo.getTables().size()); + for(TableName arg: pojo.getTables()) { + tables.add(ProtobufUtil.toProtoTableName(arg)); + } + List hostports = new ArrayList<>(pojo.getServers().size()); + for(Address el: pojo.getServers()) { + hostports.add(HBaseProtos.ServerName.newBuilder() + .setHostName(el.getHostname()) + .setPort(el.getPort()) + .build()); + } + return RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName()) + .addAllServers(hostports) + .addAllTables(tables).build(); + } +} diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java deleted file mode 100644 index 3716ef46608..00000000000 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.rsgroup; - -import com.google.common.collect.Lists; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.net.Address; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -//TODO do better encapsulation of SerDe logic from GroupInfoManager and GroupTracker -@InterfaceAudience.Private -public class RSGroupSerDe { - private static final Log LOG = LogFactory.getLog(RSGroupSerDe.class); - - public RSGroupSerDe() {super();} - - public List retrieveGroupList(Table groupTable) throws IOException { - List RSGroupInfoList = Lists.newArrayList(); - for (Result result : groupTable.getScanner(new Scan())) { - RSGroupProtos.RSGroupInfo proto = - RSGroupProtos.RSGroupInfo.parseFrom( - result.getValue( - RSGroupInfoManager.META_FAMILY_BYTES, - RSGroupInfoManager.META_QUALIFIER_BYTES)); - RSGroupInfoList.add(toGroupInfo(proto)); - } - return RSGroupInfoList; - } - - public List retrieveGroupList(ZooKeeperWatcher watcher, - String groupBasePath) throws IOException { - List RSGroupInfoList = Lists.newArrayList(); - //Overwrite any info stored by table, this takes precedence - try { - if(ZKUtil.checkExists(watcher, groupBasePath) != -1) { - for(String znode: ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) { - byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode)); - if(data.length > 0) { - ProtobufUtil.expectPBMagicPrefix(data); - ByteArrayInputStream bis = new ByteArrayInputStream( - data, ProtobufUtil.lengthOfPBMagic(), data.length); - RSGroupInfoList.add(toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); - } - } - LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size()); - } - } catch (KeeperException e) { - throw new IOException("Failed to read rsGroupZNode",e); - } catch (DeserializationException e) { - throw new IOException("Failed to read rsGroupZNode",e); - } catch (InterruptedException e) { - throw new IOException("Failed to read rsGroupZNode",e); - } - return RSGroupInfoList; - } - - - public static RSGroupInfo toGroupInfo(RSGroupProtos.RSGroupInfo proto) { - RSGroupInfo RSGroupInfo = new RSGroupInfo(proto.getName()); - for(HBaseProtos.ServerName el: proto.getServersList()) { - RSGroupInfo.addServer(Address.fromParts(el.getHostName(), el.getPort())); - } - for(HBaseProtos.TableName pTableName: proto.getTablesList()) { - RSGroupInfo.addTable(ProtobufUtil.toTableName(pTableName)); - } - return RSGroupInfo; - } - - public static RSGroupProtos.RSGroupInfo toProtoGroupInfo(RSGroupInfo pojo) { - List tables = - new ArrayList(pojo.getTables().size()); - for(TableName arg: pojo.getTables()) { - tables.add(ProtobufUtil.toProtoTableName(arg)); - } - List hostports = - new ArrayList(pojo.getServers().size()); - for(Address el: pojo.getServers()) { - hostports.add(HBaseProtos.ServerName.newBuilder() - .setHostName(el.getHostname()) - .setPort(el.getPort()) - .build()); - } - return RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName()) - .addAllServers(hostports) - .addAllTables(tables).build(); - } -} diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java index bff392b94b0..54992e90eae 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java @@ -26,4 +26,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer; * marked with this Interface before it runs. */ @InterfaceAudience.Private -public interface RSGroupableBalancer extends LoadBalancer {} \ No newline at end of file +public interface RSGroupableBalancer extends LoadBalancer { + /** Config for pluggable load balancers */ + String HBASE_RSGROUP_LOADBALANCER_CLASS = "hbase.rsgroup.grouploadbalancer.class"; +} \ No newline at end of file diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java index b542fd33612..b7940847f4a 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java @@ -93,8 +93,9 @@ public class TestRSGroupBasedLoadBalancer { tableDescs = constructTableDesc(); Configuration conf = HBaseConfiguration.create(); conf.set("hbase.regions.slop", "0"); - conf.set("hbase.group.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName()); - loadBalancer = new RSGroupBasedLoadBalancer(getMockedGroupInfoManager()); + conf.set("hbase.rsgroup.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName()); + loadBalancer = new RSGroupBasedLoadBalancer(); + loadBalancer.setRsGroupInfoManager(getMockedGroupInfoManager()); loadBalancer.setMasterServices(getMockedMaster()); loadBalancer.setConf(conf); loadBalancer.initialize(); diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java index 72db2d051ce..38866845ef6 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java @@ -160,9 +160,9 @@ public class TestRSGroups extends TestRSGroupsBase { LOG.info("testNamespaceCreateAndAssign"); String nsName = tablePrefix+"_foo"; final TableName tableName = TableName.valueOf(nsName, tablePrefix + "_testCreateAndAssign"); - RSGroupInfo appInfo = addGroup(rsGroupAdmin, "appInfo", 1); + RSGroupInfo appInfo = addGroup("appInfo", 1); admin.createNamespace(NamespaceDescriptor.create(nsName) - .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "appInfo").build()); + .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "appInfo").build()); final HTableDescriptor desc = new HTableDescriptor(tableName); desc.addFamily(new HColumnDescriptor("f")); admin.createTable(desc); @@ -186,7 +186,7 @@ public class TestRSGroups extends TestRSGroupsBase { LOG.info("testDefaultNamespaceCreateAndAssign"); final byte[] tableName = Bytes.toBytes(tablePrefix + "_testCreateAndAssign"); admin.modifyNamespace(NamespaceDescriptor.create("default") - .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "default").build()); + .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "default").build()); final HTableDescriptor desc = new HTableDescriptor(tableName); desc.addFamily(new HColumnDescriptor("f")); admin.createTable(desc); @@ -206,7 +206,7 @@ public class TestRSGroups extends TestRSGroupsBase { LOG.info("testNamespaceConstraint"); rsGroupAdmin.addRSGroup(groupName); admin.createNamespace(NamespaceDescriptor.create(nsName) - .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, groupName) + .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName) .build()); //test removing a referenced group try { @@ -218,7 +218,7 @@ public class TestRSGroups extends TestRSGroupsBase { //changing with the same name is fine admin.modifyNamespace( NamespaceDescriptor.create(nsName) - .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, groupName) + .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName) .build()); String anotherGroup = tablePrefix+"_anotherGroup"; rsGroupAdmin.addRSGroup(anotherGroup); @@ -227,7 +227,7 @@ public class TestRSGroups extends TestRSGroupsBase { rsGroupAdmin.removeRSGroup(groupName); try { admin.createNamespace(NamespaceDescriptor.create(nsName) - .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "foo") + .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo") .build()); fail("Expected a constraint exception"); } catch (IOException ex) { @@ -250,7 +250,7 @@ public class TestRSGroups extends TestRSGroupsBase { final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions"); LOG.info("testMisplacedRegions"); - final RSGroupInfo RSGroupInfo = addGroup(rsGroupAdmin, "testMisplacedRegions", 1); + final RSGroupInfo RSGroupInfo = addGroup("testMisplacedRegions", 1); TEST_UTIL.createMultiRegionTable(tableName, new byte[]{'f'}, 15); TEST_UTIL.waitUntilAllRegionsAssigned(tableName); diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java index 944166f7875..9096dfea750 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java @@ -53,13 +53,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.junit.rules.TestName; public abstract class TestRSGroupsBase { protected static final Log LOG = LogFactory.getLog(TestRSGroupsBase.class); + @Rule + public TestName name = new TestName(); //shared protected final static String groupPrefix = "Group"; @@ -75,15 +80,20 @@ public abstract class TestRSGroupsBase { public final static long WAIT_TIMEOUT = 60000*5; public final static int NUM_SLAVES_BASE = 4; //number of slaves for the smallest cluster + // Per test variables + TableName tableName; + @Before + public void setup() { + LOG.info(name.getMethodName()); + tableName = TableName.valueOf(tablePrefix + "_" + name.getMethodName()); + } - - protected RSGroupInfo addGroup(RSGroupAdmin gAdmin, String groupName, - int serverCount) throws IOException, InterruptedException { - RSGroupInfo defaultInfo = gAdmin - .getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP); + protected RSGroupInfo addGroup(String groupName, int serverCount) + throws IOException, InterruptedException { + RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP); assertTrue(defaultInfo != null); assertTrue(defaultInfo.getServers().size() >= serverCount); - gAdmin.addRSGroup(groupName); + rsGroupAdmin.addRSGroup(groupName); Set
set = new HashSet
(); for(Address server: defaultInfo.getServers()) { @@ -92,17 +102,17 @@ public abstract class TestRSGroupsBase { } set.add(server); } - gAdmin.moveServers(set, groupName); - RSGroupInfo result = gAdmin.getRSGroupInfo(groupName); + rsGroupAdmin.moveServers(set, groupName); + RSGroupInfo result = rsGroupAdmin.getRSGroupInfo(groupName); assertTrue(result.getServers().size() >= serverCount); return result; } - static void removeGroup(RSGroupAdminClient groupAdmin, String groupName) throws IOException { - RSGroupInfo RSGroupInfo = groupAdmin.getRSGroupInfo(groupName); - groupAdmin.moveTables(RSGroupInfo.getTables(), RSGroupInfo.DEFAULT_GROUP); - groupAdmin.moveServers(RSGroupInfo.getServers(), RSGroupInfo.DEFAULT_GROUP); - groupAdmin.removeRSGroup(groupName); + void removeGroup(String groupName) throws IOException { + RSGroupInfo RSGroupInfo = rsGroupAdmin.getRSGroupInfo(groupName); + rsGroupAdmin.moveTables(RSGroupInfo.getTables(), RSGroupInfo.DEFAULT_GROUP); + rsGroupAdmin.moveServers(RSGroupInfo.getServers(), RSGroupInfo.DEFAULT_GROUP); + rsGroupAdmin.removeRSGroup(groupName); } protected void deleteTableIfNecessary() throws IOException { @@ -137,7 +147,7 @@ public abstract class TestRSGroupsBase { = getTableServerRegionMap(); for(TableName tableName : tableServerRegionMap.keySet()) { if(!map.containsKey(tableName)) { - map.put(tableName, new LinkedList()); + map.put(tableName, new LinkedList<>()); } for(List subset: tableServerRegionMap.get(tableName).values()) { map.get(tableName).addAll(subset); @@ -154,10 +164,10 @@ public abstract class TestRSGroupsBase { for(RegionLoad rl : status.getLoad(serverName).getRegionsLoad().values()) { TableName tableName = HRegionInfo.getTable(rl.getName()); if(!map.containsKey(tableName)) { - map.put(tableName, new TreeMap>()); + map.put(tableName, new TreeMap<>()); } if(!map.get(tableName).containsKey(serverName)) { - map.get(tableName).put(serverName, new LinkedList()); + map.get(tableName).put(serverName, new LinkedList<>()); } map.get(tableName).get(serverName).add(rl.getNameAsString()); } @@ -202,8 +212,6 @@ public abstract class TestRSGroupsBase { @Test public void testCreateMultiRegion() throws IOException { - LOG.info("testCreateMultiRegion"); - TableName tableName = TableName.valueOf(tablePrefix + "_testCreateMultiRegion"); byte[] end = {1,3,5,7,9}; byte[] start = {0,2,4,6,8}; byte[][] f = {Bytes.toBytes("f")}; @@ -212,9 +220,6 @@ public abstract class TestRSGroupsBase { @Test public void testCreateAndDrop() throws Exception { - LOG.info("testCreateAndDrop"); - - final TableName tableName = TableName.valueOf(tablePrefix + "_testCreateAndDrop"); TEST_UTIL.createTable(tableName, Bytes.toBytes("cf")); //wait for created table to be assigned TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { @@ -230,11 +235,9 @@ public abstract class TestRSGroupsBase { @Test public void testSimpleRegionServerMove() throws IOException, InterruptedException { - LOG.info("testSimpleRegionServerMove"); - int initNumGroups = rsGroupAdmin.listRSGroups().size(); - RSGroupInfo appInfo = addGroup(rsGroupAdmin, getGroupName("testSimpleRegionServerMove"), 1); - RSGroupInfo adminInfo = addGroup(rsGroupAdmin, getGroupName("testSimpleRegionServerMove"), 1); + RSGroupInfo appInfo = addGroup(getGroupName(name.getMethodName()), 1); + RSGroupInfo adminInfo = addGroup(getGroupName(name.getMethodName()), 1); RSGroupInfo dInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP); Assert.assertEquals(initNumGroups + 2, rsGroupAdmin.listRSGroups().size()); assertEquals(1, adminInfo.getServers().size()); @@ -264,10 +267,8 @@ public abstract class TestRSGroupsBase { @Test public void testMoveServers() throws Exception { - LOG.info("testMoveServers"); - //create groups and assign servers - addGroup(rsGroupAdmin, "bar", 3); + addGroup("bar", 3); rsGroupAdmin.addRSGroup("foo"); RSGroupInfo barGroup = rsGroupAdmin.getRSGroupInfo("bar"); @@ -319,12 +320,9 @@ public abstract class TestRSGroupsBase { @Test public void testTableMoveTruncateAndDrop() throws Exception { - LOG.info("testTableMove"); - - final TableName tableName = TableName.valueOf(tablePrefix + "_testTableMoveAndDrop"); final byte[] familyNameBytes = Bytes.toBytes("f"); - String newGroupName = getGroupName("testTableMove"); - final RSGroupInfo newGroup = addGroup(rsGroupAdmin, newGroupName, 2); + String newGroupName = getGroupName(name.getMethodName()); + final RSGroupInfo newGroup = addGroup(newGroupName, 2); TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, 5); TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { @@ -378,14 +376,14 @@ public abstract class TestRSGroupsBase { @Test public void testGroupBalance() throws Exception { - LOG.info("testGroupBalance"); - String newGroupName = getGroupName("testGroupBalance"); - final RSGroupInfo newGroup = addGroup(rsGroupAdmin, newGroupName, 3); + LOG.info(name.getMethodName()); + String newGroupName = getGroupName(name.getMethodName()); + final RSGroupInfo newGroup = addGroup(newGroupName, 3); - final TableName tableName = TableName.valueOf(tablePrefix+"_ns", "testGroupBalance"); + final TableName tableName = TableName.valueOf(tablePrefix+"_ns", name.getMethodName()); admin.createNamespace( NamespaceDescriptor.create(tableName.getNamespaceAsString()) - .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, newGroupName).build()); + .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, newGroupName).build()); final byte[] familyNameBytes = Bytes.toBytes("f"); final HTableDescriptor desc = new HTableDescriptor(tableName); desc.addFamily(new HColumnDescriptor("f")); @@ -447,10 +445,7 @@ public abstract class TestRSGroupsBase { @Test public void testRegionMove() throws Exception { - LOG.info("testRegionMove"); - - final RSGroupInfo newGroup = addGroup(rsGroupAdmin, getGroupName("testRegionMove"), 1); - final TableName tableName = TableName.valueOf(tablePrefix + rand.nextInt()); + final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1); final byte[] familyNameBytes = Bytes.toBytes("f"); // All the regions created below will be assigned to the default group. TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, 6); @@ -515,11 +510,8 @@ public abstract class TestRSGroupsBase { @Test public void testFailRemoveGroup() throws IOException, InterruptedException { - LOG.info("testFailRemoveGroup"); - int initNumGroups = rsGroupAdmin.listRSGroups().size(); - addGroup(rsGroupAdmin, "bar", 3); - TableName tableName = TableName.valueOf(tablePrefix+"_my_table"); + addGroup("bar", 3); TEST_UTIL.createTable(tableName, Bytes.toBytes("f")); rsGroupAdmin.moveTables(Sets.newHashSet(tableName), "bar"); RSGroupInfo barGroup = rsGroupAdmin.getRSGroupInfo("bar"); @@ -551,14 +543,12 @@ public abstract class TestRSGroupsBase { @Test public void testKillRS() throws Exception { - LOG.info("testKillRS"); - RSGroupInfo appInfo = addGroup(rsGroupAdmin, "appInfo", 1); + RSGroupInfo appInfo = addGroup("appInfo", 1); - - final TableName tableName = TableName.valueOf(tablePrefix+"_ns", "_testKillRS"); + final TableName tableName = TableName.valueOf(tablePrefix+"_ns", name.getMethodName()); admin.createNamespace( NamespaceDescriptor.create(tableName.getNamespaceAsString()) - .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, appInfo.getName()).build()); + .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, appInfo.getName()).build()); final HTableDescriptor desc = new HTableDescriptor(tableName); desc.addFamily(new HColumnDescriptor("f")); admin.createTable(desc); @@ -645,13 +635,11 @@ public abstract class TestRSGroupsBase { @Test public void testMultiTableMove() throws Exception { - LOG.info("testMultiTableMove"); - - final TableName tableNameA = TableName.valueOf(tablePrefix + "_testMultiTableMoveA"); - final TableName tableNameB = TableName.valueOf(tablePrefix + "_testMultiTableMoveB"); + final TableName tableNameA = TableName.valueOf(tablePrefix + name.getMethodName() + "A"); + final TableName tableNameB = TableName.valueOf(tablePrefix + name.getMethodName() + "B"); final byte[] familyNameBytes = Bytes.toBytes("f"); - String newGroupName = getGroupName("testMultiTableMove"); - final RSGroupInfo newGroup = addGroup(rsGroupAdmin, newGroupName, 1); + String newGroupName = getGroupName(name.getMethodName()); + final RSGroupInfo newGroup = addGroup(newGroupName, 1); TEST_UTIL.createTable(tableNameA, familyNameBytes); TEST_UTIL.createTable(tableNameB, familyNameBytes); diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java index 16719a1c7b0..4802ca45fdc 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.rsgroup; import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseCluster; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -46,19 +47,21 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -//This tests that GroupBasedBalancer will use data in zk -//to do balancing during master startup -//This does not test retain assignment +// This tests that GroupBasedBalancer will use data in zk to do balancing during master startup. +// This does not test retain assignment. +// The tests brings up 3 RS, creates a new RS group 'my_group', moves 1 RS to 'my_group', assigns +// 'hbase:rsgroup' to 'my_group', and kill the only server in that group so that 'hbase:rsgroup' +// table isn't available. It then kills the active master and waits for backup master to come +// online. In new master, RSGroupInfoManagerImpl gets the data from zk and waits for the expected +// assignment with a timeout. @Category(MediumTests.class) public class TestRSGroupsOfflineMode { - private static final org.apache.commons.logging.Log LOG = - LogFactory.getLog(TestRSGroupsOfflineMode.class); + private static final Log LOG = LogFactory.getLog(TestRSGroupsOfflineMode.class); private static HMaster master; private static Admin hbaseAdmin; private static HBaseTestingUtility TEST_UTIL; private static HBaseCluster cluster; - private static RSGroupAdminEndpoint RSGroupAdminEndpoint; - public final static long WAIT_TIMEOUT = 60000*5; + private final static long WAIT_TIMEOUT = 60000 * 5; @Rule public TestName name = new TestName(); @@ -88,8 +91,6 @@ public class TestRSGroupsOfflineMode { master.getServerManager().getOnlineServersList().size() >= 3; } }); - RSGroupAdminEndpoint = - master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0); } @AfterClass @@ -99,8 +100,7 @@ public class TestRSGroupsOfflineMode { @Test public void testOffline() throws Exception, InterruptedException { - //table should be after group table name - //so it gets assigned later + // Table should be after group table name so it gets assigned later. final TableName failoverTable = TableName.valueOf(name.getMethodName()); TEST_UTIL.createTable(failoverTable, Bytes.toBytes("f")); final HRegionServer killRS = ((MiniHBaseCluster)cluster).getRegionServer(0); @@ -111,41 +111,40 @@ public class TestRSGroupsOfflineMode { groupAdmin.addRSGroup(newGroup); if(master.getAssignmentManager().getRegionStates().getRegionAssignments() .containsValue(failoverRS.getServerName())) { - for(HRegionInfo regionInfo: hbaseAdmin.getOnlineRegions(failoverRS.getServerName())) { - hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(), - Bytes.toBytes(failoverRS.getServerName().getServerName())); - } - LOG.info("Waiting for region unassignments on failover RS..."); - TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return master.getServerManager().getLoad(failoverRS.getServerName()) - .getRegionsLoad().size() > 0; - } - }); - - //move server to group and make sure all tables are assigned - groupAdmin.moveServers(Sets.newHashSet(groupRS.getServerName().getAddress()), newGroup); + for (HRegionInfo regionInfo : hbaseAdmin.getOnlineRegions(failoverRS.getServerName())) { + hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(failoverRS.getServerName().getServerName())); + } + LOG.info("Waiting for region unassignments on failover RS..."); TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return groupRS.getNumberOfOnlineRegions() < 1 && - master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() < 1; - } - }); - //move table to group and wait - groupAdmin.moveTables(Sets.newHashSet(RSGroupInfoManager.RSGROUP_TABLE_NAME), newGroup); - LOG.info("Waiting for move table..."); - TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return groupRS.getNumberOfOnlineRegions() == 1; + @Override public boolean evaluate() throws Exception { + return master.getServerManager().getLoad(failoverRS.getServerName()) + .getRegionsLoad().size() > 0; } }); } + // Move server to group and make sure all tables are assigned. + groupAdmin.moveServers(Sets.newHashSet(groupRS.getServerName().getAddress()), newGroup); + TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return groupRS.getNumberOfOnlineRegions() < 1 && + master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() < 1; + } + }); + // Move table to group and wait. + groupAdmin.moveTables(Sets.newHashSet(RSGroupInfoManager.RSGROUP_TABLE_NAME), newGroup); + LOG.info("Waiting for move table..."); + TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return groupRS.getNumberOfOnlineRegions() == 1; + } + }); + groupRS.stop("die"); - //race condition here + // Race condition here. TEST_UTIL.getHBaseCluster().getMaster().stopMaster(); LOG.info("Waiting for offline mode..."); TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { @@ -159,16 +158,17 @@ public class TestRSGroupsOfflineMode { } }); - - RSGroupInfoManager groupMgr = RSGroupAdminEndpoint.getGroupInfoManager(); - //make sure balancer is in offline mode, since this is what we're testing + // Get groupInfoManager from the new active master. + RSGroupInfoManager groupMgr = ((MiniHBaseCluster)cluster).getMaster().getMasterCoprocessorHost() + .findCoprocessors(RSGroupAdminEndpoint.class).get(0).getGroupInfoManager(); + // Make sure balancer is in offline mode, since this is what we're testing. assertFalse(groupMgr.isOnline()); - //verify the group affiliation that's loaded from ZK instead of tables + // Verify the group affiliation that's loaded from ZK instead of tables. assertEquals(newGroup, groupMgr.getRSGroupOfTable(RSGroupInfoManager.RSGROUP_TABLE_NAME)); assertEquals(RSGroupInfo.DEFAULT_GROUP, groupMgr.getRSGroupOfTable(failoverTable)); - //kill final regionserver to see the failover happens for all tables - //except GROUP table since it's group does not have any online RS + // Kill final regionserver to see the failover happens for all tables except GROUP table since + // it's group does not have any online RS. killRS.stop("die"); master = TEST_UTIL.getHBaseCluster().getMaster(); LOG.info("Waiting for new table assignment..."); @@ -180,7 +180,7 @@ public class TestRSGroupsOfflineMode { }); Assert.assertEquals(0, failoverRS.getOnlineRegions(RSGroupInfoManager.RSGROUP_TABLE_NAME).size()); - //need this for minicluster to shutdown cleanly + // Need this for minicluster to shutdown cleanly. master.stopMaster(); } } diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java index 0d7c8a8ef37..77b7cea7c39 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java @@ -89,8 +89,8 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin { } @Override - public boolean balanceRSGroup(String name) throws IOException { - return wrapped.balanceRSGroup(name); + public boolean balanceRSGroup(String groupName) throws IOException { + return wrapped.balanceRSGroup(groupName); } @Override @@ -113,7 +113,7 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin { result.getValue( RSGroupInfoManager.META_FAMILY_BYTES, RSGroupInfoManager.META_QUALIFIER_BYTES)); - groupMap.put(proto.getName(), RSGroupSerDe.toGroupInfo(proto)); + groupMap.put(proto.getName(), RSGroupProtobufUtil.toGroupInfo(proto)); } Assert.assertEquals(Sets.newHashSet(groupMap.values()), Sets.newHashSet(wrapped.listRSGroups())); @@ -125,7 +125,7 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin { ProtobufUtil.expectPBMagicPrefix(data); ByteArrayInputStream bis = new ByteArrayInputStream( data, ProtobufUtil.lengthOfPBMagic(), data.length); - zList.add(RSGroupSerDe.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); + zList.add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); } } Assert.assertEquals(zList.size(), groupMap.size());