HBASE-17396 Add first async admin impl and implement balance methods

This commit is contained in:
Guanghao Zhang 2017-01-19 10:15:12 +08:00
parent 8f1d0a2b84
commit cb9ce2ceaf
9 changed files with 720 additions and 138 deletions

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* The asynchronous administrative API for HBase.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface AsyncAdmin {
/**
* Turn the load balancer on or off.
* @param on
* @return Previous balancer value wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> setBalancerRunning(final boolean on) throws IOException;
/**
* Invoke the balancer. Will run the balancer and if regions to move, it will go ahead and do the
* reassignments. Can NOT run for various reasons. Check logs.
* @return True if balancer ran, false otherwise. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> balancer() throws IOException;
/**
* Invoke the balancer. Will run the balancer and if regions to move, it will go ahead and do the
* reassignments. If there is region in transition, force parameter of true would still run
* balancer. Can *not* run for other reasons. Check logs.
* @param force whether we should force balance even if there is region in transition.
* @return True if balancer ran, false otherwise. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> balancer(boolean force) throws IOException;
/**
* Query the current state of the balancer.
* @return true if the balancer is enabled, false otherwise.
* The return value will be wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> isBalancerEnabled() throws IOException;
}

View File

@ -96,4 +96,13 @@ public interface AsyncConnection extends Closeable {
* @param pool the thread pool to use for executing callback
*/
AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
/**
* Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned AsyncAdmin
* is not guaranteed to be thread-safe. A new instance should be created for each using thread.
* This is a lightweight operation. Pooling or caching of the returned AsyncAdmin is not
* recommended.
* @return an AsyncAdmin instance for cluster administration
*/
AsyncAdmin getAdmin();
}

View File

@ -28,23 +28,32 @@ import io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Threads;
@ -88,6 +97,11 @@ class AsyncConnectionImpl implements AsyncConnection {
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
new AtomicReference<>();
public AsyncConnectionImpl(Configuration conf, User user) {
this.conf = conf;
this.user = user;
@ -149,6 +163,93 @@ class AsyncConnectionImpl implements AsyncConnection {
() -> createRegionServerStub(serverName));
}
private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
registry.getMasterAddress().whenComplete(
(sn, error) -> {
if (sn == null) {
String msg = "ZooKeeper available but no active master location found";
LOG.info(msg);
this.masterStubMakeFuture.getAndSet(null).completeExceptionally(
new MasterNotRunningException(msg));
return;
}
try {
MasterService.Interface stub = createMasterStub(sn);
HBaseRpcController controller = getRpcController();
stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
new RpcCallback<IsMasterRunningResponse>() {
@Override
public void run(IsMasterRunningResponse resp) {
if (controller.failed() || resp == null
|| (resp != null && !resp.getIsMasterRunning())) {
masterStubMakeFuture.getAndSet(null).completeExceptionally(
new MasterNotRunningException("Master connection is not running anymore"));
} else {
masterStub.set(stub);
masterStubMakeFuture.set(null);
future.complete(stub);
}
}
});
} catch (IOException e) {
this.masterStubMakeFuture.getAndSet(null).completeExceptionally(
new IOException("Failed to create async master stub", e));
}
});
}
CompletableFuture<MasterService.Interface> getMasterStub() {
MasterService.Interface masterStub = this.masterStub.get();
if (masterStub == null) {
for (;;) {
if (this.masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) {
CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get();
makeMasterStub(future);
} else {
CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get();
if (future != null) {
return future;
}
}
}
}
for (;;) {
if (masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) {
CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get();
HBaseRpcController controller = getRpcController();
masterStub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
new RpcCallback<IsMasterRunningResponse>() {
@Override
public void run(IsMasterRunningResponse resp) {
if (controller.failed() || resp == null
|| (resp != null && !resp.getIsMasterRunning())) {
makeMasterStub(future);
} else {
future.complete(masterStub);
}
}
});
} else {
CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get();
if (future != null) {
return future;
}
}
}
}
private HBaseRpcController getRpcController() {
HBaseRpcController controller = this.rpcControllerFactory.newController();
controller.setCallTimeout((int) TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
return controller;
}
@Override
public AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName) {
return new AsyncTableBuilderBase<RawAsyncTable>(tableName, connConf) {
@ -171,4 +272,9 @@ class AsyncConnectionImpl implements AsyncConnection {
}
};
}
@Override
public AsyncAdmin getAdmin() {
return new AsyncHBaseAdmin(this);
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
/**
* The implementation of AsyncAdmin.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class AsyncHBaseAdmin implements AsyncAdmin {
private final AsyncConnectionImpl connection;
private final long rpcTimeoutNs;
private final long operationTimeoutNs;
private final long pauseNs;
private final int maxAttempts;
private final int startLogErrorsCnt;
AsyncHBaseAdmin(AsyncConnectionImpl connection) {
this.connection = connection;
this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
this.pauseNs = connection.connConf.getPauseNs();
this.maxAttempts = connection.connConf.getMaxRetries();
this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
}
private <T> MasterRequestCallerBuilder<T> newCaller() {
return this.connection.callerFactory.<T> masterRequest()
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt);
}
@FunctionalInterface
private interface RpcCall<RESP, REQ> {
void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
RpcCallback<RESP> done);
}
@FunctionalInterface
private interface Converter<D, S> {
D convert(S src) throws IOException;
}
private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
MasterService.Interface stub, PREQ preq, RpcCall<PRESP, PREQ> rpcCall,
Converter<RESP, PRESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>();
rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
@Override
public void run(PRESP resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
future.complete(respConverter.convert(resp));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
return future;
}
@Override
public CompletableFuture<Boolean> setBalancerRunning(final boolean on) throws IOException {
return this
.<Boolean> newCaller()
.action(
(controller, stub) -> this
.<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
(s, c, req, done) -> s.setBalancerRunning(c, req, done),
(resp) -> resp.getPrevBalanceValue())).call();
}
@Override
public CompletableFuture<Boolean> balancer() throws IOException {
return balancer(false);
}
@Override
public CompletableFuture<Boolean> balancer(boolean force) throws IOException {
return this
.<Boolean> newCaller()
.action(
(controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
stub, RequestConverter.buildBalanceRequest(force),
(s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
}
@Override
public CompletableFuture<Boolean> isBalancerEnabled() throws IOException {
return this
.<Boolean> newCaller()
.action(
(controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
(s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
.call();
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import io.netty.util.HashedWheelTimer;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/**
* Retry caller for a request call to master.
*/
@InterfaceAudience.Private
public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
@FunctionalInterface
public interface Callable<T> {
CompletableFuture<T> call(HBaseRpcController controller, MasterService.Interface stub);
}
private final Callable<T> callable;
public AsyncMasterRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
this.callable = callable;
}
@Override
protected void doCall() {
conn.getMasterStub().whenComplete((stub, error) -> {
if (error != null) {
onError(error, () -> "Get async master stub failed", err -> {
});
return;
}
resetCallTimeout();
callable.call(controller, stub).whenComplete((result, error2) -> {
if (error2 != null) {
onError(error2, () -> "Call to master failed", err -> {
});
return;
}
future.complete(result);
});
});
}
public CompletableFuture<T> call() {
doCall();
return future;
}
}

View File

@ -0,0 +1,151 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.Private
public abstract class AsyncRpcRetryingCaller<T> {
private static final Log LOG = LogFactory.getLog(AsyncRpcRetryingCaller.class);
private final HashedWheelTimer retryTimer;
private final long startNs;
private final long pauseNs;
private int tries = 1;
private final int maxAttempts;
private final int startLogErrorsCnt;
private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
private final long rpcTimeoutNs;
protected final long operationTimeoutNs;
protected final AsyncConnectionImpl conn;
protected final CompletableFuture<T> future;
protected final HBaseRpcController controller;
public AsyncRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
this.future = new CompletableFuture<>();
this.controller = conn.rpcControllerFactory.newController();
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
}
private long elapsedMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
}
protected long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}
protected void completeExceptionally() {
future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
}
protected void resetCallTimeout() {
long callTimeoutNs;
if (operationTimeoutNs > 0) {
callTimeoutNs = remainingTimeNs();
if (callTimeoutNs <= 0) {
completeExceptionally();
return;
}
callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
} else {
callTimeoutNs = rpcTimeoutNs;
}
resetController(controller, callTimeoutNs);
}
protected void onError(Throwable error, Supplier<String> errMsg,
Consumer<Throwable> updateCachedLocation) {
error = translateException(error);
if (tries > startLogErrorsCnt) {
LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts
+ ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs)
+ " ms, time elapsed = " + elapsedMs() + " ms", error);
}
RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
error, EnvironmentEdgeManager.currentTime(), "");
exceptions.add(qt);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
completeExceptionally();
return;
}
long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally();
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
} else {
delayNs = getPauseTime(pauseNs, tries - 1);
}
updateCachedLocation.accept(error);
tries++;
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
}
protected abstract void doCall();
CompletableFuture<T> call() {
doCall();
return future;
}
}

View File

@ -369,4 +369,59 @@ class AsyncRpcRetryingCallerFactory {
public BatchCallerBuilder batch() {
return new BatchCallerBuilder();
}
public class MasterRequestCallerBuilder<T> extends BuilderBase {
private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;
private long operationTimeoutNs = -1L;
private long rpcTimeoutNs = -1L;
public MasterRequestCallerBuilder<T> action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
this.operationTimeoutNs = unit.toNanos(operationTimeout);
return this;
}
public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
return this;
}
public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
this.pauseNs = unit.toNanos(pause);
return this;
}
public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}
public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
this.startLogErrorsCnt = startLogErrorsCnt;
return this;
}
public AsyncMasterRequestRpcRetryingCaller<T> build() {
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, checkNotNull(callable,
"action is null"), pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
}
/**
* Shortcut for {@code build().call()}
*/
public CompletableFuture<T> call() {
return build().call();
}
}
public <T> MasterRequestCallerBuilder<T> masterRequest() {
return new MasterRequestCallerBuilder<>();
}
}

View File

@ -17,39 +17,23 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Retry caller for a single request, such as get, put, delete, etc.
*/
@InterfaceAudience.Private
class AsyncSingleRequestRpcRetryingCaller<T> {
private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
@FunctionalInterface
public interface Callable<T> {
@ -57,10 +41,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
ClientService.Interface stub);
}
private final HashedWheelTimer retryTimer;
private final AsyncConnectionImpl conn;
private final TableName tableName;
private final byte[] row;
@ -69,123 +49,36 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final Callable<T> callable;
private final long pauseNs;
private final int maxAttempts;
private final long operationTimeoutNs;
private final long rpcTimeoutNs;
private final int startLogErrorsCnt;
private final CompletableFuture<T> future;
private final HBaseRpcController controller;
private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
private final long startNs;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
this.tableName = tableName;
this.row = row;
this.locateType = locateType;
this.callable = callable;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
this.future = new CompletableFuture<>();
this.controller = conn.rpcControllerFactory.newController();
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
}
private int tries = 1;
private long elapsedMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
}
private long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}
private void completeExceptionally() {
future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
}
private void onError(Throwable error, Supplier<String> errMsg,
Consumer<Throwable> updateCachedLocation) {
error = translateException(error);
if (tries > startLogErrorsCnt) {
LOG.warn(errMsg.get(), error);
}
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(error,
EnvironmentEdgeManager.currentTime(), "");
exceptions.add(qt);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
completeExceptionally();
return;
}
long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally();
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
} else {
delayNs = getPauseTime(pauseNs, tries - 1);
}
updateCachedLocation.accept(error);
tries++;
retryTimer.newTimeout(t -> locateThenCall(), delayNs, TimeUnit.NANOSECONDS);
}
private void call(HRegionLocation loc) {
long callTimeoutNs;
if (operationTimeoutNs > 0) {
callTimeoutNs = remainingTimeNs();
if (callTimeoutNs <= 0) {
completeExceptionally();
return;
}
callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
} else {
callTimeoutNs = rpcTimeoutNs;
}
ClientService.Interface stub;
try {
stub = conn.getRegionServerStub(loc.getServerName());
} catch (IOException e) {
onError(e,
() -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
+ "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName
+ " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
+ elapsedMs() + " ms",
+ "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed",
err -> conn.getLocator().updateCachedLocation(loc, err));
return;
}
resetController(controller, callTimeoutNs);
callable.call(controller, loc, stub).whenComplete((result, error) -> {
resetCallTimeout();
callable.call(controller, loc, stub).whenComplete(
(result, error) -> {
if (error != null) {
onError(error,
() -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
+ loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed, tries = "
+ tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
+ elapsedMs() + " ms",
+ loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed",
err -> conn.getLocator().updateCachedLocation(loc, err));
return;
}
@ -193,7 +86,8 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
});
}
private void locateThenCall() {
@Override
protected void doCall() {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
@ -204,15 +98,13 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
} else {
locateTimeoutNs = -1L;
}
conn.getLocator().getRegionLocation(tableName, row, locateType, locateTimeoutNs)
.whenComplete((loc, error) -> {
conn.getLocator()
.getRegionLocation(tableName, row, locateType, locateTimeoutNs)
.whenComplete(
(loc, error) -> {
if (error != null) {
onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
+ " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
+ elapsedMs() + " ms",
err -> {
onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
+ " failed", err -> {
});
return;
}
@ -220,8 +112,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
});
}
@Override
public CompletableFuture<T> call() {
locateThenCall();
doCall();
return future;
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Class to test AsyncAdmin.
*/
@Category({LargeTests.class, ClientTests.class})
public class TestAsyncAdmin {
private static final Log LOG = LogFactory.getLog(TestAdmin1.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static AsyncConnection ASYNC_CONN;
private AsyncAdmin admin;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 3);
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 1000);
TEST_UTIL.startMiniCluster(1);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
IOUtils.closeQuietly(ASYNC_CONN);
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws Exception {
this.admin = ASYNC_CONN.getAdmin();
}
@Test(timeout = 30000)
public void testBalancer() throws Exception {
boolean initialState = admin.isBalancerEnabled().get();
// Start the balancer, wait for it.
boolean prevState = admin.setBalancerRunning(!initialState).get();
// The previous state should be the original state we observed
assertEquals(initialState, prevState);
// Current state should be opposite of the original
assertEquals(!initialState, admin.isBalancerEnabled().get());
// Reset it back to what it was
prevState = admin.setBalancerRunning(initialState).get();
// The previous state should be the opposite of the initial state
assertEquals(!initialState, prevState);
// Current state should be the original state again
assertEquals(initialState, admin.isBalancerEnabled().get());
}
}