HBASE-14812 Fix ResultBoundedCompletionService deadlock

This commit is contained in:
Elliott Clark 2015-11-13 18:28:12 -08:00
parent ca1048415b
commit d6fdf92f9e
2 changed files with 33 additions and 15 deletions

View File

@ -39,12 +39,13 @@ public class ResultBoundedCompletionService<V> {
private final Executor executor;
private final QueueingFuture<V>[] tasks; // all the tasks
private volatile QueueingFuture<V> completed = null;
private volatile boolean cancelled = false;
class QueueingFuture<T> implements RunnableFuture<T> {
private final RetryingCallable<T> future;
private T result = null;
private ExecutionException exeEx = null;
private volatile boolean cancelled;
private volatile boolean cancelled = false;
private final int callTimeout;
private final RpcRetryingCaller<T> retryingCaller;
private boolean resultObtained = false;
@ -61,18 +62,21 @@ public class ResultBoundedCompletionService<V> {
public void run() {
try {
if (!cancelled) {
result =
this.retryingCaller.callWithRetries(future, callTimeout);
result = this.retryingCaller.callWithRetries(future, callTimeout);
resultObtained = true;
}
} catch (Throwable t) {
exeEx = new ExecutionException(t);
} finally {
if (!cancelled && completed == null) {
completed = (QueueingFuture<V>) QueueingFuture.this;
synchronized (tasks) {
tasks.notify();
synchronized (tasks) {
// If this wasn't canceled then store the result.
if (!cancelled && completed == null) {
completed = (QueueingFuture<V>) QueueingFuture.this;
}
// Notify just in case there was someone waiting and this was canceled.
// That shouldn't happen but better safe than sorry.
tasks.notify();
}
}
}
@ -145,19 +149,23 @@ public class ResultBoundedCompletionService<V> {
public QueueingFuture<V> take() throws InterruptedException {
synchronized (tasks) {
while (completed == null) tasks.wait();
while (completed == null && !cancelled) tasks.wait();
}
return completed;
}
public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (tasks) {
if (completed == null) unit.timedWait(tasks, timeout);
if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
}
return completed;
}
public void cancelAll() {
// Grab the lock on tasks so that cancelled is visible everywhere
synchronized (tasks) {
cancelled = true;
}
for (QueueingFuture<V> future : tasks) {
if (future != null) future.cancel(true);
}

View File

@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@ -163,12 +164,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
replicaSwitched.set(false);
// submit call for the primary replica.
addCallsForCurrentReplica(cs, rl);
try {
// wait for the timeout to see whether the primary responds back
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
TimeUnit.MICROSECONDS); // Yes, microseconds
if (f != null) {
Pair<Result[], ScannerCallable> r = f.get();
Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
@ -180,23 +182,31 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} catch (TimeoutException e) {
throw new InterruptedIOException(e.getMessage());
}
// submit call for the all of the secondaries at once
// TODO: this may be an overkill for large region replication
addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
try {
Future<Pair<Result[], ScannerCallable>> f = cs.take();
Pair<Result[], ScannerCallable> r = f.get();
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
if (f != null) {
Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
return r == null ? null : r.getFirst(); // great we got an answer
}
return r == null ? null : r.getFirst(); // great we got an answer
} catch (ExecutionException e) {
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} catch (TimeoutException e) {
throw new InterruptedIOException(e.getMessage());
} finally {
// We get there because we were interrupted or because one or more of the
// calls succeeded or failed. In all case, we stop all our tasks.