HBASE-19695 Handle disabled table for async client

Signed-off-by: tianjingyun <tianjy1990@gmail.com>
This commit is contained in:
zhangduo 2019-01-13 14:34:07 +08:00 committed by Duo Zhang
parent 483b7d008e
commit b06387b2b0
4 changed files with 106 additions and 40 deletions

View File

@ -23,9 +23,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@ -107,11 +105,12 @@ class AsyncRegionLocator {
}
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
future
.completeExceptionally(new RegionException("No location for " + tableName + ", row='" +
future.completeExceptionally(
new RegionOfflineException("No location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
} else if (loc.getServerName() == null) {
future.completeExceptionally(new HBaseIOException("No server address listed for region '" +
future.completeExceptionally(
new RegionOfflineException("No server address listed for region '" +
loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) +
"', locateType=" + type + ", replicaId=" + replicaId));
} else {

View File

@ -26,13 +26,19 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -110,32 +116,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
resetController(controller, callTimeoutNs);
}
protected final void onError(Throwable error, Supplier<String> errMsg,
Consumer<Throwable> updateCachedLocation) {
if (future.isDone()) {
// Give up if the future is already done, this is possible if user has already canceled the
// future. And for timeline consistent read, we will also cancel some requests if we have
// already get one of the responses.
LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
return;
}
error = translateException(error);
if (error instanceof DoNotRetryIOException) {
future.completeExceptionally(error);
return;
}
if (tries > startLogErrorsCnt) {
LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts
+ ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs)
+ " ms, time elapsed = " + elapsedMs() + " ms", error);
}
RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
error, EnvironmentEdgeManager.currentTime(), "");
exceptions.add(qt);
if (tries >= maxAttempts) {
completeExceptionally();
return;
}
private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) {
long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
@ -147,11 +128,69 @@ public abstract class AsyncRpcRetryingCaller<T> {
} else {
delayNs = getPauseTime(pauseNs, tries - 1);
}
updateCachedLocation.accept(error);
tries++;
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
}
protected Optional<TableName> getTableName() {
return Optional.empty();
}
protected final void onError(Throwable t, Supplier<String> errMsg,
Consumer<Throwable> updateCachedLocation) {
if (future.isDone()) {
// Give up if the future is already done, this is possible if user has already canceled the
// future. And for timeline consistent read, we will also cancel some requests if we have
// already get one of the responses.
LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
return;
}
Throwable error = translateException(t);
if (error instanceof DoNotRetryIOException) {
future.completeExceptionally(error);
return;
}
if (tries > startLogErrorsCnt) {
LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts +
", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) +
" ms, time elapsed = " + elapsedMs() + " ms", error);
}
updateCachedLocation.accept(error);
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(error,
EnvironmentEdgeManager.currentTime(), "");
exceptions.add(qt);
if (tries >= maxAttempts) {
completeExceptionally();
return;
}
// check whether the table has been disabled, notice that the check will introduce a request to
// meta, so here we only check for disabled for some specific exception types.
if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) {
Optional<TableName> tableName = getTableName();
if (tableName.isPresent()) {
FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName.get()), (disabled, e) -> {
if (e != null) {
if (e instanceof TableNotFoundException) {
future.completeExceptionally(e);
} else {
// failed to test whether the table is disabled, not a big deal, continue retrying
tryScheduleRetry(error, updateCachedLocation);
}
return;
}
if (disabled) {
future.completeExceptionally(new TableNotEnabledException(tableName.get()));
} else {
tryScheduleRetry(error, updateCachedLocation);
}
});
}
} else {
tryScheduleRetry(error, updateCachedLocation);
}
}
protected abstract void doCall();
CompletableFuture<T> call() {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@ -114,4 +115,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
call(loc);
});
}
@Override
protected Optional<TableName> getTableName() {
return Optional.of(tableName);
}
}

View File

@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -40,6 +44,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -115,8 +120,11 @@ public class TestAsyncTable {
}
@Before
public void setUp() throws IOException, InterruptedException {
public void setUp() throws IOException, InterruptedException, ExecutionException {
row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
}
}
@Test
@ -283,14 +291,14 @@ public class TestAsyncTable {
public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
AsyncTable<?> table = getTable.get();
RowMutations mutation = new RowMutations(row);
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
table.mutateRow(mutation).get();
Result result = table.get(new Get(row)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
mutation = new RowMutations(row);
mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
table.mutateRow(mutation).get();
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
@ -314,8 +322,9 @@ public class TestAsyncTable {
IntStream.range(0, count).forEach(i -> {
RowMutations mutation = new RowMutations(row);
try {
mutation.add(new Delete(row).addColumn(FAMILY, QUALIFIER));
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
mutation
.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@ -400,4 +409,17 @@ public class TestAsyncTable {
.get();
assertTrue(ok);
}
@Test
public void testDisabled() throws InterruptedException, ExecutionException {
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
try {
getTable.get().get(new Get(row)).get();
fail("Should fail since table has been disabled");
} catch (ExecutionException e) {
Throwable cause = e.getCause();
assertThat(cause, instanceOf(TableNotEnabledException.class));
assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
}
}
}