HBASE-27531 AsyncRequestFutureImpl unnecessarily clears meta cache for full server (#4930)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
26b022116b
commit
06c7548770
|
@ -989,7 +989,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
}
|
||||
|
||||
private void cleanServerCache(ServerName server, Throwable regionException) {
|
||||
if (ClientExceptionsUtil.isMetaClearingException(regionException)) {
|
||||
if (tableName == null && ClientExceptionsUtil.isMetaClearingException(regionException)) {
|
||||
// We want to make sure to clear the cache in case there were location-related exceptions.
|
||||
// We don't to clear the cache for every possible exception that comes through, however.
|
||||
MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.HashMap;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
@ -319,19 +320,66 @@ public class TestAsyncProcess {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to simulate the case where a RegionServer responds to a multi request, but some or all of
|
||||
* the actions have an Exception instead of Result. These responses go through receiveMultiAction,
|
||||
* which has handling for individual action failures.
|
||||
*/
|
||||
static class CallerWithRegionException extends RpcRetryingCallerImpl<AbstractResponse> {
|
||||
|
||||
private final IOException e;
|
||||
private MultiAction multi;
|
||||
|
||||
public CallerWithRegionException(IOException e, MultiAction multi) {
|
||||
super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null);
|
||||
this.e = e;
|
||||
this.multi = multi;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
|
||||
int callTimeout) throws IOException, RuntimeException {
|
||||
MultiResponse response = new MultiResponse();
|
||||
for (Entry<byte[], List<Action>> entry : multi.actions.entrySet()) {
|
||||
response.addException(entry.getKey(), e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
static class AsyncProcessWithFailure extends MyAsyncProcess {
|
||||
|
||||
private final IOException ioe;
|
||||
private final ServerName failingServer;
|
||||
private final boolean returnAsRegionException;
|
||||
|
||||
public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
|
||||
super(hc, conf);
|
||||
public AsyncProcessWithFailure(ClusterConnection hc, Configuration myConf, IOException ioe,
|
||||
ServerName failingServer, boolean returnAsRegionException) {
|
||||
super(hc, myConf);
|
||||
this.ioe = ioe;
|
||||
this.failingServer = failingServer;
|
||||
this.returnAsRegionException = returnAsRegionException;
|
||||
serverTrackerTimeout = 1L;
|
||||
}
|
||||
|
||||
public AsyncProcessWithFailure(ClusterConnection hc, Configuration myConf, IOException ioe) {
|
||||
this(hc, myConf, ioe, null, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RpcRetryingCaller<AbstractResponse>
|
||||
createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
|
||||
MultiServerCallable msc = (MultiServerCallable) callable;
|
||||
if (failingServer != null) {
|
||||
if (!msc.getServerName().equals(failingServer)) {
|
||||
return super.createCaller(callable, rpcTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
if (returnAsRegionException) {
|
||||
return new CallerWithRegionException(ioe, msc.getMulti());
|
||||
}
|
||||
|
||||
callsCt.incrementAndGet();
|
||||
return new CallerWithFailure(ioe);
|
||||
}
|
||||
|
@ -1754,14 +1802,35 @@ public class TestAsyncProcess {
|
|||
Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that we properly recover from exceptions that DO NOT go through receiveGlobalFailure, due
|
||||
* to updating the meta cache for the region which failed. Successful multigets can include region
|
||||
* exceptions in the MultiResponse. In that case, it skips receiveGlobalFailure and instead
|
||||
* handles in receiveMultiAction
|
||||
*/
|
||||
@Test
|
||||
public void testRetryWithExceptionClearsMetaCache() throws Exception {
|
||||
public void testRetryWithExceptionClearsMetaCacheUsingRegionException() throws Exception {
|
||||
testRetryWithExceptionClearsMetaCache(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that we properly recover from exceptions that go through receiveGlobalFailure, due to
|
||||
* updating the meta cache for the region which failed.
|
||||
*/
|
||||
@Test
|
||||
public void testRetryWithExceptionClearsMetaCacheUsingServerException() throws Exception {
|
||||
testRetryWithExceptionClearsMetaCache(false);
|
||||
}
|
||||
|
||||
private void testRetryWithExceptionClearsMetaCache(boolean useRegionException)
|
||||
throws IOException {
|
||||
Configuration myConf = new Configuration(CONF);
|
||||
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
|
||||
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
|
||||
ClusterConnection conn = createHConnection(new ConnectionConfiguration(myConf));
|
||||
|
||||
AsyncProcessWithFailure ap =
|
||||
new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test"));
|
||||
// we pass in loc1.getServerName here so that only calls to that server will fail
|
||||
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, myConf,
|
||||
new RegionOpeningException("test"), loc1.getServerName(), useRegionException);
|
||||
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
|
||||
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
|
||||
|
||||
|
@ -1770,20 +1839,33 @@ public class TestAsyncProcess {
|
|||
Assert.assertEquals(conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(),
|
||||
new RegionLocations(loc1).toString());
|
||||
|
||||
Mockito.verify(conn, Mockito.times(0)).clearCaches(Mockito.any());
|
||||
// simulate updateCachedLocations, by changing the loc for this row to loc3. only loc1 fails,
|
||||
// so this means retry will succeed
|
||||
Mockito.doAnswer(invocation -> {
|
||||
setMockLocation(conn, DUMMY_BYTES_1, new RegionLocations(loc3));
|
||||
return null;
|
||||
}).when(conn).updateCachedLocations(Mockito.eq(DUMMY_TABLE),
|
||||
Mockito.eq(loc1.getRegion().getRegionName()), Mockito.eq(DUMMY_BYTES_1), Mockito.any(),
|
||||
Mockito.eq(loc1.getServerName()));
|
||||
|
||||
// Ensure we haven't called updateCachedLocations yet
|
||||
Mockito.verify(conn, Mockito.times(0)).updateCachedLocations(Mockito.any(), Mockito.any(),
|
||||
Mockito.any(), Mockito.any(), Mockito.any());
|
||||
|
||||
Put p = createPut(1, true);
|
||||
mutator.mutate(p);
|
||||
|
||||
try {
|
||||
mutator.flush();
|
||||
Assert.fail();
|
||||
} catch (RetriesExhaustedWithDetailsException expected) {
|
||||
assertEquals(1, expected.getNumExceptions());
|
||||
assertTrue(expected.getRow(0) == p);
|
||||
}
|
||||
// we expect this to succeed because the bad region location should be updated upon
|
||||
// the initial failure causing retries to succeed.
|
||||
mutator.flush();
|
||||
|
||||
Mockito.verify(conn, Mockito.times(1)).clearCaches(loc1.getServerName());
|
||||
// validate that we updated the location, as we expected
|
||||
Assert.assertEquals(conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(),
|
||||
new RegionLocations(loc3).toString());
|
||||
// this is a given since the location updated, but validate that we called updateCachedLocations
|
||||
Mockito.verify(conn, Mockito.atLeastOnce()).updateCachedLocations(Mockito.eq(DUMMY_TABLE),
|
||||
Mockito.eq(loc1.getRegion().getRegionName()), Mockito.eq(DUMMY_BYTES_1), Mockito.any(),
|
||||
Mockito.eq(loc1.getServerName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue