diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java index 6f3c1d1f15f..365082682be 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java @@ -202,8 +202,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts +" to rsgroup " + request.getTargetGroup()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preMoveServers(hostPorts, request.getTargetGroup()); + } checkPermission("moveServers"); groupAdminServer.moveServers(hostPorts, request.getTargetGroup()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postMoveServers(hostPorts, request.getTargetGroup()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -221,8 +227,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables +" to rsgroup " + request.getTargetGroup()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preMoveTables(tables, request.getTargetGroup()); + } checkPermission("moveTables"); groupAdminServer.moveTables(tables, request.getTargetGroup()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postMoveTables(tables, request.getTargetGroup()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -235,8 +247,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder(); LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preAddRSGroup(request.getRSGroupName()); + } checkPermission("addRSGroup"); groupAdminServer.addRSGroup(request.getRSGroupName()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postAddRSGroup(request.getRSGroupName()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -250,8 +268,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { RemoveRSGroupResponse.newBuilder(); LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preRemoveRSGroup(request.getRSGroupName()); + } checkPermission("removeRSGroup"); groupAdminServer.removeRSGroup(request.getRSGroupName()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postRemoveRSGroup(request.getRSGroupName()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -265,8 +289,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " balance rsgroup, group=" + request.getRSGroupName()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preBalanceRSGroup(request.getRSGroupName()); + } checkPermission("balanceRSGroup"); - builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName())); + boolean balancerRan = groupAdminServer.balanceRSGroup(request.getRSGroupName()); + builder.setBalanceRan(balancerRan); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postBalanceRSGroup(request.getRSGroupName(), + balancerRan); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); builder.setBalanceRan(false); @@ -325,8 +357,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + " and tables " + tables + " to rsgroup" + request.getTargetGroup()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preMoveServersAndTables(hostPorts, tables, + request.getTargetGroup()); + } checkPermission("moveServersAndTables"); groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postMoveServersAndTables(hostPorts, tables, + request.getTargetGroup()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -346,8 +386,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " remove decommissioned servers from rsgroup: " + servers); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preRemoveServers(servers); + } checkPermission("removeServers"); groupAdminServer.removeServers(servers); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postRemoveServers(servers); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index 670e8aa62ba..b39d3a19a1b 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -291,9 +291,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName); - } // Presume first server's source group. Later ensure all servers are from this group. Address firstServer = servers.iterator().next(); RSGroupInfo srcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer); @@ -370,9 +367,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } while (foundRegionsToMove); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postMoveServers(servers, targetGroupName); - } LOG.info("Move server done: " + srcGrp.getName() + "=>" + targetGroupName); } } @@ -390,9 +384,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup); - } if(targetGroup != null) { RSGroupInfo destGroup = rsGroupInfoManager.getRSGroup(targetGroup); if(destGroup == null) { @@ -430,22 +421,12 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } } - - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup); - } } } @Override public void addRSGroup(String name) throws IOException { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preAddRSGroup(name); - } rsGroupInfoManager.addRSGroup(new RSGroupInfo(name)); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postAddRSGroup(name); - } } @Override @@ -453,9 +434,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preRemoveRSGroup(name); - } RSGroupInfo rsGroupInfo = rsGroupInfoManager.getRSGroup(name); if (rsGroupInfo == null) { throw new ConstraintException("RSGroup " + name + " does not exist"); @@ -480,9 +458,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } rsGroupInfoManager.removeRSGroup(name); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postRemoveRSGroup(name); - } } } @@ -498,9 +473,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { return false; } - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preBalanceRSGroup(groupName); - } if (getRSGroupInfo(groupName) == null) { throw new ConstraintException("RSGroup does not exist: "+groupName); } @@ -542,9 +514,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { LOG.info("RSGroup balance " + groupName + " completed after " + (System.currentTimeMillis()-startTime) + " seconds"); } - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan); - } return balancerRan; } } @@ -575,9 +544,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers and tables to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preMoveServersAndTables(servers, tables, targetGroup); - } //check servers and tables status checkServersAndTables(servers, tables, targetGroup); @@ -589,10 +555,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { moveRegionsFromServers(servers, tables, targetGroup); //move regions which should belong to these servers moveRegionsToServers(servers, tables, targetGroup); - - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postMoveServersAndTables(servers, tables, targetGroup); - } } LOG.info("Move servers and tables done. Severs :" + servers + " , Tables : " + tables + " => " + targetGroup); @@ -607,15 +569,9 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preRemoveServers(servers); - } //check the set of servers checkForDeadOrOnlineServers(servers); rsGroupInfoManager.removeServers(servers); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postRemoveServers(servers); - } LOG.info("Remove decommissioned servers " + servers + " from rsgroup done."); } } diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java index 521b8b91a4c..3e74f819cd3 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java @@ -23,6 +23,9 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.Iterator; +import java.util.Optional; +import java.util.Set; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -36,6 +39,11 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.net.Address; @@ -69,7 +77,7 @@ public class TestRSGroups extends TestRSGroupsBase { protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroups.class); private static boolean INIT = false; private static RSGroupAdminEndpoint rsGroupAdminEndpoint; - + private static CPMasterObserver observer; @BeforeClass public static void setUp() throws Exception { @@ -80,7 +88,7 @@ public class TestRSGroups extends TestRSGroupsBase { HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName()); TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, - RSGroupAdminEndpoint.class.getName()); + RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName()); // Enable quota for testRSGroupsWithHBaseQuota() TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1); @@ -104,8 +112,10 @@ public class TestRSGroups extends TestRSGroupsBase { admin.setBalancerRunning(false,true); rsGroupAdmin = new VerifyingRSGroupAdminClient( new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration()); + MasterCoprocessorHost host = master.getMasterCoprocessorHost(); + observer = (CPMasterObserver) host.findCoprocessor(CPMasterObserver.class.getName()); rsGroupAdminEndpoint = (RSGroupAdminEndpoint) - master.getMasterCoprocessorHost().findCoprocessor(RSGroupAdminEndpoint.class.getName()); + host.findCoprocessor(RSGroupAdminEndpoint.class.getName()); } @AfterClass @@ -148,6 +158,7 @@ public class TestRSGroups extends TestRSGroupsBase { } catch (Exception ex) { LOG.warn("Got this on setup, FYI", ex); } + assertTrue(observer.preMoveServersCalled); TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { @@ -221,6 +232,9 @@ public class TestRSGroups extends TestRSGroupsBase { String groupName = tablePrefix+"_foo"; LOG.info("testNamespaceConstraint"); rsGroupAdmin.addRSGroup(groupName); + assertTrue(observer.preAddRSGroupCalled); + assertTrue(observer.postAddRSGroupCalled); + admin.createNamespace(NamespaceDescriptor.create(nsName) .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName) .build()); @@ -241,6 +255,8 @@ public class TestRSGroups extends TestRSGroupsBase { //test add non-existent group admin.deleteNamespace(nsName); rsGroupAdmin.removeRSGroup(groupName); + assertTrue(observer.preRemoveRSGroupCalled); + assertTrue(observer.postRemoveRSGroupCalled); try { admin.createNamespace(NamespaceDescriptor.create(nsName) .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo") @@ -261,6 +277,120 @@ public class TestRSGroups extends TestRSGroupsBase { it.next(); } + public static class CPMasterObserver implements MasterCoprocessor, MasterObserver { + boolean preBalanceRSGroupCalled = false; + boolean postBalanceRSGroupCalled = false; + boolean preMoveServersCalled = false; + boolean postMoveServersCalled = false; + boolean preMoveTablesCalled = false; + boolean postMoveTablesCalled = false; + boolean preAddRSGroupCalled = false; + boolean postAddRSGroupCalled = false; + boolean preRemoveRSGroupCalled = false; + boolean postRemoveRSGroupCalled = false; + boolean preRemoveServersCalled = false; + boolean postRemoveServersCalled = false; + boolean preMoveServersAndTables = false; + boolean postMoveServersAndTables = false; + + @Override + public Optional getMasterObserver() { + return Optional.of(this); + } + @Override + public void preMoveServersAndTables(final ObserverContext ctx, + Set
servers, Set tables, String targetGroup) throws IOException { + preMoveServersAndTables = true; + } + @Override + public void postMoveServersAndTables(final ObserverContext ctx, + Set
servers, Set tables, String targetGroup) throws IOException { + postMoveServersAndTables = true; + } + @Override + public void preRemoveServers( + final ObserverContext ctx, + Set
servers) throws IOException { + preRemoveServersCalled = true; + } + @Override + public void postRemoveServers( + final ObserverContext ctx, + Set
servers) throws IOException { + postRemoveServersCalled = true; + } + @Override + public void preRemoveRSGroup(final ObserverContext ctx, + String name) throws IOException { + preRemoveRSGroupCalled = true; + } + @Override + public void postRemoveRSGroup(final ObserverContext ctx, + String name) throws IOException { + postRemoveRSGroupCalled = true; + } + @Override + public void preAddRSGroup(final ObserverContext ctx, + String name) throws IOException { + preAddRSGroupCalled = true; + } + @Override + public void postAddRSGroup(final ObserverContext ctx, + String name) throws IOException { + postAddRSGroupCalled = true; + } + @Override + public void preMoveTables(final ObserverContext ctx, + Set tables, String targetGroup) throws IOException { + preMoveTablesCalled = true; + } + @Override + public void postMoveTables(final ObserverContext ctx, + Set tables, String targetGroup) throws IOException { + postMoveTablesCalled = true; + } + @Override + public void preMoveServers(final ObserverContext ctx, + Set
servers, String targetGroup) throws IOException { + preMoveServersCalled = true; + } + + @Override + public void postMoveServers(final ObserverContext ctx, + Set
servers, String targetGroup) throws IOException { + postMoveServersCalled = true; + } + @Override + public void preBalanceRSGroup(final ObserverContext ctx, + String groupName) throws IOException { + preBalanceRSGroupCalled = true; + } + @Override + public void postBalanceRSGroup(final ObserverContext ctx, + String groupName, boolean balancerRan) throws IOException { + postBalanceRSGroupCalled = true; + } + } + @Test + public void testMoveServersAndTables() throws Exception { + super.testMoveServersAndTables(); + assertTrue(observer.preMoveServersAndTables); + assertTrue(observer.postMoveServersAndTables); + } + @Test + public void testTableMoveTruncateAndDrop() throws Exception { + super.testTableMoveTruncateAndDrop(); + assertTrue(observer.preMoveTablesCalled); + assertTrue(observer.postMoveTablesCalled); + } + + @Test + public void testRemoveServers() throws Exception { + super.testRemoveServers(); + assertTrue(observer.preRemoveServersCalled); + assertTrue(observer.postRemoveServersCalled); + } + @Test public void testMisplacedRegions() throws Exception { final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions"); @@ -277,6 +407,8 @@ public class TestRSGroups extends TestRSGroupsBase { admin.setBalancerRunning(true,true); assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName())); admin.setBalancerRunning(false,true); + assertTrue(observer.preBalanceRSGroupCalled); + assertTrue(observer.postBalanceRSGroupCalled); TEST_UTIL.waitFor(60000, new Predicate() { @Override