HBASE-17619: Add async admin Impl which connect to RegionServer and implement close region methods.

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
huzheng 2017-02-13 19:53:11 +08:00 committed by zhangduo
parent ae840c0ccd
commit e129e6b65d
6 changed files with 431 additions and 18 deletions

View File

@ -21,7 +21,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -282,4 +284,52 @@ public interface AsyncAdmin {
* The return value will be wrapped by a {@link CompletableFuture}. * The return value will be wrapped by a {@link CompletableFuture}.
*/ */
CompletableFuture<Boolean> isBalancerEnabled(); CompletableFuture<Boolean> isBalancerEnabled();
/**
* Close a region. For expert-admins. Runs close on the regionserver. The master will not be
* informed of the close.
*
* @param regionname region name to close
* @param serverName If supplied, we'll use this location rather than the one currently in
* <code>hbase:meta</code>
*/
CompletableFuture<Void> closeRegion(String regionname, String serverName);
/**
* Close a region. For expert-admins Runs close on the regionserver. The master will not be
* informed of the close.
*
* @param regionname region name to close
* @param serverName The servername of the regionserver. If passed null we will use servername
* found in the hbase:meta table. A server name is made of host, port and startcode. Here is an
* example: <code> host187.example.com,60020,1289493121758</code>
*/
CompletableFuture<Void> closeRegion(byte[] regionname, String serverName);
/**
* For expert-admins. Runs close on the regionserver. Closes a region based on the encoded region
* name. The region server name is mandatory. If the servername is provided then based on the
* online regions in the specified regionserver the specified region will be closed. The master
* will not be informed of the close. Note that the regionname is the encoded regionname.
*
* @param encodedRegionName The encoded region name; i.e. the hash that makes up the region name
* suffix: e.g. if regionname is
* <code>TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396.</code>,
* then the encoded region name is: <code>527db22f95c8a9e0116f0cc13c680396</code>.
* @param serverName The servername of the regionserver. A server name is made of host, port and
* startcode. This is mandatory. Here is an example:
* <code> host187.example.com,60020,1289493121758</code>
* @return true if the region was closed, false if not. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> closeRegionWithEncodedRegionName(String encodedRegionName, String serverName);
/**
* Close a region. For expert-admins Runs close on the regionserver. The master will not be
* informed of the close.
*
* @param sn
* @param hri
*/
CompletableFuture<Void> closeRegion(ServerName sn, HRegionInfo hri);
} }

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@InterfaceAudience.Private
public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
@FunctionalInterface
public interface Callable<T> {
CompletableFuture<T> call(HBaseRpcController controller, AdminService.Interface stub);
}
private final Callable<T> callable;
private ServerName serverName;
public AsyncAdminRequestRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
@Override
protected void doCall() {
AdminService.Interface adminStub;
try {
adminStub = this.conn.getAdminStub(serverName);
} catch (IOException e) {
onError(e, () -> "Get async admin stub to " + serverName + " failed", err -> {
});
return;
}
resetCallTimeout();
callable.call(controller, adminStub).whenComplete((result, error) -> {
if (error != null) {
onError(error, () -> "Call to admin stub failed", err -> {
});
return;
}
future.complete(result);
});
}
CompletableFuture<T> call() {
doCall();
return future;
}
}

View File

@ -31,7 +31,6 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -51,6 +50,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@ -96,6 +96,7 @@ class AsyncConnectionImpl implements AsyncConnection {
private final NonceGenerator nonceGenerator; private final NonceGenerator nonceGenerator;
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
@ -167,6 +168,16 @@ class AsyncConnectionImpl implements AsyncConnection {
return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
} }
private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException{
return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
return CollectionUtils.computeIfAbsentEx(adminSubs,
getStubKey(AdminService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
() -> createAdminServerStub(serverName));
}
private void makeMasterStub(CompletableFuture<MasterService.Interface> future) { private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
registry.getMasterAddress().whenComplete( registry.getMasterAddress().whenComplete(
(sn, error) -> { (sn, error) -> {

View File

@ -31,18 +31,24 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
@ -115,7 +121,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
this.ng = connection.getNonceGenerator(); this.ng = connection.getNonceGenerator();
} }
private <T> MasterRequestCallerBuilder<T> newCaller() { private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
return this.connection.callerFactory.<T> masterRequest() return this.connection.callerFactory.<T> masterRequest()
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
@ -123,19 +129,33 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
.startLogErrorsCnt(startLogErrorsCnt); .startLogErrorsCnt(startLogErrorsCnt);
} }
private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
return this.connection.callerFactory.<T> adminRequest()
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt);
}
@FunctionalInterface @FunctionalInterface
private interface RpcCall<RESP, REQ> { private interface MasterRpcCall<RESP, REQ> {
void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
RpcCallback<RESP> done); RpcCallback<RESP> done);
} }
@FunctionalInterface
private interface AdminRpcCall<RESP, REQ> {
void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
RpcCallback<RESP> done);
}
@FunctionalInterface @FunctionalInterface
private interface Converter<D, S> { private interface Converter<D, S> {
D convert(S src) throws IOException; D convert(S src) throws IOException;
} }
private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
MasterService.Interface stub, PREQ preq, RpcCall<PRESP, PREQ> rpcCall, MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
Converter<RESP, PRESP> respConverter) { Converter<RESP, PRESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>(); CompletableFuture<RESP> future = new CompletableFuture<>();
rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
@ -156,11 +176,37 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
return future; return future;
} }
//TODO abstract call and adminCall into a single method.
private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
Converter<RESP, PRESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>();
rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
@Override
public void run(PRESP resp) {
if (controller.failed()) {
future.completeExceptionally(new IOException(controller.errorText()));
} else {
if (respConverter != null) {
try {
future.complete(respConverter.convert(resp));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
}
});
return future;
}
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
RpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
TableProcedureBiConsumer consumer) { TableProcedureBiConsumer consumer) {
CompletableFuture<Long> procFuture = this CompletableFuture<Long> procFuture = this
.<Long> newCaller() .<Long>newMasterCaller()
.action( .action(
(controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
respConverter)).call(); respConverter)).call();
@ -219,7 +265,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<HTableDescriptor[]> listTables(Pattern pattern, boolean includeSysTables) { public CompletableFuture<HTableDescriptor[]> listTables(Pattern pattern, boolean includeSysTables) {
return this return this
.<HTableDescriptor[]> newCaller() .<HTableDescriptor[]>newMasterCaller()
.action( .action(
(controller, stub) -> this (controller, stub) -> this
.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, HTableDescriptor[]> call( .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, HTableDescriptor[]> call(
@ -241,7 +287,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<TableName[]> listTableNames(Pattern pattern, boolean includeSysTables) { public CompletableFuture<TableName[]> listTableNames(Pattern pattern, boolean includeSysTables) {
return this return this
.<TableName[]> newCaller() .<TableName[]>newMasterCaller()
.action( .action(
(controller, stub) -> this (controller, stub) -> this
.<GetTableNamesRequest, GetTableNamesResponse, TableName[]> call(controller, stub, .<GetTableNamesRequest, GetTableNamesResponse, TableName[]> call(controller, stub,
@ -253,7 +299,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<HTableDescriptor> getTableDescriptor(TableName tableName) { public CompletableFuture<HTableDescriptor> getTableDescriptor(TableName tableName) {
CompletableFuture<HTableDescriptor> future = new CompletableFuture<>(); CompletableFuture<HTableDescriptor> future = new CompletableFuture<>();
this.<List<TableSchema>> newCaller() this.<List<TableSchema>> newMasterCaller()
.action( .action(
(controller, stub) -> this (controller, stub) -> this
.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call( .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
@ -383,7 +429,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) { public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
return this return this
.<Pair<Integer, Integer>> newCaller() .<Pair<Integer, Integer>>newMasterCaller()
.action( .action(
(controller, stub) -> this (controller, stub) -> this
.<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call( .<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call(
@ -420,7 +466,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<Boolean> setBalancerRunning(final boolean on) { public CompletableFuture<Boolean> setBalancerRunning(final boolean on) {
return this return this
.<Boolean> newCaller() .<Boolean>newMasterCaller()
.action( .action(
(controller, stub) -> this (controller, stub) -> this
.<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
@ -437,7 +483,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<Boolean> balancer(boolean force) { public CompletableFuture<Boolean> balancer(boolean force) {
return this return this
.<Boolean> newCaller() .<Boolean>newMasterCaller()
.action( .action(
(controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller, (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
stub, RequestConverter.buildBalanceRequest(force), stub, RequestConverter.buildBalanceRequest(force),
@ -447,7 +493,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
@Override @Override
public CompletableFuture<Boolean> isBalancerEnabled() { public CompletableFuture<Boolean> isBalancerEnabled() {
return this return this
.<Boolean> newCaller() .<Boolean>newMasterCaller()
.action( .action(
(controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call( (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
@ -455,6 +501,39 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
.call(); .call();
} }
@Override
public CompletableFuture<Void> closeRegion(String regionname, String serverName) {
return closeRegion(Bytes.toBytes(regionname), serverName);
}
@Override
public CompletableFuture<Void> closeRegion(byte[] regionname, String serverName) {
throw new UnsupportedOperationException("closeRegion method depends on getRegion API, will support soon.");
}
@Override
public CompletableFuture<Boolean> closeRegionWithEncodedRegionName(String encodedRegionName,
String serverName) {
return this
.<Boolean> newAdminCaller()
.action(
(controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall(
controller, stub,
ProtobufUtil.buildCloseRegionRequest(ServerName.valueOf(serverName), encodedRegionName),
(s, c, req, done) -> s.closeRegion(controller, req, done), (resp) -> resp.getClosed()))
.serverName(ServerName.valueOf(serverName)).call();
}
@Override
public CompletableFuture<Void> closeRegion(ServerName sn, HRegionInfo hri) {
return this.<Void> newAdminCaller()
.action(
(controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Void> adminCall(
controller, stub, ProtobufUtil.buildCloseRegionRequest(sn, hri.getRegionName()),
(s, c, req, done) -> s.closeRegion(controller, req, done), null))
.serverName(sn).call();
}
private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
if (numRegions < 3) { if (numRegions < 3) {
throw new IllegalArgumentException("Must create at least three regions"); throw new IllegalArgumentException("Must create at least three regions");
@ -625,7 +704,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
} }
private void getProcedureResult(final long procId, CompletableFuture<Void> future) { private void getProcedureResult(final long procId, CompletableFuture<Void> future) {
this.<GetProcedureResultResponse> newCaller() this.<GetProcedureResultResponse> newMasterCaller()
.action( .action(
(controller, stub) -> this (controller, stub) -> this
.<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(

View File

@ -28,11 +28,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.ipc.ProtobufRpcEngine.Server;
/** /**
* Factory to create an AsyncRpcRetryCaller. * Factory to create an AsyncRpcRetryCaller.
@ -352,4 +354,65 @@ class AsyncRpcRetryingCallerFactory {
public <T> MasterRequestCallerBuilder<T> masterRequest() { public <T> MasterRequestCallerBuilder<T> masterRequest() {
return new MasterRequestCallerBuilder<>(); return new MasterRequestCallerBuilder<>();
} }
}
public class AdminRequestCallerBuilder<T> extends BuilderBase{
// TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
private AsyncAdminRequestRetryingCaller.Callable<T> callable;
private long operationTimeoutNs = -1L;
private long rpcTimeoutNs = -1L;
private ServerName serverName;
public AdminRequestCallerBuilder<T> action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
public AdminRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
this.operationTimeoutNs = unit.toNanos(operationTimeout);
return this;
}
public AdminRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
return this;
}
public AdminRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
this.pauseNs = unit.toNanos(pause);
return this;
}
public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}
public AdminRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
this.startLogErrorsCnt = startLogErrorsCnt;
return this;
}
public AdminRequestCallerBuilder<T> serverName(ServerName serverName){
this.serverName = serverName;
return this;
}
public AsyncAdminRequestRetryingCaller<T> build() {
return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, serverName, checkNotNull(callable,
"action is null"));
}
public CompletableFuture<T> call() {
return build().call();
}
}
public <T> AdminRequestCallerBuilder<T> adminRequest(){
return new AdminRequestCallerBuilder<>();
}
}

View File

@ -47,13 +47,12 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -899,4 +898,141 @@ public class TestAsyncAdmin {
// Current state should be the original state again // Current state should be the original state again
assertEquals(initialState, admin.isBalancerEnabled().get()); assertEquals(initialState, admin.isBalancerEnabled().get());
} }
private void createTableWithDefaultConf(TableName TABLENAME) throws Exception {
HTableDescriptor htd = new HTableDescriptor(TABLENAME);
HColumnDescriptor hcd = new HColumnDescriptor("value");
htd.addFamily(hcd);
admin.createTable(htd, null).get();
}
@Test
public void testCloseRegion() throws Exception {
TableName TABLENAME = TableName.valueOf("TestHBACloseRegion");
createTableWithDefaultConf(TABLENAME);
HRegionInfo info = null;
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLENAME);
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.getTable().isSystemTable()) {
info = regionInfo;
boolean closed = admin.closeRegionWithEncodedRegionName(regionInfo.getEncodedName(),
rs.getServerName().getServerName()).get();
assertTrue(closed);
}
}
boolean isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info);
long timeout = System.currentTimeMillis() + 10000;
while ((System.currentTimeMillis() < timeout) && (isInList)) {
Thread.sleep(100);
isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info);
}
assertFalse("The region should not be present in online regions list.", isInList);
}
@Test
public void testCloseRegionIfInvalidRegionNameIsPassed() throws Exception {
final String name = "TestHBACloseRegion1";
byte[] TABLENAME = Bytes.toBytes(name);
createTableWithDefaultConf(TableName.valueOf(TABLENAME));
HRegionInfo info = null;
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME));
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.isMetaTable()) {
if (regionInfo.getRegionNameAsString().contains(name)) {
info = regionInfo;
boolean catchNotServingException = false;
try {
admin.closeRegionWithEncodedRegionName("sample", rs.getServerName().getServerName())
.get();
} catch (Exception e) {
catchNotServingException = true;
// expected, ignore it
}
assertTrue(catchNotServingException);
}
}
}
onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
assertTrue("The region should be present in online regions list.",
onlineRegions.contains(info));
}
@Test
public void testCloseRegionWhenServerNameIsNull() throws Exception {
byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegion3");
createTableWithDefaultConf(TableName.valueOf(TABLENAME));
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME));
try {
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.isMetaTable()) {
if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion3")) {
admin.closeRegionWithEncodedRegionName(regionInfo.getEncodedName(), null).get();
}
}
}
fail("The test should throw exception if the servername passed is null.");
} catch (IllegalArgumentException e) {
}
}
@Test
public void testCloseRegionWhenServerNameIsEmpty() throws Exception {
byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegionWhenServerNameIsEmpty");
createTableWithDefaultConf(TableName.valueOf(TABLENAME));
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME));
try {
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.isMetaTable()) {
if (regionInfo.getRegionNameAsString()
.contains("TestHBACloseRegionWhenServerNameIsEmpty")) {
admin.closeRegionWithEncodedRegionName(regionInfo.getEncodedName(), " ").get();
}
}
}
fail("The test should throw exception if the servername passed is empty.");
} catch (IllegalArgumentException e) {
}
}
@Test
public void testCloseRegionWhenEncodedRegionNameIsNotGiven() throws Exception {
byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegion4");
createTableWithDefaultConf(TableName.valueOf(TABLENAME));
HRegionInfo info = null;
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME));
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.isMetaTable()) {
if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion4")) {
info = regionInfo;
boolean catchNotServingException = false;
try {
admin.closeRegionWithEncodedRegionName(regionInfo.getRegionNameAsString(),
rs.getServerName().getServerName()).get();
} catch (Exception e) {
// expected, ignore it.
catchNotServingException = true;
}
assertTrue(catchNotServingException);
}
}
}
onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
assertTrue("The region should be present in online regions list.",
onlineRegions.contains(info));
}
} }