HBASE-20627 Relocate RS Group pre/post hooks from RSGroupAdminServer to RSGroupAdminEndpoint

This commit is contained in:
tedyu 2018-05-23 15:57:00 -07:00
parent afddf6b1c2
commit 12d75724d7
3 changed files with 182 additions and 48 deletions

View File

@ -202,8 +202,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts +" to rsgroup "
+ request.getTargetGroup());
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveServers(hostPorts, request.getTargetGroup());
}
checkPermission("moveServers");
groupAdminServer.moveServers(hostPorts, request.getTargetGroup());
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postMoveServers(hostPorts, request.getTargetGroup());
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@ -221,8 +227,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables +" to rsgroup "
+ request.getTargetGroup());
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveTables(tables, request.getTargetGroup());
}
checkPermission("moveTables");
groupAdminServer.moveTables(tables, request.getTargetGroup());
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postMoveTables(tables, request.getTargetGroup());
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@ -235,8 +247,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder();
LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName());
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preAddRSGroup(request.getRSGroupName());
}
checkPermission("addRSGroup");
groupAdminServer.addRSGroup(request.getRSGroupName());
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postAddRSGroup(request.getRSGroupName());
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@ -250,8 +268,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
RemoveRSGroupResponse.newBuilder();
LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName());
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preRemoveRSGroup(request.getRSGroupName());
}
checkPermission("removeRSGroup");
groupAdminServer.removeRSGroup(request.getRSGroupName());
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postRemoveRSGroup(request.getRSGroupName());
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@ -265,8 +289,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix() + " balance rsgroup, group="
+ request.getRSGroupName());
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preBalanceRSGroup(request.getRSGroupName());
}
checkPermission("balanceRSGroup");
builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName()));
boolean balancerRan = groupAdminServer.balanceRSGroup(request.getRSGroupName());
builder.setBalanceRan(balancerRan);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postBalanceRSGroup(request.getRSGroupName(),
balancerRan);
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
builder.setBalanceRan(false);
@ -325,8 +357,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts
+ " and tables " + tables + " to rsgroup" + request.getTargetGroup());
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveServersAndTables(hostPorts, tables,
request.getTargetGroup());
}
checkPermission("moveServersAndTables");
groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup());
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postMoveServersAndTables(hostPorts, tables,
request.getTargetGroup());
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@ -346,8 +386,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix()
+ " remove decommissioned servers from rsgroup: " + servers);
try {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preRemoveServers(servers);
}
checkPermission("removeServers");
groupAdminServer.removeServers(servers);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postRemoveServers(servers);
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}

View File

@ -291,9 +291,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName);
}
// Presume first server's source group. Later ensure all servers are from this group.
Address firstServer = servers.iterator().next();
RSGroupInfo srcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer);
@ -370,9 +367,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
} while (foundRegionsToMove);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postMoveServers(servers, targetGroupName);
}
LOG.info("Move server done: " + srcGrp.getName() + "=>" + targetGroupName);
}
}
@ -390,9 +384,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup);
}
if(targetGroup != null) {
RSGroupInfo destGroup = rsGroupInfoManager.getRSGroup(targetGroup);
if(destGroup == null) {
@ -430,22 +421,12 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
}
}
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup);
}
}
}
@Override
public void addRSGroup(String name) throws IOException {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preAddRSGroup(name);
}
rsGroupInfoManager.addRSGroup(new RSGroupInfo(name));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postAddRSGroup(name);
}
}
@Override
@ -453,9 +434,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preRemoveRSGroup(name);
}
RSGroupInfo rsGroupInfo = rsGroupInfoManager.getRSGroup(name);
if (rsGroupInfo == null) {
throw new ConstraintException("RSGroup " + name + " does not exist");
@ -480,9 +458,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
}
rsGroupInfoManager.removeRSGroup(name);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postRemoveRSGroup(name);
}
}
}
@ -498,9 +473,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
return false;
}
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preBalanceRSGroup(groupName);
}
if (getRSGroupInfo(groupName) == null) {
throw new ConstraintException("RSGroup does not exist: "+groupName);
}
@ -542,9 +514,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
LOG.info("RSGroup balance " + groupName + " completed after " +
(System.currentTimeMillis()-startTime) + " seconds");
}
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan);
}
return balancerRan;
}
}
@ -575,9 +544,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers and tables to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preMoveServersAndTables(servers, tables, targetGroup);
}
//check servers and tables status
checkServersAndTables(servers, tables, targetGroup);
@ -589,10 +555,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
moveRegionsFromServers(servers, tables, targetGroup);
//move regions which should belong to these servers
moveRegionsToServers(servers, tables, targetGroup);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postMoveServersAndTables(servers, tables, targetGroup);
}
}
LOG.info("Move servers and tables done. Severs :"
+ servers + " , Tables : " + tables + " => " + targetGroup);
@ -607,15 +569,9 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preRemoveServers(servers);
}
//check the set of servers
checkForDeadOrOnlineServers(servers);
rsGroupInfoManager.removeServers(servers);
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postRemoveServers(servers);
}
LOG.info("Remove decommissioned servers " + servers + " from rsgroup done.");
}
}

View File

@ -23,6 +23,9 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -36,6 +39,11 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.net.Address;
@ -67,7 +75,7 @@ public class TestRSGroups extends TestRSGroupsBase {
protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroups.class);
private static boolean INIT = false;
private static RSGroupAdminEndpoint rsGroupAdminEndpoint;
private static CPMasterObserver observer;
@BeforeClass
public static void setUp() throws Exception {
@ -78,7 +86,7 @@ public class TestRSGroups extends TestRSGroupsBase {
HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
RSGroupBasedLoadBalancer.class.getName());
TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
RSGroupAdminEndpoint.class.getName());
RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName());
TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
TEST_UTIL.getConfiguration().setInt(
ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
@ -100,8 +108,10 @@ public class TestRSGroups extends TestRSGroupsBase {
admin.setBalancerRunning(false,true);
rsGroupAdmin = new VerifyingRSGroupAdminClient(
new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration());
MasterCoprocessorHost host = master.getMasterCoprocessorHost();
observer = (CPMasterObserver) host.findCoprocessor(CPMasterObserver.class.getName());
rsGroupAdminEndpoint = (RSGroupAdminEndpoint)
master.getMasterCoprocessorHost().findCoprocessor(RSGroupAdminEndpoint.class.getName());
host.findCoprocessor(RSGroupAdminEndpoint.class.getName());
}
@AfterClass
@ -144,6 +154,7 @@ public class TestRSGroups extends TestRSGroupsBase {
} catch (Exception ex) {
LOG.warn("Got this on setup, FYI", ex);
}
assertTrue(observer.preMoveServersCalled);
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
@ -217,6 +228,9 @@ public class TestRSGroups extends TestRSGroupsBase {
String groupName = tablePrefix+"_foo";
LOG.info("testNamespaceConstraint");
rsGroupAdmin.addRSGroup(groupName);
assertTrue(observer.preAddRSGroupCalled);
assertTrue(observer.postAddRSGroupCalled);
admin.createNamespace(NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName)
.build());
@ -237,6 +251,8 @@ public class TestRSGroups extends TestRSGroupsBase {
//test add non-existent group
admin.deleteNamespace(nsName);
rsGroupAdmin.removeRSGroup(groupName);
assertTrue(observer.preRemoveRSGroupCalled);
assertTrue(observer.postRemoveRSGroupCalled);
try {
admin.createNamespace(NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo")
@ -257,6 +273,120 @@ public class TestRSGroups extends TestRSGroupsBase {
it.next();
}
public static class CPMasterObserver implements MasterCoprocessor, MasterObserver {
boolean preBalanceRSGroupCalled = false;
boolean postBalanceRSGroupCalled = false;
boolean preMoveServersCalled = false;
boolean postMoveServersCalled = false;
boolean preMoveTablesCalled = false;
boolean postMoveTablesCalled = false;
boolean preAddRSGroupCalled = false;
boolean postAddRSGroupCalled = false;
boolean preRemoveRSGroupCalled = false;
boolean postRemoveRSGroupCalled = false;
boolean preRemoveServersCalled = false;
boolean postRemoveServersCalled = false;
boolean preMoveServersAndTables = false;
boolean postMoveServersAndTables = false;
@Override
public Optional<MasterObserver> getMasterObserver() {
return Optional.of(this);
}
@Override
public void preMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
preMoveServersAndTables = true;
}
@Override
public void postMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
postMoveServersAndTables = true;
}
@Override
public void preRemoveServers(
final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers) throws IOException {
preRemoveServersCalled = true;
}
@Override
public void postRemoveServers(
final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers) throws IOException {
postRemoveServersCalled = true;
}
@Override
public void preRemoveRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String name) throws IOException {
preRemoveRSGroupCalled = true;
}
@Override
public void postRemoveRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String name) throws IOException {
postRemoveRSGroupCalled = true;
}
@Override
public void preAddRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String name) throws IOException {
preAddRSGroupCalled = true;
}
@Override
public void postAddRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String name) throws IOException {
postAddRSGroupCalled = true;
}
@Override
public void preMoveTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<TableName> tables, String targetGroup) throws IOException {
preMoveTablesCalled = true;
}
@Override
public void postMoveTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<TableName> tables, String targetGroup) throws IOException {
postMoveTablesCalled = true;
}
@Override
public void preMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, String targetGroup) throws IOException {
preMoveServersCalled = true;
}
@Override
public void postMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, String targetGroup) throws IOException {
postMoveServersCalled = true;
}
@Override
public void preBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String groupName) throws IOException {
preBalanceRSGroupCalled = true;
}
@Override
public void postBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String groupName, boolean balancerRan) throws IOException {
postBalanceRSGroupCalled = true;
}
}
@Test
public void testMoveServersAndTables() throws Exception {
super.testMoveServersAndTables();
assertTrue(observer.preMoveServersAndTables);
assertTrue(observer.postMoveServersAndTables);
}
@Test
public void testTableMoveTruncateAndDrop() throws Exception {
super.testTableMoveTruncateAndDrop();
assertTrue(observer.preMoveTablesCalled);
assertTrue(observer.postMoveTablesCalled);
}
@Test
public void testRemoveServers() throws Exception {
super.testRemoveServers();
assertTrue(observer.preRemoveServersCalled);
assertTrue(observer.postRemoveServersCalled);
}
@Test
public void testMisplacedRegions() throws Exception {
final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions");
@ -273,6 +403,8 @@ public class TestRSGroups extends TestRSGroupsBase {
admin.setBalancerRunning(true,true);
assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName()));
admin.setBalancerRunning(false,true);
assertTrue(observer.preBalanceRSGroupCalled);
assertTrue(observer.postBalanceRSGroupCalled);
TEST_UTIL.waitFor(60000, new Predicate<Exception>() {
@Override