HBASE-14812 Fix ResultBoundedCompletionService deadlock
This commit is contained in:
parent
5b3a49d835
commit
e40ed0e836
|
@ -39,12 +39,13 @@ public class ResultBoundedCompletionService<V> {
|
|||
private final Executor executor;
|
||||
private final QueueingFuture<V>[] tasks; // all the tasks
|
||||
private volatile QueueingFuture<V> completed = null;
|
||||
private volatile boolean cancelled = false;
|
||||
|
||||
class QueueingFuture<T> implements RunnableFuture<T> {
|
||||
private final RetryingCallable<T> future;
|
||||
private T result = null;
|
||||
private ExecutionException exeEx = null;
|
||||
private volatile boolean cancelled;
|
||||
private volatile boolean cancelled = false;
|
||||
private final int callTimeout;
|
||||
private final RpcRetryingCaller<T> retryingCaller;
|
||||
private boolean resultObtained = false;
|
||||
|
@ -61,18 +62,21 @@ public class ResultBoundedCompletionService<V> {
|
|||
public void run() {
|
||||
try {
|
||||
if (!cancelled) {
|
||||
result =
|
||||
this.retryingCaller.callWithRetries(future, callTimeout);
|
||||
result = this.retryingCaller.callWithRetries(future, callTimeout);
|
||||
resultObtained = true;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
exeEx = new ExecutionException(t);
|
||||
} finally {
|
||||
if (!cancelled && completed == null) {
|
||||
completed = (QueueingFuture<V>) QueueingFuture.this;
|
||||
synchronized (tasks) {
|
||||
tasks.notify();
|
||||
synchronized (tasks) {
|
||||
// If this wasn't canceled then store the result.
|
||||
if (!cancelled && completed == null) {
|
||||
completed = (QueueingFuture<V>) QueueingFuture.this;
|
||||
}
|
||||
|
||||
// Notify just in case there was someone waiting and this was canceled.
|
||||
// That shouldn't happen but better safe than sorry.
|
||||
tasks.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -145,19 +149,23 @@ public class ResultBoundedCompletionService<V> {
|
|||
|
||||
public QueueingFuture<V> take() throws InterruptedException {
|
||||
synchronized (tasks) {
|
||||
while (completed == null) tasks.wait();
|
||||
while (completed == null && !cancelled) tasks.wait();
|
||||
}
|
||||
return completed;
|
||||
}
|
||||
|
||||
public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
synchronized (tasks) {
|
||||
if (completed == null) unit.timedWait(tasks, timeout);
|
||||
if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
|
||||
}
|
||||
return completed;
|
||||
}
|
||||
|
||||
public void cancelAll() {
|
||||
// Grab the lock on tasks so that cancelled is visible everywhere
|
||||
synchronized (tasks) {
|
||||
cancelled = true;
|
||||
}
|
||||
for (QueueingFuture<V> future : tasks) {
|
||||
if (future != null) future.cancel(true);
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -163,12 +164,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
replicaSwitched.set(false);
|
||||
// submit call for the primary replica.
|
||||
addCallsForCurrentReplica(cs, rl);
|
||||
|
||||
try {
|
||||
// wait for the timeout to see whether the primary responds back
|
||||
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
|
||||
TimeUnit.MICROSECONDS); // Yes, microseconds
|
||||
if (f != null) {
|
||||
Pair<Result[], ScannerCallable> r = f.get();
|
||||
Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
|
||||
if (r != null && r.getSecond() != null) {
|
||||
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
|
||||
}
|
||||
|
@ -180,23 +182,31 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (TimeoutException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
}
|
||||
|
||||
// submit call for the all of the secondaries at once
|
||||
// TODO: this may be an overkill for large region replication
|
||||
addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
|
||||
|
||||
try {
|
||||
Future<Pair<Result[], ScannerCallable>> f = cs.take();
|
||||
Pair<Result[], ScannerCallable> r = f.get();
|
||||
if (r != null && r.getSecond() != null) {
|
||||
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
|
||||
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
|
||||
if (f != null) {
|
||||
Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
|
||||
if (r != null && r.getSecond() != null) {
|
||||
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
|
||||
}
|
||||
return r == null ? null : r.getFirst(); // great we got an answer
|
||||
}
|
||||
return r == null ? null : r.getFirst(); // great we got an answer
|
||||
} catch (ExecutionException e) {
|
||||
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
|
||||
} catch (CancellationException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (TimeoutException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
} finally {
|
||||
// We get there because we were interrupted or because one or more of the
|
||||
// calls succeeded or failed. In all case, we stop all our tasks.
|
||||
|
|
Loading…
Reference in New Issue