HBASE-19643 Need to update cache location when get error in AsyncBatchRpcRetryingCaller

This commit is contained in:
Guanghao Zhang 2017-12-27 17:39:52 +08:00
parent 53233be5a9
commit 8580ba7b6f
2 changed files with 29 additions and 0 deletions

View File

@ -262,6 +262,7 @@ class AsyncBatchRpcRetryingCaller<T> {
} else if (result instanceof Throwable) { } else if (result instanceof Throwable) {
Throwable error = translateException((Throwable) result); Throwable error = translateException((Throwable) result);
logException(tries, () -> Stream.of(regionReq), error, serverName); logException(tries, () -> Stream.of(regionReq), error, serverName);
conn.getLocator().updateCachedLocation(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
getExtraContextForError(serverName)); getExtraContextForError(serverName));
@ -364,6 +365,8 @@ class AsyncBatchRpcRetryingCaller<T> {
ServerName serverName) { ServerName serverName) {
Throwable error = translateException(t); Throwable error = translateException(t);
logException(tries, () -> actionsByRegion.values().stream(), error, serverName); logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
actionsByRegion
.forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error));
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error, failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
serverName); serverName);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -178,6 +179,31 @@ public class TestAsyncTableBatch {
results.forEach(r -> assertTrue(r.isEmpty())); results.forEach(r -> assertTrue(r.isEmpty()));
} }
@Test
public void testWithRegionServerFailover() throws Exception {
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
table.putAll(IntStream.range(0, COUNT)
.mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
.collect(Collectors.toList())).get();
TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests");
Thread.sleep(100);
table.putAll(IntStream.range(COUNT, 2 * COUNT)
.mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
.collect(Collectors.toList())).get();
List<Result> results = table.getAll(
IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
.get();
assertEquals(2 * COUNT, results.size());
results.forEach(r -> assertFalse(r.isEmpty()));
table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i)))
.collect(Collectors.toList())).get();
results = table.getAll(
IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
.get();
assertEquals(2 * COUNT, results.size());
results.forEach(r -> assertTrue(r.isEmpty()));
}
@Test @Test
public void testMixed() throws InterruptedException, ExecutionException, IOException { public void testMixed() throws InterruptedException, ExecutionException, IOException {
AsyncTable<?> table = tableGetter.apply(TABLE_NAME); AsyncTable<?> table = tableGetter.apply(TABLE_NAME);