HBASE-18316 Implement async admin operations for draining region servers
This commit is contained in:
parent
bc8ebc6f72
commit
c48bb67123
|
@ -17,7 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
@ -771,6 +770,24 @@ public interface AsyncAdmin {
|
|||
*/
|
||||
CompletableFuture<List<ProcedureInfo>> listProcedures();
|
||||
|
||||
/**
|
||||
* Mark a region server as draining to prevent additional regions from getting assigned to it.
|
||||
* @param servers
|
||||
*/
|
||||
CompletableFuture<Void> drainRegionServers(List<ServerName> servers);
|
||||
|
||||
/**
|
||||
* List region servers marked as draining to not get additional regions assigned to them.
|
||||
* @return List of draining region servers wrapped by {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<List<ServerName>> listDrainingRegionServers();
|
||||
|
||||
/**
|
||||
* Remove drain from a region server to allow additional regions assignments.
|
||||
* @param servers List of region servers to remove drain from.
|
||||
*/
|
||||
CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers);
|
||||
|
||||
/**
|
||||
* @return cluster status wrapped by {@link CompletableFuture}
|
||||
*/
|
||||
|
|
|
@ -438,6 +438,21 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
return wrap(rawAdmin.listProcedures());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) {
|
||||
return wrap(rawAdmin.drainRegionServers(servers));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<ServerName>> listDrainingRegionServers() {
|
||||
return wrap(rawAdmin.listDrainingRegionServers());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers) {
|
||||
return wrap(rawAdmin.removeDrainFromRegionServers(servers));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ClusterStatus> getClusterStatus() {
|
||||
return wrap(rawAdmin.getClusterStatus());
|
||||
|
|
|
@ -4036,19 +4036,11 @@ public class HBaseAdmin implements Admin {
|
|||
|
||||
@Override
|
||||
public void drainRegionServers(List<ServerName> servers) throws IOException {
|
||||
final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size());
|
||||
for (ServerName server : servers) {
|
||||
// Parse to ServerName to do simple validation.
|
||||
ServerName.parseServerName(server.toString());
|
||||
pbServers.add(ProtobufUtil.toServerName(server));
|
||||
}
|
||||
|
||||
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
|
||||
@Override
|
||||
public Void rpcCall() throws ServiceException {
|
||||
DrainRegionServersRequest req =
|
||||
DrainRegionServersRequest.newBuilder().addAllServerName(pbServers).build();
|
||||
master.drainRegionServers(getRpcController(), req);
|
||||
master.drainRegionServers(getRpcController(),
|
||||
RequestConverter.buildDrainRegionServersRequest(servers));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
@ -4073,17 +4065,10 @@ public class HBaseAdmin implements Admin {
|
|||
|
||||
@Override
|
||||
public void removeDrainFromRegionServers(List<ServerName> servers) throws IOException {
|
||||
final List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size());
|
||||
for (ServerName server : servers) {
|
||||
pbServers.add(ProtobufUtil.toServerName(server));
|
||||
}
|
||||
|
||||
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
|
||||
@Override
|
||||
public Void rpcCall() throws ServiceException {
|
||||
RemoveDrainFromRegionServersRequest req = RemoveDrainFromRegionServersRequest.newBuilder()
|
||||
.addAllServerName(pbServers).build();
|
||||
master.removeDrainFromRegionServers(getRpcController(), req);
|
||||
master.removeDrainFromRegionServers(getRpcController(), RequestConverter.buildRemoveDrainFromRegionServersRequest(servers));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
|
|
@ -118,6 +118,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTab
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
|
||||
|
@ -156,6 +158,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
|
||||
|
@ -176,6 +180,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
|
||||
|
@ -1915,6 +1921,44 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
.collect(Collectors.toList()))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) {
|
||||
return this
|
||||
.<Void> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<DrainRegionServersRequest, DrainRegionServersResponse, Void> call(controller, stub,
|
||||
RequestConverter.buildDrainRegionServersRequest(servers),
|
||||
(s, c, req, done) -> s.drainRegionServers(c, req, done), resp -> null)).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<ServerName>> listDrainingRegionServers() {
|
||||
return this
|
||||
.<List<ServerName>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<ListDrainingRegionServersRequest, ListDrainingRegionServersResponse, List<ServerName>> call(
|
||||
controller,
|
||||
stub,
|
||||
ListDrainingRegionServersRequest.newBuilder().build(),
|
||||
(s, c, req, done) -> s.listDrainingRegionServers(c, req, done),
|
||||
resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
|
||||
.collect(Collectors.toList()))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> removeDrainFromRegionServers(List<ServerName> servers) {
|
||||
return this
|
||||
.<Void> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<RemoveDrainFromRegionServersRequest, RemoveDrainFromRegionServersResponse, Void> call(
|
||||
controller, stub, RequestConverter
|
||||
.buildRemoveDrainFromRegionServersRequest(servers), (s, c, req, done) -> s
|
||||
.removeDrainFromRegionServers(c, req, done), resp -> null)).call();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the region location for the passed region name. The region name may be a full region name
|
||||
* or encoded region name. If the region does not found, then it'll throw an
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
||||
import org.apache.hadoop.hbase.CellScannable;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -70,6 +69,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
|
||||
|
@ -93,9 +93,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColu
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
|
||||
|
@ -1808,4 +1810,23 @@ public final class RequestConverter {
|
|||
public static GetQuotaStatesRequest buildGetQuotaStatesRequest() {
|
||||
return GET_QUOTA_STATES_REQUEST;
|
||||
}
|
||||
|
||||
public static DrainRegionServersRequest buildDrainRegionServersRequest(List<ServerName> servers) {
|
||||
return DrainRegionServersRequest.newBuilder().addAllServerName(toProtoServerNames(servers))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static RemoveDrainFromRegionServersRequest buildRemoveDrainFromRegionServersRequest(
|
||||
List<ServerName> servers) {
|
||||
return RemoveDrainFromRegionServersRequest.newBuilder()
|
||||
.addAllServerName(toProtoServerNames(servers)).build();
|
||||
}
|
||||
|
||||
private static List<HBaseProtos.ServerName> toProtoServerNames(List<ServerName> servers) {
|
||||
List<HBaseProtos.ServerName> pbServers = new ArrayList<>(servers.size());
|
||||
for (ServerName server : servers) {
|
||||
pbServers.add(ProtobufUtil.toServerName(server));
|
||||
}
|
||||
return pbServers;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ ClientTests.class, MediumTests.class })
|
||||
public class TestAsyncDrainAdminApi extends TestAsyncAdminBase {
|
||||
|
||||
/*
|
||||
* This test drains all regions so cannot be run in parallel with other tests.
|
||||
*/
|
||||
@Ignore @Test(timeout = 30000)
|
||||
public void testDrainRegionServers() throws Exception {
|
||||
List<ServerName> drainingServers = admin.listDrainingRegionServers().get();
|
||||
assertTrue(drainingServers.isEmpty());
|
||||
|
||||
// Drain all region servers.
|
||||
Collection<ServerName> clusterServers = admin.getRegionServers().get();
|
||||
drainingServers = new ArrayList<>();
|
||||
for (ServerName server : clusterServers) {
|
||||
drainingServers.add(server);
|
||||
}
|
||||
admin.drainRegionServers(drainingServers).join();
|
||||
|
||||
// Check that drain lists all region servers.
|
||||
drainingServers = admin.listDrainingRegionServers().get();
|
||||
assertEquals(clusterServers.size(), drainingServers.size());
|
||||
for (ServerName server : clusterServers) {
|
||||
assertTrue(drainingServers.contains(server));
|
||||
}
|
||||
|
||||
// Try for 20 seconds to create table (new region). Will not complete because all RSs draining.
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
|
||||
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
|
||||
final Runnable createTable = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
admin.createTable(builder.build()).join();
|
||||
} catch (Exception ioe) {
|
||||
assertTrue(false); // Should not get IOException.
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
final ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
final java.util.concurrent.Future<?> future = executor.submit(createTable);
|
||||
executor.shutdown();
|
||||
try {
|
||||
future.get(20, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException ie) {
|
||||
assertTrue(true); // Expecting timeout to happen.
|
||||
}
|
||||
|
||||
// Kill executor if still processing.
|
||||
if (!executor.isTerminated()) {
|
||||
executor.shutdownNow();
|
||||
assertTrue(true);
|
||||
}
|
||||
|
||||
// Remove drain list.
|
||||
admin.removeDrainFromRegionServers(drainingServers);
|
||||
drainingServers = admin.listDrainingRegionServers().get();
|
||||
assertTrue(drainingServers.isEmpty());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue