HBASE-19326 Remove decommissioned servers from rsgroup
Signed-off-by: Michael Stack <stack@apache.org> Amending-Author: Andrew Purtell <apurtell@apache.org> Conflicts: hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java hbase-shell/src/main/ruby/hbase/rsgroup_admin.rb
This commit is contained in:
parent
2f7a6f21eb
commit
6938720c12
File diff suppressed because it is too large
Load Diff
|
@ -116,6 +116,13 @@ message MoveServersAndTablesRequest {
|
|||
message MoveServersAndTablesResponse {
|
||||
}
|
||||
|
||||
message RemoveServersRequest {
|
||||
repeated ServerName servers = 1;
|
||||
}
|
||||
|
||||
message RemoveServersResponse {
|
||||
}
|
||||
|
||||
service RSGroupAdminService {
|
||||
rpc GetRSGroupInfo(GetRSGroupInfoRequest)
|
||||
returns (GetRSGroupInfoResponse);
|
||||
|
@ -146,4 +153,7 @@ service RSGroupAdminService {
|
|||
|
||||
rpc MoveServersAndTables(MoveServersAndTablesRequest)
|
||||
returns (MoveServersAndTablesResponse);
|
||||
|
||||
rpc RemoveServers(RemoveServersRequest)
|
||||
returns (RemoveServersResponse);
|
||||
}
|
||||
|
|
|
@ -89,4 +89,14 @@ public interface RSGroupAdmin extends Closeable {
|
|||
*/
|
||||
void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
|
||||
String targetGroup) throws IOException;
|
||||
|
||||
/**
|
||||
* Remove decommissioned servers from rsgroup.
|
||||
* 1. Sometimes a server aborts because of a hardware failure and has to be taken offline
|
||||
* for repair, or some servers need to be moved to join another cluster.
|
||||
* In both cases these servers need to be removed from the rsgroup.
|
||||
* 2. Dead/recovering/live servers will be disallowed.
|
||||
* @param servers set of servers to remove
|
||||
*/
|
||||
void removeServers(Set<Address> servers) throws IOException;
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServers
|
|||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
@ -206,6 +207,25 @@ class RSGroupAdminClient implements RSGroupAdmin {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeServers(Set<Address> servers) throws IOException {
|
||||
Set<HBaseProtos.ServerName> hostPorts = Sets.newHashSet();
|
||||
for(Address el: servers) {
|
||||
hostPorts.add(HBaseProtos.ServerName.newBuilder()
|
||||
.setHostName(el.getHostname())
|
||||
.setPort(el.getPort())
|
||||
.build());
|
||||
}
|
||||
RemoveServersRequest request = RemoveServersRequest.newBuilder()
|
||||
.addAllServers(hostPorts)
|
||||
.build();
|
||||
try {
|
||||
stub.removeServers(null, request);
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufUtil.handleRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
}
|
||||
|
|
|
@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesR
|
|||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.TableProtos;
|
||||
|
||||
public class RSGroupAdminEndpoint extends RSGroupAdminService
|
||||
|
@ -312,6 +314,24 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService
|
|||
done.run(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeServers(RpcController controller,
|
||||
RemoveServersRequest request,
|
||||
RpcCallback<RemoveServersResponse> done) {
|
||||
RemoveServersResponse.Builder builder =
|
||||
RemoveServersResponse.newBuilder();
|
||||
try {
|
||||
Set<Address> servers = Sets.newHashSet();
|
||||
for (HBaseProtos.ServerName el : request.getServersList()) {
|
||||
servers.add(Address.fromParts(el.getHostName(), el.getPort()));
|
||||
}
|
||||
groupAdminServer.removeServers(servers);
|
||||
} catch (IOException e) {
|
||||
ResponseConverter.setControllerException(controller, e);
|
||||
}
|
||||
done.run(builder.build());
|
||||
}
|
||||
|
||||
void assignTableToGroup(HTableDescriptor desc) throws IOException {
|
||||
String groupName =
|
||||
master.getNamespaceDescriptor(desc.getTableName().getNamespaceAsString())
|
||||
|
@ -971,8 +991,15 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService
|
|||
|
||||
@Override
|
||||
public void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
List<ServerName> servers, List<ServerName> notClearedServers) throws IOException {
|
||||
|
||||
List<ServerName> servers, List<ServerName> notClearedServers)
|
||||
throws IOException {
|
||||
Set<Address> clearedServer = Sets.newHashSet();
|
||||
for (ServerName server: servers) {
|
||||
if (!notClearedServers.contains(server)) {
|
||||
clearedServer.add(server.getAddress());
|
||||
}
|
||||
}
|
||||
groupAdminServer.removeServers(clearedServer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1009,6 +1036,16 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService
|
|||
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String name) throws IOException {
|
||||
|
@ -1044,5 +1081,4 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService
|
|||
String groupName, boolean balancerRan) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -68,12 +68,12 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
//Key=host:port,Value=targetGroup
|
||||
private ConcurrentMap<Address,String> serversInTransition =
|
||||
new ConcurrentHashMap<Address, String>();
|
||||
private RSGroupInfoManager RSGroupInfoManager;
|
||||
private RSGroupInfoManager rsGroupInfoManager;
|
||||
|
||||
public RSGroupAdminServer(MasterServices master,
|
||||
RSGroupInfoManager RSGroupInfoManager) throws IOException {
|
||||
this.master = master;
|
||||
this.RSGroupInfoManager = RSGroupInfoManager;
|
||||
this.rsGroupInfoManager = RSGroupInfoManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -412,7 +412,30 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
|
||||
@InterfaceAudience.Private
|
||||
public RSGroupInfoManager getRSGroupInfoManager() throws IOException {
|
||||
return RSGroupInfoManager;
|
||||
return rsGroupInfoManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeServers(Set<Address> servers) throws IOException {
|
||||
{
|
||||
if (servers == null || servers.isEmpty()) {
|
||||
throw new ConstraintException("The set of servers to remove cannot be null or empty.");
|
||||
}
|
||||
// Hold a lock on the manager instance while removing servers to prevent
|
||||
// another writer changing our state while we are working.
|
||||
synchronized (rsGroupInfoManager) {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preRemoveServers(servers);
|
||||
}
|
||||
//check the set of servers
|
||||
checkForDeadOrOnlineServers(servers);
|
||||
rsGroupInfoManager.removeServers(servers);
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postRemoveServers(servers);
|
||||
}
|
||||
LOG.info("Remove decommissioned servers " + servers + " from rsgroup done.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
|
||||
|
@ -520,4 +543,33 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
@Override
|
||||
public void close() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether any of the given servers is on the dead servers list or the online servers list.
|
||||
* @param servers servers to remove
|
||||
*/
|
||||
private void checkForDeadOrOnlineServers(Set<Address> servers) throws ConstraintException {
|
||||
// This ugliness is because we only have Address, not ServerName.
|
||||
Set<Address> onlineServers = new HashSet<>();
|
||||
for(ServerName server: master.getServerManager().getOnlineServers().keySet()) {
|
||||
onlineServers.add(server.getAddress());
|
||||
}
|
||||
|
||||
Set<Address> deadServers = new HashSet<>();
|
||||
for(ServerName server: master.getServerManager().getDeadServers().copyServerNames()) {
|
||||
deadServers.add(server.getAddress());
|
||||
}
|
||||
|
||||
for (Address address: servers) {
|
||||
if (onlineServers.contains(address)) {
|
||||
throw new ConstraintException(
|
||||
"Server " + address + " is an online server, not allowed to remove.");
|
||||
}
|
||||
if (deadServers.contains(address)) {
|
||||
throw new ConstraintException(
|
||||
"Server " + address + " is on the dead servers list,"
|
||||
+ " Maybe it will come back again, not allowed to remove.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,4 +117,10 @@ public interface RSGroupInfoManager {
|
|||
*/
|
||||
void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
|
||||
String srcGroup, String dstGroup) throws IOException;
|
||||
|
||||
/**
|
||||
* Remove decommissioned servers from rsgroup
|
||||
* @param servers set of servers to remove
|
||||
*/
|
||||
void removeServers(Set<Address> servers) throws IOException;
|
||||
}
|
||||
|
|
|
@ -874,4 +874,29 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
|
|||
newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
|
||||
flushConfig(newGroupMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void removeServers(Set<Address> servers) throws IOException {
|
||||
Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
|
||||
for (Address el: servers) {
|
||||
RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
|
||||
if (rsGroupInfo != null) {
|
||||
RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
|
||||
if (newRsGroupInfo == null) {
|
||||
rsGroupInfo.removeServer(el);
|
||||
rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
|
||||
} else {
|
||||
newRsGroupInfo.removeServer(el);
|
||||
rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Server " + el + " does not belong to any rsgroup.");
|
||||
}
|
||||
}
|
||||
if (rsGroupInfos.size() > 0) {
|
||||
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
newGroupMap.putAll(rsGroupInfos);
|
||||
flushConfig(newGroupMap);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,10 +78,10 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
TEST_UTIL.getConfiguration().setBoolean(
|
||||
HConstants.ZOOKEEPER_USEMULTI,
|
||||
true);
|
||||
TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE);
|
||||
TEST_UTIL.getConfiguration().set(
|
||||
TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
|
||||
TEST_UTIL.getConfiguration().setInt(
|
||||
ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
|
||||
""+NUM_SLAVES_BASE);
|
||||
NUM_SLAVES_BASE - 1);
|
||||
TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
|
||||
|
||||
admin = TEST_UTIL.getHBaseAdmin();
|
||||
|
|
|
@ -19,8 +19,21 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.rsgroup;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -35,30 +48,23 @@ import org.apache.hadoop.hbase.RegionLoad;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.constraint.ConstraintException;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public abstract class TestRSGroupsBase {
|
||||
protected static final Log LOG = LogFactory.getLog(TestRSGroupsBase.class);
|
||||
|
@ -854,4 +860,109 @@ public abstract class TestRSGroupsBase {
|
|||
Assert.assertEquals(newGroup.getName(),
|
||||
rsGroupAdmin.getRSGroupInfoOfTable(tableName).getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearDeadServers() throws Exception {
|
||||
final RSGroupInfo newGroup = addGroup(rsGroupAdmin, "testClearDeadServers", 3);
|
||||
|
||||
ServerName targetServer = ServerName.parseServerName(
|
||||
newGroup.getServers().iterator().next().toString());
|
||||
AdminProtos.AdminService.BlockingInterface targetRS =
|
||||
((ClusterConnection) admin.getConnection()).getAdmin(targetServer);
|
||||
try {
|
||||
targetServer = ProtobufUtil.toServerName(targetRS.getServerInfo(null,
|
||||
GetServerInfoRequest.newBuilder().build()).getServerInfo().getServerName());
|
||||
//stopping may cause an exception
|
||||
//due to the connection loss
|
||||
targetRS.stopServer(null,
|
||||
AdminProtos.StopServerRequest.newBuilder().setReason("Die").build());
|
||||
} catch(Exception e) {
|
||||
}
|
||||
final HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
//wait for the stopped regionserver to show up in the dead server list
|
||||
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return !master.getServerManager().areDeadServersInProgress()
|
||||
&& cluster.getClusterStatus().getDeadServerNames().size() > 0;
|
||||
}
|
||||
});
|
||||
assertFalse(cluster.getClusterStatus().getServers().contains(targetServer));
|
||||
assertTrue(cluster.getClusterStatus().getDeadServerNames().contains(targetServer));
|
||||
assertTrue(newGroup.getServers().contains(targetServer.getAddress()));
|
||||
|
||||
//clear dead servers list
|
||||
List<ServerName> notClearedServers = admin.clearDeadServers(Lists.newArrayList(targetServer));
|
||||
assertEquals(0, notClearedServers.size());
|
||||
|
||||
Set<Address> newGroupServers = rsGroupAdmin.getRSGroupInfo(newGroup.getName()).getServers();
|
||||
assertFalse(newGroupServers.contains(targetServer.getAddress()));
|
||||
assertEquals(2, newGroupServers.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveServers() throws Exception {
|
||||
final RSGroupInfo newGroup = addGroup(rsGroupAdmin, "testRemoveServers", 3);
|
||||
ServerName targetServer = ServerName.parseServerName(
|
||||
newGroup.getServers().iterator().next().toString());
|
||||
try {
|
||||
rsGroupAdmin.removeServers(Sets.newHashSet(targetServer.getAddress()));
|
||||
fail("Online servers shouldn't have been successfully removed.");
|
||||
} catch(IOException ex) {
|
||||
String exp = "Server " + targetServer.getAddress()
|
||||
+ " is an online server, not allowed to remove.";
|
||||
String msg = "Expected '" + exp + "' in exception message: ";
|
||||
assertTrue(msg + " " + ex.getMessage(), ex.getMessage().contains(exp));
|
||||
}
|
||||
assertTrue(newGroup.getServers().contains(targetServer.getAddress()));
|
||||
|
||||
AdminProtos.AdminService.BlockingInterface targetRS =
|
||||
((ClusterConnection) admin.getConnection()).getAdmin(targetServer);
|
||||
try {
|
||||
targetServer = ProtobufUtil.toServerName(targetRS.getServerInfo(null,
|
||||
GetServerInfoRequest.newBuilder().build()).getServerInfo().getServerName());
|
||||
//stopping may cause an exception
|
||||
//due to the connection loss
|
||||
targetRS.stopServer(null,
|
||||
AdminProtos.StopServerRequest.newBuilder().setReason("Die").build());
|
||||
} catch(Exception e) {
|
||||
}
|
||||
|
||||
final HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
//wait for the stopped regionserver to show up in the dead server list
|
||||
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return !master.getServerManager().areDeadServersInProgress()
|
||||
&& cluster.getClusterStatus().getDeadServerNames().size() > 0;
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
rsGroupAdmin.removeServers(Sets.newHashSet(targetServer.getAddress()));
|
||||
fail("Dead servers shouldn't have been successfully removed.");
|
||||
} catch(IOException ex) {
|
||||
String exp = "Server " + targetServer.getAddress() + " is on the dead servers list,"
|
||||
+ " Maybe it will come back again, not allowed to remove.";
|
||||
String msg = "Expected '" + exp + "' in exception message: ";
|
||||
assertTrue(msg + " " + ex.getMessage(), ex.getMessage().contains(exp));
|
||||
}
|
||||
assertTrue(newGroup.getServers().contains(targetServer.getAddress()));
|
||||
|
||||
ServerName sn = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster();
|
||||
TEST_UTIL.getHBaseClusterInterface().stopMaster(sn);
|
||||
TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(sn, 60000);
|
||||
TEST_UTIL.getHBaseClusterInterface().startMaster(sn.getHostname(), 0);
|
||||
TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster(60000);
|
||||
|
||||
assertEquals(3, cluster.getClusterStatus().getServersSize());
|
||||
assertFalse(cluster.getClusterStatus().getServers().contains(targetServer));
|
||||
assertFalse(cluster.getClusterStatus().getDeadServerNames().contains(targetServer));
|
||||
assertTrue(newGroup.getServers().contains(targetServer.getAddress()));
|
||||
|
||||
rsGroupAdmin.removeServers(Sets.newHashSet(targetServer.getAddress()));
|
||||
Set<Address> newGroupServers = rsGroupAdmin.getRSGroupInfo(newGroup.getName()).getServers();
|
||||
assertFalse(newGroupServers.contains(targetServer.getAddress()));
|
||||
assertEquals(2, newGroupServers.size());
|
||||
}
|
||||
}
|
|
@ -104,6 +104,12 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin {
|
|||
return wrapped.getRSGroupOfServer(server);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeServers(Set<Address> servers) throws IOException {
|
||||
wrapped.removeServers(servers);
|
||||
verify();
|
||||
}
|
||||
|
||||
public void verify() throws IOException {
|
||||
Map<String, RSGroupInfo> groupMap = Maps.newHashMap();
|
||||
Set<RSGroupInfo> zList = Sets.newHashSet();
|
||||
|
|
|
@ -665,4 +665,14 @@ public class BaseMasterAndRegionObserver extends BaseRegionObserver
|
|||
public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String name)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -656,6 +656,16 @@ public class BaseMasterObserver implements MasterObserver {
|
|||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String groupName)
|
||||
throws IOException {
|
||||
|
|
|
@ -1164,6 +1164,22 @@ public interface MasterObserver extends Coprocessor {
|
|||
void postRemoveRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String name) throws IOException;
|
||||
|
||||
/**
|
||||
* Called before servers are removed from rsgroup
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param servers set of decommissioned servers to remove
|
||||
*/
|
||||
void preRemoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after servers are removed from rsgroup
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param servers set of decommissioned servers that were removed
|
||||
*/
|
||||
void postRemoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException;
|
||||
|
||||
/**
|
||||
* Called before a region server group is removed
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
|
|
|
@ -1297,6 +1297,32 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public void preRemoveServers(final Set<Address> servers)
|
||||
throws IOException {
|
||||
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver oserver,
|
||||
ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
|
||||
if(((MasterEnvironment)getEnvironment()).supportGroupCPs) {
|
||||
oserver.preRemoveServers(this, servers);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void postRemoveServers(final Set<Address> servers)
|
||||
throws IOException {
|
||||
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver oserver,
|
||||
ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
|
||||
if(((MasterEnvironment)getEnvironment()).supportGroupCPs) {
|
||||
oserver.postRemoveServers(this, servers);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void preAddRSGroup(final String name)
|
||||
throws IOException {
|
||||
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
|
||||
|
|
|
@ -2718,6 +2718,12 @@ public class AccessController extends BaseMasterAndRegionObserver
|
|||
requirePermission("moveTables", Action.ADMIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
requirePermission("removeServers", Action.ADMIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String name) throws IOException {
|
||||
|
|
|
@ -1331,6 +1331,16 @@ public class TestMasterObserver {
|
|||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String groupName) throws IOException {
|
||||
|
|
|
@ -160,5 +160,16 @@ module Hbase
|
|||
@admin.moveServersAndTables(servers, tables, dest)
|
||||
end
|
||||
|
||||
#--------------------------------------------------------------------------
|
||||
# remove decommissioned servers from rsgroup
|
||||
def remove_servers(*args)
|
||||
# Flatten params array
|
||||
args = args.flatten.compact
|
||||
servers = java.util.HashSet.new
|
||||
args.each do |s|
|
||||
servers.add(org.apache.hadoop.hbase.net.Address.fromString(s))
|
||||
end
|
||||
@admin.removeServers(servers)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -464,5 +464,6 @@ Shell.load_command_group(
|
|||
move_servers_tables_rsgroup
|
||||
get_server_rsgroup
|
||||
get_table_rsgroup
|
||||
remove_servers_rsgroup
|
||||
]
|
||||
)
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class RemoveServersRsgroup < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Remove decommissioned servers from rsgroup.
|
||||
Dead/recovering/live servers will be disallowed.
|
||||
Example:
|
||||
hbase> remove_servers_rsgroup ['server1:port','server2:port']
|
||||
EOF
|
||||
end
|
||||
|
||||
def command(servers)
|
||||
rsgroup_admin.remove_servers(servers)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue