HBASE-17251 Add a timeout parameter when locating region

This commit is contained in:
zhangduo 2016-12-07 16:57:04 +08:00
parent c1293cc91e
commit b3ae87bd7d
7 changed files with 250 additions and 64 deletions

View File

@ -24,6 +24,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import java.io.IOException; import java.io.IOException;
@ -56,7 +58,8 @@ class AsyncConnectionImpl implements AsyncConnection {
private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class); private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class);
private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( @VisibleForTesting
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
@ -92,7 +95,7 @@ class AsyncConnectionImpl implements AsyncConnection {
this.conf = conf; this.conf = conf;
this.user = user; this.user = user;
this.connConf = new AsyncConnectionConfiguration(conf); this.connConf = new AsyncConnectionConfiguration(conf);
this.locator = new AsyncRegionLocator(this); this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
this.registry = AsyncRegistryFactory.getRegistry(conf); this.registry = AsyncRegistryFactory.getRegistry(conf);
this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> { this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -27,6 +27,9 @@ import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findExcept
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -36,9 +39,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -50,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
/** /**
@ -62,6 +68,8 @@ class AsyncRegionLocator {
private final AsyncConnectionImpl conn; private final AsyncConnectionImpl conn;
private final HashedWheelTimer retryTimer;
private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>(); private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture = private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
@ -70,8 +78,9 @@ class AsyncRegionLocator {
private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache = private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
AsyncRegionLocator(AsyncConnectionImpl conn) { AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
this.conn = conn; this.conn = conn;
this.retryTimer = retryTimer;
} }
private CompletableFuture<HRegionLocation> locateMetaRegion() { private CompletableFuture<HRegionLocation> locateMetaRegion() {
@ -249,9 +258,6 @@ class AsyncRegionLocator {
return; return;
} }
otherCheck.accept(loc); otherCheck.accept(loc);
if (future.isDone()) {
return;
}
addToCache(loc); addToCache(loc);
future.complete(loc); future.complete(loc);
} }
@ -282,12 +288,34 @@ class AsyncRegionLocator {
return locateInMeta(tableName, row); return locateInMeta(tableName, row);
} }
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) { private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
if (tableName.equals(META_TABLE_NAME)) { long timeoutNs, Supplier<String> timeoutMsg) {
return locateMetaRegion(); if (future.isDone() || timeoutNs <= 0) {
} else { return future;
return locateRegion(tableName, row);
} }
CompletableFuture<HRegionLocation> timeoutFuture = new CompletableFuture<>();
Timeout timeoutTask = retryTimer.newTimeout(
t -> timeoutFuture.completeExceptionally(new TimeoutIOException(timeoutMsg.get())), timeoutNs,
TimeUnit.NANOSECONDS);
future.whenComplete((loc, error) -> {
timeoutTask.cancel();
if (error != null) {
timeoutFuture.completeExceptionally(error);
} else {
timeoutFuture.complete(loc);
}
});
return timeoutFuture;
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
long timeoutNs) {
CompletableFuture<HRegionLocation> future =
tableName.equals(META_TABLE_NAME) ? locateMetaRegion() : locateRegion(tableName, row);
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+ "ms) waiting for region location for " + tableName + ", row='"
+ Bytes.toStringBinary(row) + "'");
} }
private HRegionLocation locatePreviousInCache(TableName tableName, private HRegionLocation locatePreviousInCache(TableName tableName,
@ -356,14 +384,18 @@ class AsyncRegionLocator {
/** /**
* Locate the previous region using the current regions start key. Used for reverse scan. * Locate the previous region using the current regions start key. Used for reverse scan.
* <p>
* TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow
* of a region.
*/ */
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName, CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
byte[] startRowOfCurrentRegion) { byte[] startRowOfCurrentRegion, long timeoutNs) {
if (tableName.equals(META_TABLE_NAME)) { CompletableFuture<HRegionLocation> future = tableName.equals(META_TABLE_NAME)
return locateMetaRegion(); ? locateMetaRegion() : locatePreviousRegion(tableName, startRowOfCurrentRegion);
} else { return withTimeout(future, timeoutNs,
return locatePreviousRegion(tableName, startRowOfCurrentRegion); () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
} + "ms) waiting for region location for " + tableName + ", startRowOfCurrentRegion='"
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
} }
private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) { private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -51,6 +52,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class); private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
// Add a delta to avoid timeout immediately after a retry sleeping.
private static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
@FunctionalInterface @FunctionalInterface
public interface Callable<T> { public interface Callable<T> {
CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc, CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
@ -65,7 +69,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final byte[] row; private final byte[] row;
private final Supplier<CompletableFuture<HRegionLocation>> locate; private final Function<Long, CompletableFuture<HRegionLocation>> locate;
private final Callable<T> callable; private final Callable<T> callable;
@ -118,6 +122,10 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
} }
private long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}
private void completeExceptionally() { private void completeExceptionally() {
future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
} }
@ -138,7 +146,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
} }
long delayNs; long delayNs;
if (operationTimeoutNs > 0) { if (operationTimeoutNs > 0) {
long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs); long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs) - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) { if (maxDelayNs <= 0) {
completeExceptionally(); completeExceptionally();
return; return;
@ -153,6 +161,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
} }
private void call(HRegionLocation loc) { private void call(HRegionLocation loc) {
long callTimeoutNs;
if (operationTimeoutNs > 0) {
callTimeoutNs = remainingTimeNs();
if (callTimeoutNs <= 0) {
completeExceptionally();
return;
}
callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
} else {
callTimeoutNs = rpcTimeoutNs;
}
ClientService.Interface stub; ClientService.Interface stub;
try { try {
stub = conn.getRegionServerStub(loc.getServerName()); stub = conn.getRegionServerStub(loc.getServerName());
@ -166,7 +185,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
err -> conn.getLocator().updateCachedLocation(loc, err)); err -> conn.getLocator().updateCachedLocation(loc, err));
return; return;
} }
resetController(controller, rpcTimeoutNs); resetController(controller, callTimeoutNs);
callable.call(controller, loc, stub).whenComplete((result, error) -> { callable.call(controller, loc, stub).whenComplete((result, error) -> {
if (error != null) { if (error != null) {
onError(error, onError(error,
@ -183,7 +202,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
} }
private void locateThenCall() { private void locateThenCall() {
locate.get().whenComplete((loc, error) -> { long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
if (locateTimeoutNs <= 0) {
completeExceptionally();
return;
}
} else {
locateTimeoutNs = -1L;
}
locate.apply(locateTimeoutNs).whenComplete((loc, error) -> {
if (error != null) { if (error != null) {
onError(error, onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = " () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
@ -198,12 +227,12 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
}); });
} }
private CompletableFuture<HRegionLocation> locate() { private CompletableFuture<HRegionLocation> locate(long timeoutNs) {
return conn.getLocator().getRegionLocation(tableName, row); return conn.getLocator().getRegionLocation(tableName, row, timeoutNs);
} }
private CompletableFuture<HRegionLocation> locatePrevious() { private CompletableFuture<HRegionLocation> locatePrevious(long timeoutNs) {
return conn.getLocator().getPreviousRegionLocation(tableName, row); return conn.getLocator().getPreviousRegionLocation(tableName, row, timeoutNs);
} }
public CompletableFuture<T> call() { public CompletableFuture<T> call() {

View File

@ -45,6 +45,6 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
@Override @Override
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) { public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
return locator.getRegionLocation(tableName, row); return locator.getRegionLocation(tableName, row, 0L);
} }
} }

View File

@ -102,12 +102,12 @@ public class TestAsyncRegionLocator {
@Test @Test
public void testNoTable() throws InterruptedException { public void testNoTable() throws InterruptedException {
try { try {
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
} }
try { try {
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get(); LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
} }
@ -118,12 +118,12 @@ public class TestAsyncRegionLocator {
createSingleRegionTable(); createSingleRegionTable();
TEST_UTIL.getAdmin().disableTable(TABLE_NAME); TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
try { try {
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
} }
try { try {
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get(); LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
} }
@ -143,17 +143,17 @@ public class TestAsyncRegionLocator {
createSingleRegionTable(); createSingleRegionTable();
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)]; byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
ThreadLocalRandom.current().nextBytes(randKey); ThreadLocalRandom.current().nextBytes(randKey);
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, randKey).get()); LOCATOR.getRegionLocation(TABLE_NAME, randKey, 0L).get());
// Use a key which is not the endKey of a region will cause error // Use a key which is not the endKey of a region will cause error
try { try {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get()); LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }, 0L).get());
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(IOException.class)); assertThat(e.getCause(), instanceOf(IOException.class));
assertTrue(e.getCause().getMessage().contains("end key of")); assertTrue(e.getCause().getMessage().contains("end key of"));
@ -193,7 +193,7 @@ public class TestAsyncRegionLocator {
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> { IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
try { try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1], assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get()); serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], 0L).get());
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -204,7 +204,7 @@ public class TestAsyncRegionLocator {
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> { n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
try { try {
assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i], assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get()); LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i], 0L).get());
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -215,7 +215,7 @@ public class TestAsyncRegionLocator {
public void testRegionMove() throws IOException, InterruptedException, ExecutionException { public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
createSingleRegionTable(); createSingleRegionTable();
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
@ -228,12 +228,12 @@ public class TestAsyncRegionLocator {
Thread.sleep(100); Thread.sleep(100);
} }
// Should be same as it is in cache // Should be same as it is in cache
assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
LOCATOR.updateCachedLocation(loc, null); LOCATOR.updateCachedLocation(loc, null);
// null error will not trigger a cache cleanup // null error will not trigger a cache cleanup
assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
LOCATOR.updateCachedLocation(loc, new NotServingRegionException()); LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
} }
} }

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncRegionLocatorTimeout {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static AsyncConnectionImpl CONN;
private static AsyncRegionLocator LOCATOR;
private static volatile long SLEEP_MS = 0L;
public static class SleepRegionObserver extends BaseRegionObserver {
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner s) throws IOException {
if (SLEEP_MS > 0) {
Threads.sleepWithoutInterrupt(SLEEP_MS);
}
return super.preScannerOpen(e, scan, s);
}
}
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(REGION_COPROCESSOR_CONF_KEY, SleepRegionObserver.class.getName());
conf.setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
CONN = new AsyncConnectionImpl(conf, User.getCurrent());
LOCATOR = CONN.getLocator();
}
@AfterClass
public static void tearDown() throws Exception {
CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void test() throws InterruptedException, ExecutionException {
SLEEP_MS = 1000;
long startNs = System.nanoTime();
try {
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, TimeUnit.MILLISECONDS.toNanos(500))
.get();
fail();
} catch (ExecutionException e) {
e.printStackTrace();
assertThat(e.getCause(), instanceOf(TimeoutIOException.class));
}
long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
assertTrue(costMs >= 500);
assertTrue(costMs < 1000);
// wait for the background task finish
Thread.sleep(2000);
// Now the location should be in cache, so we will not visit meta again.
HRegionLocation loc = LOCATOR
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, TimeUnit.MILLISECONDS.toNanos(500)).get();
assertEquals(loc.getServerName(),
TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName());
}
}

View File

@ -150,33 +150,35 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
AtomicBoolean errorTriggered = new AtomicBoolean(false); AtomicBoolean errorTriggered = new AtomicBoolean(false);
AtomicInteger count = new AtomicInteger(0); AtomicInteger count = new AtomicInteger(0);
HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn) { AsyncRegionLocator mockedLocator =
@Override new AsyncRegionLocator(asyncConn, AsyncConnectionImpl.RETRY_TIMER) {
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) { @Override
if (tableName.equals(TABLE_NAME)) { CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); long timeoutNs) {
if (count.getAndIncrement() == 0) { if (tableName.equals(TABLE_NAME)) {
errorTriggered.set(true); CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("Inject error!")); if (count.getAndIncrement() == 0) {
} else { errorTriggered.set(true);
future.complete(loc); future.completeExceptionally(new RuntimeException("Inject error!"));
} else {
future.complete(loc);
}
return future;
} else {
return super.getRegionLocation(tableName, row, timeoutNs);
}
} }
return future;
} else {
return super.getRegionLocation(tableName, row);
}
}
@Override @Override
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName, CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
byte[] startRowOfCurrentRegion) { byte[] startRowOfCurrentRegion, long timeoutNs) {
return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion); return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion, timeoutNs);
} }
@Override @Override
void updateCachedLocation(HRegionLocation loc, Throwable exception) { void updateCachedLocation(HRegionLocation loc, Throwable exception) {
} }
}; };
try (AsyncConnectionImpl mockedConn = try (AsyncConnectionImpl mockedConn =
new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) { new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {