Replicate write actions before fsyncing them (#49746)

This commit fixes a number of issues with data replication:

- Local and global checkpoints are not necessarily updated after the new operations have been
fsynced; they may capture a state from before the fsync. The reason this probably went undetected
for so long is that AsyncIOProcessor is synchronous if you index one item at a time, and hence
works as intended unless the level of concurrent indexing is high enough. Since we rely in other
places on the assumption that the local checkpoint is up to date in the case of synchronous
translog durability, there is a risk that the local and global checkpoints are not up to date after
replication completes, and that this will not be corrected by the periodic global checkpoint sync.
- AsyncIOProcessor has another "bad" side effect here: if you index one bulk at a time, the
bulk is always fsynced on the primary before being sent to the replica. Further, if one thread
is tasked by AsyncIOProcessor to drain the processing queue and fsync, other threads can
easily pile more bulk requests on top of that thread. Things are not very fair here: the draining
thread may keep doing fsyncs as the other threads pile more and more work on top of it, which
blocks it from returning and completing its replication request (e.g. if this thread is on the
primary, it blocks the replication requests to the replicas from going out, delaying checkpoint
advancement). A simplified sketch of this drain pattern follows the list.
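
Below is a heavily simplified sketch of the drain pattern at issue. It is an illustration only (not the
actual org.elasticsearch.common.util.concurrent.AsyncIOProcessor, which also tracks per-item listeners
and handles a wake-up race that is elided here); it shows how one thread can get stuck writing and
fsyncing on behalf of everyone else:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Simplified sketch: whichever thread wins the permit drains and fsyncs on behalf of
// everyone, while the other threads return immediately and keep piling new items on.
abstract class DrainingWriter<Item> {
    private final Queue<Item> queue = new ConcurrentLinkedQueue<>();
    private final Semaphore drainPermit = new Semaphore(1);

    final void put(Item item) {
        queue.add(item);
        while (drainPermit.tryAcquire()) {
            try {
                Item next;
                while ((next = queue.poll()) != null) {
                    write(next);
                }
                fsync(); // the "winner" may end up fsyncing batch after batch
            } finally {
                drainPermit.release();
            }
            if (queue.isEmpty()) {
                break; // nothing new arrived while we were draining
            }
        }
    }

    protected abstract void write(Item item);

    protected abstract void fsync();
}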

This commit fixes all of these issues and also simplifies the code that coordinates the
after-write actions.
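
Schematically, the fix on the primary side, condensed from the ReplicationOperation diff below, only
reads and propagates the checkpoints once the post-replication actions (including the translog fsync)
have completed:

// Condensed from handlePrimaryResult below: the checkpoints are read only after the
// post-replication actions (e.g. the translog fsync) report completion, so they can
// no longer capture a pre-fsync state.
primaryResult.runPostReplicationActions(new ActionListener<Void>() {
    @Override
    public void onResponse(Void ignored) {
        successfulShards.incrementAndGet();
        try {
            // safe now: the fsync has happened and the local checkpoint is fresh
            updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
        } finally {
            decPendingAndFinishIfNeeded();
        }
    }

    @Override
    public void onFailure(Exception e) {
        finishAsFailed(e);
    }
});

The same ordering applies on the replica side: the replica now runs its post-write actions before
building the ReplicaResponse, so the checkpoints it reports back to the primary are always
post-fsync values.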
Yannick Welsch 2019-12-03 11:46:50 +01:00
parent 6e751f5536
commit fbb92f527a
13 changed files with 244 additions and 242 deletions

File: ReplicationOperation.java

@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
@@ -110,8 +111,6 @@ public class ReplicationOperation<
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
this.primaryResult = primaryResult;
primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.globalCheckpoint());
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
@@ -134,9 +133,27 @@ public class ReplicationOperation<
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}
successfulShards.incrementAndGet(); // mark primary as successful
primaryResult.runPostReplicationActions(new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
successfulShards.incrementAndGet();
try {
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
}
@Override
public void onFailure(Exception e) {
logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
// TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
// go out of sync with the primary
finishAsFailed(e);
}
});
}
private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
@@ -176,17 +193,11 @@ public class ReplicationOperation<
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
} catch (final AlreadyClosedException e) {
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
// fail the primary but fall through and let the rest of operation processing complete
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
primary.failShard(message, e);
}
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
}
@Override
public void onFailure(Exception replicaException) {
@@ -211,6 +222,19 @@ public class ReplicationOperation<
});
}
private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), localCheckpointSupplier.getAsLong());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), globalCheckpointSupplier.getAsLong());
} catch (final AlreadyClosedException e) {
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
// fail the primary but fall through and let the rest of operation processing complete
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
primary.failShard(message, e);
}
}
private void onNoLongerPrimary(Exception failure) {
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
final boolean nodeIsClosing = cause instanceof NodeClosedException;
@@ -464,5 +488,11 @@ public class ReplicationOperation<
@Nullable RequestT replicaRequest();
void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
/**
* Run actions to be triggered post replication
* @param listener callback that is invoked after post replication actions have completed
*/
void runPostReplicationActions(ActionListener<Void> listener);
}
}
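
The new runPostReplicationActions hook above is deliberately minimal. A result type with no
asynchronous after-write work simply completes the callback inline, as the test doubles later in this
commit do; a minimal sketch:

// Minimal implementation for a result with no after-write work: completing the
// callback inline means checkpoint propagation happens as soon as replication is done.
@Override
public void runPostReplicationActions(ActionListener<Void> listener) {
    if (finalFailure != null) {
        listener.onFailure(finalFailure);
    } else {
        listener.onResponse(null); // nothing to fsync or refresh
    }
}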

File: TransportReplicationAction.java

@@ -74,8 +74,6 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
@@ -347,17 +345,12 @@ public abstract class TransportReplicationAction<
} else {
setPhase(replicationTask, "primary");
final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onResponse(response);
}, e -> handleException(primaryShardReference, e));
final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
adaptResponse(response, primaryShardReference.indexShard);
final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
if (syncGlobalCheckpointAfterOperation) {
final IndexShard shard = primaryShardReference.indexShard;
try {
shard.maybeSyncGlobalCheckpoint("post-operation");
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(
@@ -365,15 +358,19 @@ public abstract class TransportReplicationAction<
// intentionally swallow, a missed global checkpoint sync should not fail this operation
logger.info(
new ParameterizedMessage(
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
"{} failed to execute post-operation global checkpoint sync",
primaryShardReference.indexShard.shardId()), e);
}
}
}
referenceClosingListener.onResponse(response);
}, referenceClosingListener::onFailure);
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onResponse(response);
}, e -> handleException(primaryShardReference, e));
new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful),
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
}
} catch (Exception e) {
@@ -394,10 +391,19 @@ public abstract class TransportReplicationAction<
}
// allows subclasses to adapt the response
protected void adaptResponse(Response response, IndexShard indexShard) {
}
protected ActionListener<Response> wrapResponseActionListener(ActionListener<Response> listener, IndexShard shard) {
return listener;
}
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse>
implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
final ReplicaRequest replicaRequest;
protected final ReplicaRequest replicaRequest;
public final Response finalResponseIfSuccessful;
public final Exception finalFailure;
@@ -430,11 +436,12 @@ public abstract class TransportReplicationAction<
}
}
public void respond(ActionListener<Response> listener) {
if (finalResponseIfSuccessful != null) {
listener.onResponse(finalResponseIfSuccessful);
} else {
@Override
public void runPostReplicationActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
listener.onResponse(null);
}
}
}
@@ -450,11 +457,11 @@ public abstract class TransportReplicationAction<
this(null);
}
public void respond(ActionListener<TransportResponse.Empty> listener) {
if (finalFailure == null) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
} else {
public void runPostReplicaActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
listener.onResponse(null);
}
}
}
@@ -504,10 +511,23 @@ public abstract class TransportReplicationAction<
try {
assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
releasable.close(); // release shard operation lock before responding to caller
replicaResult.runPostReplicaActions(
ActionListener.wrap(r -> {
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
replicaResult.respond(new ResponseListener(response));
releasable.close(); // release shard operation lock before responding to caller
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
replicaRequest.getRequest().shardId(),
replicaRequest.getRequest());
}
setPhase(task, "finished");
onCompletionListener.onResponse(response);
}, e -> {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
this.responseWithFailure(e);
})
);
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
AsyncReplicaAction.this.onFailure(e);
@@ -565,33 +585,6 @@ public abstract class TransportReplicationAction<
acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(),
replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes());
}
/**
* Listens for the response on the replica and sends the response back to the primary.
*/
private class ResponseListener implements ActionListener<TransportResponse.Empty> {
private final ReplicaResponse replicaResponse;
ResponseListener(ReplicaResponse replicaResponse) {
this.replicaResponse = replicaResponse;
}
@Override
public void onResponse(Empty response) {
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
replicaRequest.getRequest().shardId(),
replicaRequest.getRequest());
}
setPhase(task, "finished");
onCompletionListener.onResponse(replicaResponse);
}
@Override
public void onFailure(Exception e) {
responseWithFailure(e);
}
}
}
private IndexShard getIndexShard(final ShardId shardId) {

File: TransportWriteAction.java

@@ -41,7 +41,6 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -124,12 +123,10 @@ public abstract class TransportWriteAction<
* NOTE: public for testing
*/
public static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
Response extends ReplicationResponse & WriteResponse> extends PrimaryResult<ReplicaRequest, Response>
implements RespondingWriteResult {
boolean finishedAsyncActions;
Response extends ReplicationResponse & WriteResponse> extends PrimaryResult<ReplicaRequest, Response> {
public final Location location;
public final IndexShard primary;
ActionListener<Response> listener = null;
private final Logger logger;
public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse,
@Nullable Location location, @Nullable Exception operationFailure,
@@ -137,104 +134,73 @@ public abstract class TransportWriteAction<
super(request, finalResponse, operationFailure);
this.location = location;
this.primary = primary;
this.logger = logger;
assert location == null || operationFailure == null
: "expected either failure to be null or translog location to be null, " +
"but found: [" + location + "] translog location and [" + operationFailure + "] failure";
if (operationFailure != null) {
this.finishedAsyncActions = true;
}
@Override
public void runPostReplicationActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
/*
* We call this before replication because this might wait for a refresh and that can take a while.
* We call this after replication because this might wait for a refresh and that can take a while.
* This way we wait for the refresh in parallel on the primary and on the replica.
*/
new AsyncAfterWriteAction(primary, request, location, this, logger).run();
}
new AsyncAfterWriteAction(primary, replicaRequest, location, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
listener.onResponse(null);
}
@Override
public synchronized void respond(ActionListener<Response> listener) {
this.listener = listener;
respondIfPossible(null);
}
/**
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
*/
protected void respondIfPossible(Exception ex) {
assert Thread.holdsLock(this);
if (finishedAsyncActions && listener != null) {
if (ex == null) {
super.respond(listener);
} else {
public void onFailure(Exception ex) {
listener.onFailure(ex);
}
}, logger).run();
}
}
public synchronized void onFailure(Exception exception) {
finishedAsyncActions = true;
respondIfPossible(exception);
}
@Override
public synchronized void onSuccess(boolean forcedRefresh) {
finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
finishedAsyncActions = true;
respondIfPossible(null);
}
}
/**
* Result of taking the action on the replica.
*/
public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
extends ReplicaResult implements RespondingWriteResult {
public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>> extends ReplicaResult {
public final Location location;
boolean finishedAsyncActions;
private ActionListener<TransportResponse.Empty> listener;
private final ReplicaRequest request;
private final IndexShard replica;
private final Logger logger;
public WriteReplicaResult(ReplicaRequest request, @Nullable Location location,
@Nullable Exception operationFailure, IndexShard replica, Logger logger) {
super(operationFailure);
this.location = location;
if (operationFailure != null) {
this.finishedAsyncActions = true;
} else {
new AsyncAfterWriteAction(replica, request, location, this, logger).run();
}
this.request = request;
this.replica = replica;
this.logger = logger;
}
@Override
public synchronized void respond(ActionListener<TransportResponse.Empty> listener) {
this.listener = listener;
respondIfPossible(null);
public void runPostReplicaActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
listener.onResponse(null);
}
/**
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
*/
protected void respondIfPossible(Exception ex) {
assert Thread.holdsLock(this);
if (finishedAsyncActions && listener != null) {
if (ex == null) {
super.respond(listener);
} else {
@Override
public void onFailure(Exception ex) {
listener.onFailure(ex);
}
}, logger).run();
}
}
@Override
public synchronized void onFailure(Exception ex) {
finishedAsyncActions = true;
respondIfPossible(ex);
}
@Override
public synchronized void onSuccess(boolean forcedRefresh) {
finishedAsyncActions = true;
respondIfPossible(null);
}
}
@Override
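
Because the interleaved old and new lines above are hard to follow, here is the consolidated shape of
the new WritePrimaryResult.runPostReplicationActions: the AsyncAfterWriteAction (translog fsync and
possible refresh) is no longer started in the result's constructor but only once replication requests
the post-write actions, which is what lets the primary ship the replica request before it fsyncs:

// Consolidated from the WritePrimaryResult diff above: the fsync/refresh work runs as
// a post-replication action, in parallel with the replicas' fsyncs, instead of eagerly
// in the constructor (which fsynced before the replica request was even sent).
@Override
public void runPostReplicationActions(ActionListener<Void> listener) {
    if (finalFailure != null) {
        listener.onFailure(finalFailure);
    } else {
        new AsyncAfterWriteAction(primary, replicaRequest, location, new RespondingWriteResult() {
            @Override
            public void onSuccess(boolean forcedRefresh) {
                finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
                listener.onResponse(null);
            }

            @Override
            public void onFailure(Exception ex) {
                listener.onFailure(ex);
            }
        }, logger).run();
    }
}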

File: TransportVerifyShardBeforeCloseActionTests.java

@@ -349,6 +349,11 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
this.shardInfo.set(shardInfo);
}
@Override
public void runPostReplicationActions(ActionListener<Void> listener) {
listener.onResponse(null);
}
public ReplicationResponse.ShardInfo getShardInfo() {
return shardInfo.get();
}

File: ReplicationOperationTests.java

@@ -124,6 +124,7 @@ public class ReplicationOperationTests extends ESTestCase {
assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet()));
assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds));
assertThat("post replication operations not run on primary", request.runPostReplicationActionsOnPrimary.get(), equalTo(true));
assertTrue("listener is not marked as done", listener.isDone());
ShardInfo shardInfo = listener.actionGet().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(reportedFailures.size()));
@@ -437,6 +438,7 @@ public class ReplicationOperationTests extends ESTestCase {
public static class Request extends ReplicationRequest<Request> {
public AtomicBoolean processedOnPrimary = new AtomicBoolean();
public AtomicBoolean runPostReplicationActionsOnPrimary = new AtomicBoolean();
public Set<ShardRouting> processedOnReplicas = ConcurrentCollections.newConcurrentSet();
Request(ShardId shardId) {
@@ -505,6 +507,14 @@ public class ReplicationOperationTests extends ESTestCase {
this.shardInfo = shardInfo;
}
@Override
public void runPostReplicationActions(ActionListener<Void> listener) {
if (request.runPostReplicationActionsOnPrimary.compareAndSet(false, true) == false) {
fail("processed [" + request + "] twice");
}
listener.onResponse(null);
}
public ShardInfo getShardInfo() {
return shardInfo;
}
@@ -597,6 +607,7 @@ public class ReplicationOperationTests extends ESTestCase {
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica));
assertFalse("primary post replication actions should run after replication", request.runPostReplicationActionsOnPrimary.get());
if (opFailures.containsKey(replica)) {
listener.onFailure(opFailures.get(replica));
} else {

File: TransportWriteActionTests.java

@@ -67,9 +67,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -143,7 +140,7 @@ public class TransportWriteActionTests extends ESTestCase {
testAction.shardOperationOnPrimary(request, indexShard,
ActionTestUtils.assertNoFailureListener(result -> {
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.respond(listener);
result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
assertNotNull(listener.response);
assertNull(listener.failure);
verify(indexShard, never()).refresh(any());
@@ -158,7 +155,7 @@ public class TransportWriteActionTests extends ESTestCase {
TransportWriteAction.WriteReplicaResult<TestRequest> result =
testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.respond(listener);
result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
assertNotNull(listener.response);
assertNull(listener.failure);
verify(indexShard, never()).refresh(any());
@@ -172,7 +169,7 @@ public class TransportWriteActionTests extends ESTestCase {
testAction.shardOperationOnPrimary(request, indexShard,
ActionTestUtils.assertNoFailureListener(result -> {
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.respond(listener);
result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
assertNotNull(listener.response);
assertNull(listener.failure);
assertTrue(listener.response.forcedRefresh);
@@ -188,7 +185,7 @@ public class TransportWriteActionTests extends ESTestCase {
TransportWriteAction.WriteReplicaResult<TestRequest> result =
testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.respond(listener);
result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
assertNotNull(listener.response);
assertNull(listener.failure);
verify(indexShard).refresh("refresh_flag_index");
@@ -203,7 +200,7 @@ public class TransportWriteActionTests extends ESTestCase {
testAction.shardOperationOnPrimary(request, indexShard,
ActionTestUtils.assertNoFailureListener(result -> {
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.respond(listener);
result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
assertNull(listener.response); // Haven't really responded yet
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -226,7 +223,7 @@ public class TransportWriteActionTests extends ESTestCase {
TestAction testAction = new TestAction();
TransportWriteAction.WriteReplicaResult<TestRequest> result = testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.respond(listener);
result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
assertNull(listener.response); // Haven't responded yet
@SuppressWarnings({ "unchecked", "rawtypes" })
ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class);
@@ -244,9 +241,9 @@ public class TransportWriteActionTests extends ESTestCase {
TestRequest request = new TestRequest();
TestAction testAction = new TestAction(true, true);
testAction.shardOperationOnPrimary(request, indexShard,
ActionTestUtils.assertNoFailureListener(writePrimaryResult -> {
ActionTestUtils.assertNoFailureListener(result -> {
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
writePrimaryResult.respond(listener);
result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
assertNull(listener.response);
assertNotNull(listener.failure);
}));
@@ -255,10 +252,10 @@ public class TransportWriteActionTests extends ESTestCase {
public void testDocumentFailureInShardOperationOnReplica() throws Exception {
TestRequest request = new TestRequest();
TestAction testAction = new TestAction(randomBoolean(), true);
TransportWriteAction.WriteReplicaResult<TestRequest> writeReplicaResult =
TransportWriteAction.WriteReplicaResult<TestRequest> result =
testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
writeReplicaResult.respond(listener);
result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
assertNull(listener.response);
assertNotNull(listener.failure);
}
@@ -350,41 +347,6 @@ public class TransportWriteActionTests extends ESTestCase {
}
}
public void testConcurrentWriteReplicaResultCompletion() throws InterruptedException {
IndexShard replica = mock(IndexShard.class);
when(replica.getTranslogDurability()).thenReturn(Translog.Durability.ASYNC);
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
TransportWriteAction.WriteReplicaResult<TestRequest> replicaResult = new TransportWriteAction.WriteReplicaResult<>(
request, new Translog.Location(0, 0, 0), null, replica, logger);
CyclicBarrier barrier = new CyclicBarrier(2);
Runnable waitForBarrier = () -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
};
CountDownLatch completionLatch = new CountDownLatch(1);
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.respond(ActionListener.wrap(completionLatch::countDown));
});
if (randomBoolean()) {
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.onFailure(null);
});
} else {
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.onSuccess(false);
});
}
assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
}
private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
private final boolean withDocumentFailureOnPrimary;

File: GlobalCheckpointSyncIT.java

@@ -259,4 +259,31 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase {
}
});
}
public void testPersistLocalCheckpoint() {
internalCluster().ensureAtLeastNumDataNodes(2);
Settings.Builder indexSettings = Settings.builder()
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "10m")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", randomIntBetween(0, 1));
prepareCreate("test", indexSettings).get();
ensureGreen("test");
int numDocs = randomIntBetween(1, 20);
logger.info("numDocs {}", numDocs);
long maxSeqNo = 0;
for (int i = 0; i < numDocs; i++) {
maxSeqNo = client().prepareIndex("test", "_doc").setId(Integer.toString(i)).setSource("{}", XContentType.JSON).get().getSeqNo();
logger.info("got {}", maxSeqNo);
}
for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
for (IndexShard shard : indexService) {
final SeqNoStats seqNoStats = shard.seqNoStats();
assertThat(maxSeqNo, equalTo(seqNoStats.getMaxSeqNo()));
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
}
}
}
}
}

File: RetentionLeaseBackgroundSyncActionTests.java

@@ -169,7 +169,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
verify(indexShard).persistRetentionLeases();
// the result should indicate success
final AtomicBoolean success = new AtomicBoolean();
result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
assertTrue(success.get());
}

File: RetentionLeaseSyncActionTests.java

@@ -159,7 +159,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
verify(indexShard).persistRetentionLeases();
// the result should indicate success
final AtomicBoolean success = new AtomicBoolean();
result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
assertTrue(success.get());
}

File: ESIndexLevelReplicationTestCase.java

@@ -610,15 +610,23 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public void execute() {
try {
new ReplicationOperation<>(request, new PrimaryRef(),
ActionListener.delegateFailure(listener,
(delegatedListener, result) -> result.respond(delegatedListener)), new ReplicasRef(), logger, opType, primaryTerm)
ActionListener.map(listener, result -> {
adaptResponse(result.finalResponse, getPrimaryShard());
return result.finalResponse;
}),
new ReplicasRef(), logger, opType, primaryTerm)
.execute();
} catch (Exception e) {
listener.onFailure(e);
}
}
IndexShard getPrimaryShard() {
// to be overridden by subclasses
protected void adaptResponse(Response response, IndexShard indexShard) {
}
protected IndexShard getPrimaryShard() {
return replicationTargets.primary;
}
@@ -741,8 +749,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
finalResponse.setShardInfo(shardInfo);
}
public void respond(ActionListener<Response> listener) {
listener.onResponse(finalResponse);
@Override
public void runPostReplicationActions(ActionListener<Void> listener) {
listener.onResponse(null);
}
}

File: TransportBulkShardOperationsAction.java

@@ -104,7 +104,7 @@ public class TransportBulkShardOperationsAction
}
// public for testing purposes only
public static CcrWritePrimaryResult shardOperationOnPrimary(
public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final ShardId shardId,
final String historyUUID,
final List<Translog.Operation> sourceOperations,
@@ -156,7 +156,7 @@ public class TransportBulkShardOperationsAction
}
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes);
return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
return new WritePrimaryResult<>(replicaRequest, new BulkShardOperationsResponse(), location, null, primary, logger);
}
@Override
@@ -192,26 +192,16 @@ public class TransportBulkShardOperationsAction
return new BulkShardOperationsResponse(in);
}
/**
* Custom write result to include global checkpoint after ops have been replicated.
*/
static final class CcrWritePrimaryResult extends WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> {
CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) {
super(request, new BulkShardOperationsResponse(), location, null, primary, logger);
@Override
protected void adaptResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
adaptBulkShardOperationsResponse(response, indexShard);
}
@Override
public synchronized void respond(ActionListener<BulkShardOperationsResponse> listener) {
final ActionListener<BulkShardOperationsResponse> wrappedListener = ActionListener.wrap(response -> {
final SeqNoStats seqNoStats = primary.seqNoStats();
public static void adaptBulkShardOperationsResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
final SeqNoStats seqNoStats = indexShard.seqNoStats();
// return a fresh global checkpoint after the operations have been replicated for the shard follow task
response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
listener.onResponse(response);
}, listener::onFailure);
super.respond(wrappedListener);
}
}
}

File: ShardFollowTaskReplicationTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.support.replication.TransportWriteActionTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -67,6 +68,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
@@ -668,26 +670,32 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {
@Override
protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest request, ActionListener<PrimaryResult> listener) {
ActionListener.completeWith(listener, () -> {
final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request);
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> ccrResult;
try (Releasable ignored = permitFuture.get()) {
ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(),
request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
TransportWriteActionTestHelper.performPostWriteActions(primary, request, ccrResult.location, logger);
} catch (InterruptedException | ExecutionException | IOException e) {
throw new AssertionError(e);
}
return new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful) {
listener.onResponse(new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful));
}
@Override
public void respond(ActionListener<BulkShardOperationsResponse> listener) {
ccrResult.respond(listener);
}
};
});
protected void adaptResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
TransportBulkShardOperationsAction.adaptBulkShardOperationsResponse(response, indexShard);
}
@Override
protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception {
TransportBulkShardOperationsAction.shardOperationOnReplica(request, replica, logger);
try (Releasable ignored = PlainActionFuture.get(f -> replica.acquireReplicaOperationPermit(
getPrimaryShard().getPendingPrimaryTerm(), getPrimaryShard().getLastKnownGlobalCheckpoint(),
getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes(), f, ThreadPool.Names.SAME, request))) {
Translog.Location location = TransportBulkShardOperationsAction.shardOperationOnReplica(request, replica, logger).location;
TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
}
}
}

File: BulkShardOperationsTests.java

@@ -33,7 +33,6 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm;
import static org.hamcrest.Matchers.equalTo;
import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.CcrWritePrimaryResult;
public class BulkShardOperationsTests extends IndexShardTestCase {
@@ -124,7 +123,8 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
Randomness.shuffle(firstBulk);
Randomness.shuffle(secondBulk);
oldPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno);
final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(oldPrimary.shardId(),
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> fullResult =
TransportBulkShardOperationsAction.shardOperationOnPrimary(oldPrimary.shardId(),
oldPrimary.getHistoryUUID(), firstBulk, seqno, oldPrimary, logger);
assertThat(fullResult.replicaRequest().getOperations(),
equalTo(firstBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)).collect(Collectors.toList())));
@@ -138,7 +138,8 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
// The second bulk includes some operations from the first bulk which were processed already;
// only a subset of these operations will be included in the result but with the old primary term.
final List<Translog.Operation> existingOps = randomSubsetOf(firstBulk);
final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(newPrimary.shardId(),
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> partialResult =
TransportBulkShardOperationsAction.shardOperationOnPrimary(newPrimary.shardId(),
newPrimary.getHistoryUUID(), Stream.concat(secondBulk.stream(), existingOps.stream()).collect(Collectors.toList()),
seqno, newPrimary, logger);
final long newPrimaryTerm = newPrimary.getOperationPrimaryTerm();