HBASE-13997 ScannerCallableWithReplicas cause Infinitely blocking (Zephyr Guo and Enis)
This commit is contained in:
parent
c16bbf47cb
commit
5e708746b8
|
@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefor
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CancellationException;
|
||||
|
@ -158,15 +156,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
// We want to accomodate some RPCs for redundant replica scans (but are still in progress)
|
||||
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
|
||||
new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
|
||||
new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
|
||||
RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
|
||||
rl.size() * 5);
|
||||
|
||||
List<ExecutionException> exceptions = null;
|
||||
int submitted = 0, completed = 0;
|
||||
AtomicBoolean done = new AtomicBoolean(false);
|
||||
replicaSwitched.set(false);
|
||||
// submit call for the primary replica.
|
||||
submitted += addCallsForCurrentReplica(cs, rl);
|
||||
addCallsForCurrentReplica(cs, rl);
|
||||
try {
|
||||
// wait for the timeout to see whether the primary responds back
|
||||
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
|
||||
|
@ -179,11 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
return r == null ? null : r.getFirst(); //great we got a response
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
// the primary call failed with RetriesExhaustedException or DoNotRetryIOException
|
||||
// but the secondaries might still succeed. Continue on the replica RPCs.
|
||||
exceptions = new ArrayList<ExecutionException>(rl.size());
|
||||
exceptions.add(e);
|
||||
completed++;
|
||||
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
|
||||
} catch (CancellationException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -191,24 +183,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
}
|
||||
// submit call for the all of the secondaries at once
|
||||
// TODO: this may be an overkill for large region replication
|
||||
submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
|
||||
addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
|
||||
try {
|
||||
while (completed < submitted) {
|
||||
try {
|
||||
Future<Pair<Result[], ScannerCallable>> f = cs.take();
|
||||
Pair<Result[], ScannerCallable> r = f.get();
|
||||
if (r != null && r.getSecond() != null) {
|
||||
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
|
||||
}
|
||||
return r == null ? null : r.getFirst(); // great we got an answer
|
||||
} catch (ExecutionException e) {
|
||||
// if not cancel or interrupt, wait until all RPC's are done
|
||||
// one of the tasks failed. Save the exception for later.
|
||||
if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
|
||||
exceptions.add(e);
|
||||
completed++;
|
||||
}
|
||||
Future<Pair<Result[], ScannerCallable>> f = cs.take();
|
||||
Pair<Result[], ScannerCallable> r = f.get();
|
||||
if (r != null && r.getSecond() != null) {
|
||||
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
|
||||
}
|
||||
return r == null ? null : r.getFirst(); // great we got an answer
|
||||
} catch (ExecutionException e) {
|
||||
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
|
||||
} catch (CancellationException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -218,11 +202,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
// calls succeeded or failed. In all case, we stop all our tasks.
|
||||
cs.cancelAll();
|
||||
}
|
||||
|
||||
if (exceptions != null && !exceptions.isEmpty()) {
|
||||
RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
|
||||
retries); // just rethrow the first exception for now.
|
||||
}
|
||||
return null; // unreachable
|
||||
}
|
||||
|
||||
|
@ -283,19 +262,18 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
|
||||
}
|
||||
|
||||
private int addCallsForCurrentReplica(
|
||||
private void addCallsForCurrentReplica(
|
||||
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
|
||||
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
|
||||
outstandingCallables.add(currentScannerCallable);
|
||||
cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
|
||||
return 1;
|
||||
}
|
||||
|
||||
private int addCallsForOtherReplicas(
|
||||
private void addCallsForOtherReplicas(
|
||||
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
|
||||
int min, int max) {
|
||||
if (scan.getConsistency() == Consistency.STRONG) {
|
||||
return 0; // not scheduling on other replicas for strong consistency
|
||||
return; // not scheduling on other replicas for strong consistency
|
||||
}
|
||||
for (int id = min; id <= max; id++) {
|
||||
if (currentScannerCallable.id == id) {
|
||||
|
@ -307,7 +285,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
RetryingRPC retryingOnReplica = new RetryingRPC(s);
|
||||
cs.submit(retryingOnReplica, scannerTimeout, id);
|
||||
}
|
||||
return max - min + 1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -354,8 +331,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
// and we can't invoke it multiple times at the same time)
|
||||
this.caller = ScannerCallableWithReplicas.this.caller;
|
||||
if (scan.getConsistency() == Consistency.TIMELINE) {
|
||||
this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
|
||||
<Result[]>newCaller();
|
||||
this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
|
||||
.<Result[]>newCaller();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,9 +21,14 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
|
@ -32,6 +37,7 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
|
@ -496,4 +502,69 @@ public class TestClientScanner {
|
|||
assertFalse(cs.advance());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the case where all replicas of a region throw an exception. It should not cause a hang
|
||||
* but the exception should propagate to the client
|
||||
*/
|
||||
@Test (timeout = 30000)
|
||||
public void testExceptionsFromReplicasArePropagated() throws IOException {
|
||||
scan.setConsistency(Consistency.TIMELINE);
|
||||
|
||||
// Mock a caller which calls the callable for ScannerCallableWithReplicas,
|
||||
// but throws an exception for the actual scanner calls via callWithRetries.
|
||||
rpcFactory = new MockRpcRetryingCallerFactory(conf);
|
||||
conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
|
||||
MockRpcRetryingCallerFactory.class.getName());
|
||||
|
||||
// mock 3 replica locations
|
||||
when(clusterConn.locateRegion((TableName)any(), (byte[])any(), anyBoolean(),
|
||||
anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null));
|
||||
|
||||
try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
|
||||
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
|
||||
Iterator<Result> iter = scanner.iterator();
|
||||
while (iter.hasNext()) {
|
||||
iter.next();
|
||||
}
|
||||
fail("Should have failed with RetriesExhaustedException");
|
||||
} catch (RetriesExhaustedException expected) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
|
||||
|
||||
public MockRpcRetryingCallerFactory(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> RpcRetryingCaller<T> newCaller() {
|
||||
return new RpcRetryingCaller<T>() {
|
||||
@Override
|
||||
public void cancel() {
|
||||
}
|
||||
@Override
|
||||
public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
throw new IOException("Scanner exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
|
||||
throws IOException, RuntimeException {
|
||||
try {
|
||||
return callable.call(callTimeout);
|
||||
} catch (IOException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue