From e40ed0e836e19bdac8dc0892d552a71448c8e4d2 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Fri, 13 Nov 2015 18:28:12 -0800 Subject: [PATCH] HBASE-14812 Fix ResultBoundedCompletionService deadlock --- .../ResultBoundedCompletionService.java | 26 ++++++++++++------- .../client/ScannerCallableWithReplicas.java | 22 +++++++++++----- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java index eacbe2ddd51..9b32e93106e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java @@ -39,12 +39,13 @@ public class ResultBoundedCompletionService { private final Executor executor; private final QueueingFuture[] tasks; // all the tasks private volatile QueueingFuture completed = null; + private volatile boolean cancelled = false; class QueueingFuture implements RunnableFuture { private final RetryingCallable future; private T result = null; private ExecutionException exeEx = null; - private volatile boolean cancelled; + private volatile boolean cancelled = false; private final int callTimeout; private final RpcRetryingCaller retryingCaller; private boolean resultObtained = false; @@ -61,18 +62,21 @@ public class ResultBoundedCompletionService { public void run() { try { if (!cancelled) { - result = - this.retryingCaller.callWithRetries(future, callTimeout); + result = this.retryingCaller.callWithRetries(future, callTimeout); resultObtained = true; } } catch (Throwable t) { exeEx = new ExecutionException(t); } finally { - if (!cancelled && completed == null) { - completed = (QueueingFuture) QueueingFuture.this; - synchronized (tasks) { - tasks.notify(); + synchronized (tasks) { + // If this wasn't canceled then store the result. + if (!cancelled && completed == null) { + completed = (QueueingFuture) QueueingFuture.this; } + + // Notify just in case there was someone waiting and this was canceled. + // That shouldn't happen but better safe than sorry. + tasks.notify(); } } } @@ -145,19 +149,23 @@ public class ResultBoundedCompletionService { public QueueingFuture take() throws InterruptedException { synchronized (tasks) { - while (completed == null) tasks.wait(); + while (completed == null && !cancelled) tasks.wait(); } return completed; } public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException { synchronized (tasks) { - if (completed == null) unit.timedWait(tasks, timeout); + if (completed == null && !cancelled) unit.timedWait(tasks, timeout); } return completed; } public void cancelAll() { + // Grab the lock on tasks so that cancelled is visible everywhere + synchronized (tasks) { + cancelled = true; + } for (QueueingFuture future : tasks) { if (future != null) future.cancel(true); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 740eb91da9c..7418292bad6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -163,12 +164,13 @@ class ScannerCallableWithReplicas implements RetryingCallable { replicaSwitched.set(false); // submit call for the primary replica. addCallsForCurrentReplica(cs, rl); + try { // wait for the timeout to see whether the primary responds back Future> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds if (f != null) { - Pair r = f.get(); + Pair r = f.get(timeout, TimeUnit.MILLISECONDS); if (r != null && r.getSecond() != null) { updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); } @@ -180,23 +182,31 @@ class ScannerCallableWithReplicas implements RetryingCallable { throw new InterruptedIOException(e.getMessage()); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); + } catch (TimeoutException e) { + throw new InterruptedIOException(e.getMessage()); } + // submit call for the all of the secondaries at once // TODO: this may be an overkill for large region replication addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); + try { - Future> f = cs.take(); - Pair r = f.get(); - if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + Future> f = cs.poll(timeout, TimeUnit.MILLISECONDS); + if (f != null) { + Pair r = f.get(timeout, TimeUnit.MILLISECONDS); + if (r != null && r.getSecond() != null) { + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + } + return r == null ? null : r.getFirst(); // great we got an answer } - return r == null ? null : r.getFirst(); // great we got an answer } catch (ExecutionException e) { RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); } catch (CancellationException e) { throw new InterruptedIOException(e.getMessage()); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); + } catch (TimeoutException e) { + throw new InterruptedIOException(e.getMessage()); } finally { // We get there because we were interrupted or because one or more of the // calls succeeded or failed. In all case, we stop all our tasks.