Synchronize WriteReplicaResult callbacks (#36770)

TransportWriteAction.WriteReplicaResult is not properly synchronized, which can lead to a data race
between the thread that calls respond and the AsyncAfterWriteAction that calls either onSuccess or
onFailure. This data race results in the response listener not being called, which ultimately results in
a stuck replication task on the replica.
This commit is contained in:
Yannick Welsch 2018-12-18 19:23:35 +01:00 committed by GitHub
parent 1fa105658e
commit e35de2ea2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 2 deletions

View File

@ -163,6 +163,7 @@ public abstract class TransportWriteAction<
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}. * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
*/ */
protected void respondIfPossible(Exception ex) { protected void respondIfPossible(Exception ex) {
assert Thread.holdsLock(this);
if (finishedAsyncActions && listener != null) { if (finishedAsyncActions && listener != null) {
if (ex == null) { if (ex == null) {
super.respond(listener); super.respond(listener);
@ -206,7 +207,7 @@ public abstract class TransportWriteAction<
} }
@Override @Override
public void respond(ActionListener<TransportResponse.Empty> listener) { public synchronized void respond(ActionListener<TransportResponse.Empty> listener) {
this.listener = listener; this.listener = listener;
respondIfPossible(null); respondIfPossible(null);
} }
@ -215,6 +216,7 @@ public abstract class TransportWriteAction<
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}. * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
*/ */
protected void respondIfPossible(Exception ex) { protected void respondIfPossible(Exception ex) {
assert Thread.holdsLock(this);
if (finishedAsyncActions && listener != null) { if (finishedAsyncActions && listener != null) {
if (ex == null) { if (ex == null) {
super.respond(listener); super.respond(listener);
@ -225,7 +227,7 @@ public abstract class TransportWriteAction<
} }
@Override @Override
public void onFailure(Exception ex) { public synchronized void onFailure(Exception ex) {
finishedAsyncActions = true; finishedAsyncActions = true;
respondIfPossible(ex); respondIfPossible(ex);
} }

View File

@ -65,6 +65,9 @@ import org.mockito.ArgumentCaptor;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -347,6 +350,51 @@ public class TransportWriteActionTests extends ESTestCase {
} }
} }
public void testConcurrentWriteReplicaResultCompletion() throws InterruptedException {
IndexShard replica = mock(IndexShard.class);
when(replica.getTranslogDurability()).thenReturn(Translog.Durability.ASYNC);
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
TransportWriteAction.WriteReplicaResult<TestRequest> replicaResult = new TransportWriteAction.WriteReplicaResult<>(
request, new Translog.Location(0, 0, 0), null, replica, logger);
CyclicBarrier barrier = new CyclicBarrier(2);
Runnable waitForBarrier = () -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
};
CountDownLatch completionLatch = new CountDownLatch(1);
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.respond(new ActionListener<TransportResponse.Empty>() {
@Override
public void onResponse(TransportResponse.Empty empty) {
completionLatch.countDown();
}
@Override
public void onFailure(Exception e) {
completionLatch.countDown();
}
});
});
if (randomBoolean()) {
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.onFailure(null);
});
} else {
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.onSuccess(false);
});
}
assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
}
private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> { private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
private final boolean withDocumentFailureOnPrimary; private final boolean withDocumentFailureOnPrimary;