From 5e708746b8d301c2fb22a85b8756129147012374 Mon Sep 17 00:00:00 2001
From: Enis Soztutar
Date: Fri, 10 Jul 2015 16:06:29 -0700
Subject: [PATCH] HBASE-13997 ScannerCallableWithReplicas cause Infinitely blocking (Zephyr Guo and Enis)

---
 .../client/ScannerCallableWithReplicas.java   | 55 ++++----------
 .../hbase/client/TestClientScanner.java       | 73 ++++++++++++++++++-
 2 files changed, 88 insertions(+), 40 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 1708efe08c2..586db8c962d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefor
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -158,15 +156,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
         new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
-            new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
+            RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
             rl.size() * 5);
 
-    List<ExecutionException> exceptions = null;
-    int submitted = 0, completed = 0;
     AtomicBoolean done = new AtomicBoolean(false);
     replicaSwitched.set(false);
     // submit call for the primary replica.
-    submitted += addCallsForCurrentReplica(cs, rl);
+    addCallsForCurrentReplica(cs, rl);
     try {
       // wait for the timeout to see whether the primary responds back
       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
@@ -179,11 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
         return r == null ? null : r.getFirst(); //great we got a response
       }
     } catch (ExecutionException e) {
-      // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
-      // but the secondaries might still succeed. Continue on the replica RPCs.
-      exceptions = new ArrayList<ExecutionException>(rl.size());
-      exceptions.add(e);
-      completed++;
+      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
     } catch (CancellationException e) {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
@@ -191,24 +183,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     }
     // submit call for the all of the secondaries at once
     // TODO: this may be an overkill for large region replication
-    submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+    addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
     try {
-      while (completed < submitted) {
-        try {
-          Future<Pair<Result[], ScannerCallable>> f = cs.take();
-          Pair<Result[], ScannerCallable> r = f.get();
-          if (r != null && r.getSecond() != null) {
-            updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
-          }
-          return r == null ? null : r.getFirst(); // great we got an answer
-        } catch (ExecutionException e) {
-          // if not cancel or interrupt, wait until all RPC's are done
-          // one of the tasks failed. Save the exception for later.
-          if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
-          exceptions.add(e);
-          completed++;
-        }
+      Future<Pair<Result[], ScannerCallable>> f = cs.take();
+      Pair<Result[], ScannerCallable> r = f.get();
+      if (r != null && r.getSecond() != null) {
+        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
       }
+      return r == null ? null : r.getFirst(); // great we got an answer
+    } catch (ExecutionException e) {
+      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
     } catch (CancellationException e) {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
@@ -218,11 +202,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       // calls succeeded or failed. In all case, we stop all our tasks.
       cs.cancelAll();
     }
-
-    if (exceptions != null && !exceptions.isEmpty()) {
-      RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
-          retries); // just rethrow the first exception for now.
-    }
     return null; // unreachable
   }
 
@@ -283,19 +262,18 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
   }
 
-  private int addCallsForCurrentReplica(
+  private void addCallsForCurrentReplica(
       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
     outstandingCallables.add(currentScannerCallable);
     cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
-    return 1;
   }
 
-  private int addCallsForOtherReplicas(
+  private void addCallsForOtherReplicas(
       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
       int min, int max) {
     if (scan.getConsistency() == Consistency.STRONG) {
-      return 0; // not scheduling on other replicas for strong consistency
+      return; // not scheduling on other replicas for strong consistency
     }
     for (int id = min; id <= max; id++) {
       if (currentScannerCallable.id == id) {
@@ -307,7 +285,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       RetryingRPC retryingOnReplica = new RetryingRPC(s);
       cs.submit(retryingOnReplica, scannerTimeout, id);
     }
-    return max - min + 1;
   }
 
   /**
@@ -354,8 +331,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       // and we can't invoke it multiple times at the same time)
       this.caller = ScannerCallableWithReplicas.this.caller;
       if (scan.getConsistency() == Consistency.TIMELINE) {
-        this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
-            <Result[]>newCaller();
+        this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
+            .<Result[]>newCaller();
       }
     }
 
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 3f406dfd0b7..6d7cc7f9c83 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -21,9 +21,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -32,6 +37,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -496,4 +502,69 @@ public class TestClientScanner {
       assertFalse(cs.advance());
     }
   }
+
+  /**
+   * Tests the case where all replicas of a region throw an exception. It should not cause a hang
+   * but the exception should propagate to the client
+   */
+  @Test (timeout = 30000)
+  public void testExceptionsFromReplicasArePropagated() throws IOException {
+    scan.setConsistency(Consistency.TIMELINE);
+
+    // Mock a caller which calls the callable for ScannerCallableWithReplicas,
+    // but throws an exception for the actual scanner calls via callWithRetries.
+    rpcFactory = new MockRpcRetryingCallerFactory(conf);
+    conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
+      MockRpcRetryingCallerFactory.class.getName());
+
+    // mock 3 replica locations
+    when(clusterConn.locateRegion((TableName)any(), (byte[])any(), anyBoolean(),
+      anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null));
+
+    try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
+      clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+      Iterator<Result> iter = scanner.iterator();
+      while (iter.hasNext()) {
+        iter.next();
+      }
+      fail("Should have failed with RetriesExhaustedException");
+    } catch (RetriesExhaustedException expected) {
+
+    }
+  }
+
+  public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
+
+    public MockRpcRetryingCallerFactory(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public <T> RpcRetryingCaller<T> newCaller() {
+      return new RpcRetryingCaller<T>() {
+        @Override
+        public void cancel() {
+        }
+        @Override
+        public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+            throws IOException, RuntimeException {
+          throw new IOException("Scanner exception");
+        }
+
+        @Override
+        public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+            throws IOException, RuntimeException {
+          try {
+            return callable.call(callTimeout);
+          } catch (IOException e) {
+            throw e;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    }
+
+  }
+
 }