HBASE-11347 For some errors, the client can retry infinitely

This commit is contained in:
Nicolas Liochon 2014-06-14 08:45:07 +02:00
parent 84ed7cf645
commit 3fa92647d2
2 changed files with 98 additions and 52 deletions

View File

@ -522,9 +522,6 @@ class AsyncProcess {
private final Object[] results;
private final long nonceGroup;
@VisibleForTesting
protected AtomicInteger hardRetryLimit = null; // used for tests to stop retries.
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
Batch.Callback<CResult> callback) {
@ -558,7 +555,7 @@ class AsyncProcess {
final Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
HRegionLocation loc = null;
HRegionLocation loc;
for (Action<Row> action : currentActions) {
try {
loc = findDestLocation(tableName, action.getAction());
@ -661,10 +658,6 @@ class AsyncProcess {
canRetry = false;
}
if (canRetry && hardRetryLimit != null) {
canRetry = hardRetryLimit.decrementAndGet() >= 0;
}
if (!canRetry) {
// Batch.Callback<Res> was not called on failure in 0.94. We keep this.
errors.add(throwable, row, server);
@ -692,11 +685,12 @@ class AsyncProcess {
byte[] row = rsActions.actions.values().iterator().next().get(0).getAction().getRow();
hConnection.updateCachedLocations(tableName, row, null, server);
errorsByServer.reportServerError(server);
boolean canRetry = errorsByServer.canRetryMore(numAttempt);
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
for (Action<Row> action : e.getValue()) {
if (manageError(action.getOriginalIndex(), action.getAction(), true, t, server)) {
if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) {
toReplay.add(action);
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@ -45,6 +46,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -77,10 +79,32 @@ public class TestAsyncProcess {
private static final String success = "success";
private static Exception failure = new Exception("failure");
private static int NB_RETRIES = 3;
@BeforeClass
public static void beforeClass(){
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
}
static class CountingThreadFactory implements ThreadFactory {
final AtomicInteger nbThreads;
ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
@Override
public Thread newThread(Runnable r) {
nbThreads.incrementAndGet();
return realFactory.newThread(r);
}
CountingThreadFactory(AtomicInteger nbThreads){
this.nbThreads = nbThreads;
}
}
static class MyAsyncProcess extends AsyncProcess {
final AtomicInteger nbMultiResponse = new AtomicInteger();
final AtomicInteger nbActions = new AtomicInteger();
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
public AtomicInteger callsCt = new AtomicInteger();
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
@ -89,36 +113,11 @@ public class TestAsyncProcess {
// Test HTable has tableName of null, so pass DUMMY_TABLE
AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
r.hardRetryLimit = new AtomicInteger(1);
allReqs.add(r);
callsCt.incrementAndGet();
return r;
}
@SuppressWarnings("unchecked")
public long getRetriesRequested() {
long result = 0;
for (AsyncRequestFuture ars : allReqs) {
if (ars instanceof AsyncProcess.AsyncRequestFutureImpl) {
result += (1 - ((AsyncRequestFutureImpl<?>)ars).hardRetryLimit.get());
}
}
return result;
}
static class CountingThreadFactory implements ThreadFactory {
final AtomicInteger nbThreads;
ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
@Override
public Thread newThread(Runnable r) {
nbThreads.incrementAndGet();
return realFactory.newThread(r);
}
CountingThreadFactory(AtomicInteger nbThreads){
this.nbThreads = nbThreads;
}
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
this(hc, conf, new AtomicInteger());
}
@ -136,6 +135,17 @@ public class TestAsyncProcess {
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
public void execute(Runnable command) {
throw new RejectedExecutionException("test under failure");
}
},
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
}
@Override
public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
boolean atLeastOne, Callback<Res> callback, boolean needResults)
@ -146,6 +156,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
callsCt.incrementAndGet();
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions);
return new RpcRetryingCaller<MultiResponse>(100, 10) {
@ -166,6 +177,33 @@ public class TestAsyncProcess {
}
}
static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
public CallerWithFailure() {
super(100, 100);
}
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
throws IOException, RuntimeException {
throw new IOException("test");
}
}
static class AsyncProcessWithFailure extends MyAsyncProcess {
public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) {
super(hc, conf, true);
serverTrackerTimeout = 1;
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
callsCt.incrementAndGet();
return new CallerWithFailure();
}
}
static MultiResponse createMultiResponse(
final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
final MultiResponse mr = new MultiResponse();
@ -188,15 +226,7 @@ public class TestAsyncProcess {
*/
static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
final AtomicInteger nbThreads = new AtomicInteger(0);
final static Configuration c = new Configuration();
static {
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
}
protected MyConnectionImpl() {
super(c);
}
protected MyConnectionImpl(Configuration conf) {
super(conf);
@ -217,7 +247,7 @@ public class TestAsyncProcess {
final boolean usedRegions[];
protected MyConnectionImpl2(List<HRegionLocation> hrl) {
super(c);
super(conf);
this.hrl = hrl;
this.usedRegions = new boolean[hrl.size()];
}
@ -320,7 +350,7 @@ public class TestAsyncProcess {
Assert.assertEquals(0, puts.size());
ars.waitUntilDone();
verifyResult(ars, false);
Assert.assertEquals(2L, ap.getRetriesRequested());
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
Assert.assertEquals(1, ars.getErrors().exceptions.size());
Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
@ -386,7 +416,8 @@ public class TestAsyncProcess {
Assert.assertTrue(puts.isEmpty());
ars.waitUntilDone();
verifyResult(ars, false, true, true);
Assert.assertEquals(2, ap.getRetriesRequested());
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
ap.callsCt.set(0);
Assert.assertEquals(1, ars.getErrors().actions.size());
puts.add(createPut(1, true));
@ -395,7 +426,7 @@ public class TestAsyncProcess {
ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
Assert.assertEquals(0, puts.size());
ars.waitUntilDone();
Assert.assertEquals(2, ap.getRetriesRequested());
Assert.assertEquals(2, ap.callsCt.get());
verifyResult(ars, true);
}
@ -411,7 +442,7 @@ public class TestAsyncProcess {
AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
ars.waitUntilDone();
verifyResult(ars, false, true, true);
Assert.assertEquals(2, ap.getRetriesRequested());
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
Assert.assertEquals(1, ars.getFailedOperations().size());
}
@ -608,7 +639,7 @@ public class TestAsyncProcess {
@Test
public void testBatch() throws IOException, InterruptedException {
HTable ht = new HTable();
ht.connection = new MyConnectionImpl();
ht.connection = new MyConnectionImpl(conf);
ht.multiAp = new MyAsyncProcess(ht.connection, conf, false);
List<Put> puts = new ArrayList<Put>();
@ -641,12 +672,11 @@ public class TestAsyncProcess {
HTable ht = new HTable();
Configuration configuration = new Configuration(conf);
configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
// set default writeBufferSize
ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
ht.connection = new MyConnectionImpl(configuration);
MyAsyncProcess ap = new MyAsyncProcess(ht.connection, conf, true);
MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true);
ht.ap = ap;
Assert.assertNotNull(ht.ap.createServerErrorTracker());
@ -663,7 +693,29 @@ public class TestAsyncProcess {
} catch (RetriesExhaustedWithDetailsException expected) {
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(2, ap.getRetriesRequested());
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
}
@Test
public void testGlobalErrors() throws IOException {
HTable ht = new HTable();
ht.connection = new MyConnectionImpl(conf);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf);
ht.ap = ap;
Assert.assertNotNull(ht.ap.createServerErrorTracker());
Put p = createPut(1, true);
ht.setAutoFlush(false, false);
ht.put(p);
try {
ht.flushCommits();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
}
/**