HBASE-16010 Put draining function through Admin API (Matt Warhaftig)

This commit is contained in:
Jerry He 2016-12-23 13:41:36 -08:00
parent 8fb9a91d44
commit 992e5717d4
11 changed files with 4217 additions and 147 deletions

View File

@ -1858,4 +1858,23 @@ public interface Admin extends Abortable, Closeable {
*/
default void disableReplicationPeer(final String peerId) throws IOException {
}
/**
* Mark a region server as draining to prevent additional regions from getting assigned to it.
* @param servers List of region servers to drain.
*/
void drainRegionServers(List<ServerName> servers) throws IOException;
/**
* List region servers marked as draining to not get additional regions assigned to them.
* @return List of draining region servers.
*/
List<ServerName> listDrainingRegionServers() throws IOException;
/**
* Remove drain from a region server to allow additional regions assignments.
* @param servers List of region servers to remove drain from.
*/
void removeDrainFromRegionServers(List<ServerName> servers) throws IOException;
}

View File

@ -78,12 +78,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
@ -1669,6 +1675,25 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
DisableReplicationPeerRequest request) throws ServiceException {
return stub.disableReplicationPeer(controller, request);
}
@Override
public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller,
ListDrainingRegionServersRequest request) throws ServiceException {
return stub.listDrainingRegionServers(controller, request);
}
@Override
public DrainRegionServersResponse drainRegionServers(RpcController controller,
DrainRegionServersRequest request) throws ServiceException {
return stub.drainRegionServers(controller, request);
}
@Override
public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers(
RpcController controller, RemoveDrainFromRegionServersRequest request)
throws ServiceException {
return stub.removeDrainFromRegionServers(controller, request);
}
};
}

View File

@ -120,6 +120,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
@ -140,6 +141,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
@ -155,6 +157,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyName
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
@ -3794,4 +3797,59 @@ public class HBaseAdmin implements Admin {
}
});
}
@Override
public void drainRegionServers(List<ServerName> servers) throws IOException {
final List<HBaseProtos.ServerName> pbServers = new ArrayList<HBaseProtos.ServerName>();
for (ServerName server : servers) {
// Parse to ServerName to do simple validation.
ServerName.parseServerName(server.toString());
pbServers.add(ProtobufUtil.toServerName(server));
}
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
public Void rpcCall() throws ServiceException {
DrainRegionServersRequest req =
DrainRegionServersRequest.newBuilder().addAllServerName(pbServers).build();
master.drainRegionServers(getRpcController(), req);
return null;
}
});
}
@Override
public List<ServerName> listDrainingRegionServers() throws IOException {
return executeCallable(new MasterCallable<List<ServerName>>(getConnection(),
getRpcControllerFactory()) {
@Override
public List<ServerName> rpcCall() throws ServiceException {
ListDrainingRegionServersRequest req = ListDrainingRegionServersRequest.newBuilder().build();
List<ServerName> servers = new ArrayList<ServerName>();
for (HBaseProtos.ServerName server : master.listDrainingRegionServers(null, req)
.getServerNameList()) {
servers.add(ProtobufUtil.toServerName(server));
}
return servers;
}
});
}
@Override
public void removeDrainFromRegionServers(List<ServerName> servers) throws IOException {
final List<HBaseProtos.ServerName> pbServers = new ArrayList<HBaseProtos.ServerName>();
for (ServerName server : servers) {
pbServers.add(ProtobufUtil.toServerName(server));
}
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
public Void rpcCall() throws ServiceException {
RemoveDrainFromRegionServersRequest req = RemoveDrainFromRegionServersRequest.newBuilder()
.addAllServerName(pbServers).build();
master.removeDrainFromRegionServers(getRpcController(), req);
return null;
}
});
}
}

View File

@ -568,6 +568,27 @@ message SecurityCapabilitiesResponse {
repeated Capability capabilities = 1;
}
message ListDrainingRegionServersRequest {
}
message ListDrainingRegionServersResponse {
repeated ServerName server_name = 1;
}
message DrainRegionServersRequest {
repeated ServerName server_name = 1;
}
message DrainRegionServersResponse {
}
message RemoveDrainFromRegionServersRequest {
repeated ServerName server_name = 1;
}
message RemoveDrainFromRegionServersResponse {
}
service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@ -863,4 +884,16 @@ service MasterService {
/** Disable a replication peer */
rpc DisableReplicationPeer(DisableReplicationPeerRequest)
returns(DisableReplicationPeerResponse);
/** Returns a list of ServerNames marked as draining. */
rpc listDrainingRegionServers(ListDrainingRegionServersRequest)
returns(ListDrainingRegionServersResponse);
/** Mark a list of ServerNames as draining. */
rpc drainRegionServers(DrainRegionServersRequest)
returns(DrainRegionServersResponse);
/** Unmark a list of ServerNames marked as draining. */
rpc removeDrainFromRegionServers(RemoveDrainFromRegionServersRequest)
returns(RemoveDrainFromRegionServersResponse);
}

View File

@ -3196,4 +3196,54 @@ public class HMaster extends HRegionServer implements MasterServices {
cpHost.postDisableReplicationPeer(peerId);
}
}
@Override
public void drainRegionServer(final ServerName server) {
String parentZnode = getZooKeeper().znodePaths.drainingZNode;
try {
String node = ZKUtil.joinZNode(parentZnode, server.getServerName());
ZKUtil.createAndFailSilent(getZooKeeper(), node);
} catch (KeeperException ke) {
LOG.warn(this.zooKeeper.prefix("Unable to add drain for '" + server.getServerName() + "'."),
ke);
}
}
@Override
public List<ServerName> listDrainingRegionServers() {
String parentZnode = getZooKeeper().znodePaths.drainingZNode;
List<ServerName> serverNames = new ArrayList<ServerName>();
List<String> serverStrs = null;
try {
serverStrs = ZKUtil.listChildrenNoWatch(getZooKeeper(), parentZnode);
} catch (KeeperException ke) {
LOG.warn(this.zooKeeper.prefix("Unable to list draining servers."), ke);
}
// No nodes is empty draining list or ZK connectivity issues.
if (serverStrs == null) {
return serverNames;
}
// Skip invalid ServerNames in result
for (String serverStr : serverStrs) {
try {
serverNames.add(ServerName.parseServerName(serverStr));
} catch (IllegalArgumentException iae) {
LOG.warn("Unable to cast '" + serverStr + "' to ServerName.", iae);
}
}
return serverNames;
}
@Override
public void removeDrainFromRegionServer(ServerName server) {
String parentZnode = getZooKeeper().znodePaths.drainingZNode;
String node = ZKUtil.joinZNode(parentZnode, server.getServerName());
try {
ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
} catch (KeeperException ke) {
LOG.warn(
this.zooKeeper.prefix("Unable to remove drain for '" + server.getServerName() + "'."), ke);
}
}
}

View File

@ -1693,4 +1693,55 @@ public class MasterRpcServices extends RSRpcServices
throw new ServiceException(e);
}
}
@Override
public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller,
ListDrainingRegionServersRequest request) throws ServiceException {
ListDrainingRegionServersResponse.Builder response =
ListDrainingRegionServersResponse.newBuilder();
try {
master.checkInitialized();
List<ServerName> servers = master.listDrainingRegionServers();
for (ServerName server : servers) {
response.addServerName(ProtobufUtil.toServerName(server));
}
} catch (IOException io) {
throw new ServiceException(io);
}
return response.build();
}
@Override
public DrainRegionServersResponse drainRegionServers(RpcController controller,
DrainRegionServersRequest request) throws ServiceException {
DrainRegionServersResponse.Builder response = DrainRegionServersResponse.newBuilder();
try {
master.checkInitialized();
for (HBaseProtos.ServerName pbServer : request.getServerNameList()) {
master.drainRegionServer(ProtobufUtil.toServerName(pbServer));
}
} catch (IOException io) {
throw new ServiceException(io);
}
return response.build();
}
@Override
public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers(RpcController controller,
RemoveDrainFromRegionServersRequest request) throws ServiceException {
RemoveDrainFromRegionServersResponse.Builder response =
RemoveDrainFromRegionServersResponse.newBuilder();
try {
master.checkInitialized();
for (HBaseProtos.ServerName pbServer : request.getServerNameList()) {
master.removeDrainFromRegionServer(ProtobufUtil.toServerName(pbServer));
}
} catch (IOException io) {
throw new ServiceException(io);
}
return response.build();
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
@ -443,4 +444,23 @@ public interface MasterServices extends Server {
* @param peerId a short name that identifies the peer
*/
void disableReplicationPeer(String peerId) throws ReplicationException, IOException;
/**
* Mark a region server as draining to prevent additional regions from getting assigned to it.
* @param server Region servers to drain.
*/
void drainRegionServer(final ServerName server);
/**
* List region servers marked as draining to not get additional regions assigned to them.
* @return List of draining servers.
*/
List<ServerName> listDrainingRegionServers();
/**
* Remove drain from a region server to allow additional regions assignments.
* @param server Region server to remove drain from.
*/
void removeDrainFromRegionServer(final ServerName server);
}

View File

@ -24,8 +24,14 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@ -758,4 +764,75 @@ public class TestAdmin2 {
ProcedureInfo[] procList = admin.listProcedures();
assertTrue(procList.length >= 0);
}
/*
* Test that invalid draining server names (invalid start code) don't get added to drain list.
*/
@Test(timeout = 10000, expected = IllegalArgumentException.class)
public void testCheckDrainServerName() throws Exception {
List<ServerName> servers = new ArrayList<ServerName>();
servers.add(ServerName.parseServerName("127.0.0.1:123"));
admin.drainRegionServers(servers);
}
/*
* This test drains all regions so cannot be run in parallel with other tests.
*/
@Test(timeout = 30000)
public void testDrainRegionServers() throws Exception {
List<ServerName> drainingServers = admin.listDrainingRegionServers();
assertTrue(drainingServers.isEmpty());
// Drain all region servers.
Collection<ServerName> clusterServers = admin.getClusterStatus().getServers();
drainingServers = new ArrayList<ServerName>();
for (ServerName server : clusterServers) {
drainingServers.add(server);
}
admin.drainRegionServers(drainingServers);
// Check that drain lists all region servers.
drainingServers = admin.listDrainingRegionServers();
assertEquals(clusterServers.size(), drainingServers.size());
for (ServerName server : clusterServers) {
assertTrue(drainingServers.contains(server));
}
// Try for 20 seconds to create table (new region). Will not complete because all RSs draining.
TableName hTable = TableName.valueOf("testDrainRegionServer");
final HTableDescriptor htd = new HTableDescriptor(hTable);
htd.addFamily(new HColumnDescriptor("cf"));
final Runnable createTable = new Thread() {
@Override
public void run() {
try {
admin.createTable(htd);
} catch (IOException ioe) {
assertTrue(false); // Should not get IOException.
}
}
};
final ExecutorService executor = Executors.newSingleThreadExecutor();
final java.util.concurrent.Future<?> future = executor.submit(createTable);
executor.shutdown();
try {
future.get(20, TimeUnit.SECONDS);
} catch (TimeoutException ie) {
assertTrue(true); // Expecting timeout to happen.
}
// Kill executor if still processing.
if (!executor.isTerminated()) {
executor.shutdownNow();
assertTrue(true);
}
// Remove drain list.
admin.removeDrainFromRegionServers(drainingServers);
drainingServers = admin.listDrainingRegionServers();
assertTrue(drainingServers.isEmpty());
}
}

View File

@ -399,4 +399,19 @@ public class MockNoopMasterServices implements MasterServices, Server {
@Override
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
}
@Override
public void drainRegionServer(ServerName server) {
return;
}
@Override
public List<ServerName> listDrainingRegionServers() {
return null;
}
@Override
public void removeDrainFromRegionServer(ServerName servers) {
return;
}
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;
@ -319,5 +320,25 @@ public class TestZooKeeperACL {
}
}
@Test(timeout = 10000)
public void testAdminDrainAllowedOnSecureZK() throws Exception {
if (!secureZKAvailable) {
return;
}
List<ServerName> drainingServers = new ArrayList<ServerName>();
drainingServers.add(ServerName.parseServerName("ZZZ,123,123"));
// If unable to connect to secure ZK cluster then this operation would fail.
TEST_UTIL.getAdmin().drainRegionServers(drainingServers);
drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers();
assertEquals(1, drainingServers.size());
assertEquals(ServerName.parseServerName("ZZZ,123,123"), drainingServers.get(0));
TEST_UTIL.getAdmin().removeDrainFromRegionServers(drainingServers);
drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers();
assertEquals(0, drainingServers.size());
}
}