From 6647122f1c22c484ded40f45c65d24366a84abda Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Tue, 15 Jan 2019 13:17:25 -0500
Subject: [PATCH] Prepare to make send translog of recovery non-blocking
 (#37458)

This commit prepares the required infrastructure to make sending a translog
snapshot of the recovery source non-blocking. A follow-up will make the
send-snapshot method itself non-blocking.

Relates #37291
---
 .../elasticsearch/index/shard/IndexShard.java |  2 +-
 .../recovery/PeerRecoveryTargetService.java   | 41 +++++------
 .../recovery/RecoverySourceHandler.java       | 54 +++++++++------
 .../indices/recovery/RecoveryTarget.java      | 68 ++++++++++---------
 .../recovery/RecoveryTargetHandler.java       |  7 +-
 .../RecoveryTranslogOperationsResponse.java   | 41 ++++-------
 .../recovery/RemoteRecoveryTargetHandler.java | 17 ++---
 .../RecoveryDuringReplicationTests.java       | 20 +++---
 .../index/shard/IndexShardTests.java          | 38 +++++------
 .../recovery/RecoverySourceHandlerTests.java  | 32 +++++----
 10 files changed, 162 insertions(+), 158 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index e8cb0f519dd..f92eb383492 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2998,7 +2998,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
      *
      * @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
-     * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long)
+     * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener)
      */
     public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
         assert seqNo != UNASSIGNED_SEQ_NO
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 8579caecc8a..3a48702fd55 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -485,14 +485,12 @@ public class PeerRecoveryTargetService implements IndexEventListener {
         public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel, Task task) throws IOException {
             try (RecoveryRef recoveryRef =
-                     onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                         onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                 final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
                 final RecoveryTarget recoveryTarget = recoveryRef.target();
-                try {
-                    recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
-                        request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary());
-                    channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
-                } catch (MapperException exception) {
+                final ActionListener<RecoveryTranslogOperationsResponse> listener =
+                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request);
+                final Consumer<Exception> retryOnMappingException =
exception -> { // in very rare cases a translog replay from primary is processed before a mapping update on this node // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node. logger.debug("delaying recovery due to missing mapping changes", exception); @@ -504,31 +502,36 @@ public class PeerRecoveryTargetService implements IndexEventListener { try { messageReceived(request, channel, task); } catch (Exception e) { - onFailure(e); - } - } - - protected void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (IOException e1) { - logger.warn("failed to send error back to recovery source", e1); + listener.onFailure(e); } } @Override public void onClusterServiceClose() { - onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates")); + listener.onFailure(new ElasticsearchException( + "cluster service was closed while waiting for mapping updates")); } @Override public void onTimeout(TimeValue timeout) { // note that we do not use a timeout (see comment above) - onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout + - "])")); + listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " + + "(timeout [" + timeout + "])")); } }); - } + }; + recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), + request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(), + ActionListener.wrap( + checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)), + e -> { + if (e instanceof MapperException) { + retryOnMappingException.accept(e); + } else { + listener.onFailure(e); + } + }) + ); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e3fd1bf73d3..d2d03156271 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -33,6 +33,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.StopWatch; @@ -226,25 +227,27 @@ public class RecoverySourceHandler { logger.trace("snapshot translog for recovery; current size is [{}]", shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); } - final SendSnapshotResult sendSnapshotResult; - try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) { - // we can release the retention lock here because the snapshot itself will retain the required operations. - IOUtils.close(retentionLock, () -> resources.remove(retentionLock)); - // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values - // are at least as high as the corresponding values on the primary when any of these operations were executed on it. 
- final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); - final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); - sendSnapshotResult = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, - maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); - } catch (Exception e) { - throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); - } + final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); + resources.add(phase2Snapshot); + // we can release the retention lock here because the snapshot itself will retain the required operations. + IOUtils.close(retentionLock); + // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values + // are at least as high as the corresponding values on the primary when any of these operations were executed on it. + final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); + final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); + final StepListener sendSnapshotStep = new StepListener<>(); + phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep); + sendSnapshotStep.whenComplete( + r -> IOUtils.close(phase2Snapshot), + e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e))); final StepListener finalizeStep = new StepListener<>(); - finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep); + sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure); + finalizeStep.whenComplete(r -> { - assert resources.isEmpty() : "not every resource is released [" + resources + "]"; final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time + final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result(); final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize, sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, @@ -507,10 +510,17 @@ public class RecoverySourceHandler { * @param snapshot a snapshot of the translog * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it. - * @return the send snapshot result + * @param listener a listener which will be notified with the local checkpoint on the target. 
*/ - SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, - long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException { + void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) throws IOException { + ActionListener.completeWith(listener, () -> sendSnapshotBlockingly( + startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes)); + } + + private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, + Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes) throws IOException { assert requiredSeqNoRangeStart <= endingSeqNo + 1: "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : @@ -538,9 +548,11 @@ public class RecoverySourceHandler { } final CancellableThreads.IOInterruptible sendBatch = () -> { - final long targetCheckpoint = recoveryTarget.indexTranslogOperations( - operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); - targetLocalCheckpoint.set(targetCheckpoint); + // TODO: Make this non-blocking + final PlainActionFuture future = new PlainActionFuture<>(); + recoveryTarget.indexTranslogOperations( + operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future); + targetLocalCheckpoint.set(future.actionGet()); }; // send operations in batches diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 73ad4c17594..5950f71006a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -394,40 +394,42 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, - long maxSeqNoOfDeletesOrUpdatesOnPrimary) throws IOException { - final RecoveryState.Translog translog = state().getTranslog(); - translog.totalOperations(totalTranslogOps); - assert indexShard().recoveryState() == state(); - if (indexShard().state() != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, indexShard().state()); - } - /* - * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation - * will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests - * (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we - * replay these operations first (without timestamp), then optimize append-only requests (with timestamp). - */ - indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary); - /* - * Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when - * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that operation was executed on. 
- */ - indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary); - for (Translog.Operation operation : operations) { - Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { - throw new MapperException("mapping updates are not allowed [" + operation + "]"); + public void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, + long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + final RecoveryState.Translog translog = state().getTranslog(); + translog.totalOperations(totalTranslogOps); + assert indexShard().recoveryState() == state(); + if (indexShard().state() != IndexShardState.RECOVERING) { + throw new IndexShardNotRecoveringException(shardId, indexShard().state()); } - assert result.getFailure() == null: "unexpected failure while replicating translog entry: " + result.getFailure(); - ExceptionsHelper.reThrowIfNotNull(result.getFailure()); - } - // update stats only after all operations completed (to ensure that mapping updates don't mess with stats) - translog.incrementRecoveredOperations(operations.size()); - indexShard().sync(); - // roll over / flush / trim if needed - indexShard().afterWriteOperation(); - return indexShard().getLocalCheckpoint(); + /* + * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation + * will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests + * (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we + * replay these operations first (without timestamp), then optimize append-only requests (with timestamp). + */ + indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary); + /* + * Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when + * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on. 
+ */ + indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary); + for (Translog.Operation operation : operations) { + Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + throw new MapperException("mapping updates are not allowed [" + operation + "]"); + } + assert result.getFailure() == null : "unexpected failure while replicating translog entry: " + result.getFailure(); + ExceptionsHelper.reThrowIfNotNull(result.getFailure()); + } + // update stats only after all operations completed (to ensure that mapping updates don't mess with stats) + translog.incrementRecoveredOperations(operations.size()); + indexShard().sync(); + // roll over / flush / trim if needed + indexShard().afterWriteOperation(); + return indexShard().getLocalCheckpoint(); + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 18e57866c68..3372c933559 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -67,10 +67,11 @@ public interface RecoveryTargetHandler { * @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on * the primary shard when capturing these operations. This value is at least as high as the * max_seq_no_of_updates on the primary was when any of these ops were processed on it. - * @return the local checkpoint on the target shard + * @param listener a listener which will be notified with the local checkpoint on the target + * after these operations are successfully indexed on the target. 
*/ - long indexTranslogOperations(List operations, int totalTranslogOps, - long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) throws IOException; + void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, + long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener listener); /** * Notifies the target of the files it is going to receive diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java index 8633380f394..9f86feb5734 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java @@ -23,34 +23,19 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.transport.FutureTransportResponseHandler; import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseHandler; import java.io.IOException; -public class RecoveryTranslogOperationsResponse extends TransportResponse { - - long localCheckpoint; - - RecoveryTranslogOperationsResponse() { - - } +final class RecoveryTranslogOperationsResponse extends TransportResponse { + final long localCheckpoint; RecoveryTranslogOperationsResponse(final long localCheckpoint) { this.localCheckpoint = localCheckpoint; } - @Override - public void writeTo(final StreamOutput out) throws IOException { - // before 6.0.0 we responded with an empty response so we have to maintain that - if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { - out.writeZLong(localCheckpoint); - } - } - - @Override - public void readFrom(final StreamInput in) throws IOException { + RecoveryTranslogOperationsResponse(final StreamInput in) throws IOException { + super(in); // before 6.0.0 we received an empty response so we have to maintain that if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { localCheckpoint = in.readZLong(); @@ -60,14 +45,12 @@ public class RecoveryTranslogOperationsResponse extends TransportResponse { } } - static TransportResponseHandler HANDLER = - new FutureTransportResponseHandler() { - @Override - public RecoveryTranslogOperationsResponse read(StreamInput in) throws IOException { - RecoveryTranslogOperationsResponse response = new RecoveryTranslogOperationsResponse(); - response.readFrom(in); - return response; - } - }; - + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + // before 6.0.0 we responded with an empty response so we have to maintain that + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { + out.writeZLong(localCheckpoint); + } + } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 53eb3e342fa..ba703aeee28 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -32,7 +32,6 @@ import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import 
org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportFuture; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -113,17 +112,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfDeletesOrUpdatesOnPrimary) { - final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( + public void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, + long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener listener) { + final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest( recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary); - final TransportFuture future = transportService.submitRequest( - targetNode, - PeerRecoveryTargetService.Actions.TRANSLOG_OPS, - translogOperationsRequest, - translogOpsRequestOptions, - RecoveryTranslogOperationsResponse.HANDLER); - return future.txGet().localCheckpoint; + transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions, + new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(r.localCheckpoint), listener::onFailure), + RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC)); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 0f591f6db54..d7f8a73c225 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -503,10 +503,10 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC return new RecoveryTarget(indexShard, node, recoveryListener, l -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdates) throws IOException { + public void indexTranslogOperations(List operations, int totalTranslogOps, + long maxSeenAutoIdTimestamp, long msu, ActionListener listener) { opsSent.set(true); - return super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdates); + super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, msu, listener); } }; }); @@ -573,9 +573,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC replica, (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override - public long indexTranslogOperations(final List operations, final int totalTranslogOps, - final long maxAutoIdTimestamp, long maxSeqNoOfUpdates) - throws IOException { + public void indexTranslogOperations(final List operations, final int totalTranslogOps, + final long maxAutoIdTimestamp, long maxSeqNoOfUpdates, + ActionListener listener) { // index a doc which is not part of the snapshot, but also does not complete on replica 
replicaEngineFactory.latchIndexers(1); threadPool.generic().submit(() -> { @@ -602,7 +602,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } catch (InterruptedException e) { throw new AssertionError(e); } - return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates); + super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener); } }); pendingDocActiveWithExtraDocIndexed.await(); @@ -833,12 +833,12 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp, long maxSeqNoOfUpdates) throws IOException { + public void indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp, long maxSeqNoOfUpdates, ActionListener listener) { if (hasBlocked() == false) { blockIfNeeded(RecoveryState.Stage.TRANSLOG); } - return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates); + super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9bb98da6e96..d2a73caa32d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2422,12 +2422,10 @@ public class IndexShardTests extends IndexShardTestCase { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes) throws IOException { - final long localCheckpoint = super.indexTranslogOperations( - operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); - assertFalse(replica.isSyncNeeded()); - return localCheckpoint; + public void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, ActionListener listener){ + super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, + ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded()))); } }, true, true); @@ -2531,13 +2529,14 @@ public class IndexShardTests extends IndexShardTestCase { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException { - final long localCheckpoint = super.indexTranslogOperations( - operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); - // Shard should now be active since we did recover: - assertTrue(replica.isActive()); - return localCheckpoint; + public void indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, ActionListener listener){ + super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, + ActionListener.wrap(checkpoint -> { + listener.onResponse(checkpoint); + // Shard should now be active since we did recover: + assertTrue(replica.isActive()); + }, 
listener::onFailure)); } }, false, true); @@ -2580,12 +2579,13 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException { - final long localCheckpoint = super.indexTranslogOperations( - operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); - assertListenerCalled.accept(replica); - return localCheckpoint; + public void indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) { + super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, + ActionListener.wrap(checkpoint -> { + assertListenerCalled.accept(replica); + listener.onResponse(checkpoint); + }, listener::onFailure)); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 1d58e0be65a..97f2cadfa3a 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -221,15 +221,18 @@ public class RecoverySourceHandlerTests extends ESTestCase { final List shippedOps = new ArrayList<>(); RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu) { + public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu, + ActionListener listener) { shippedOps.addAll(operations); - return SequenceNumbers.NO_OPS_PERFORMED; + listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED); } }; RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); - RecoverySourceHandler.SendSnapshotResult result = handler.phase2(startingSeqNo, requiredStartingSeqNo, - endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong()); + PlainActionFuture future = new PlainActionFuture<>(); + handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), + randomNonNegativeLong(), randomNonNegativeLong(), future); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); + RecoverySourceHandler.SendSnapshotResult result = future.actionGet(); assertThat(result.totalOperations, equalTo(expectedOps)); shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo)); assertThat(shippedOps.size(), equalTo(expectedOps)); @@ -241,8 +244,12 @@ public class RecoverySourceHandlerTests extends ESTestCase { List requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList()); List opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); - expectThrows(IllegalStateException.class, () -> handler.phase2(startingSeqNo, requiredStartingSeqNo, - endingSeqNo, newTranslogSnapshot(operations, opsToSkip), randomNonNegativeLong(), randomNonNegativeLong())); + PlainActionFuture failedFuture = new PlainActionFuture<>(); + expectThrows(IllegalStateException.class, () -> { + 
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip), + randomNonNegativeLong(), randomNonNegativeLong(), failedFuture); + failedFuture.actionGet(); + }); } } @@ -438,11 +445,12 @@ public class RecoverySourceHandlerTests extends ESTestCase { } @Override - SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, - long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException { + void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, + ActionListener listener) throws IOException { phase2Called.set(true); - return super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, - maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); + super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, + maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener); } }; @@ -656,8 +664,8 @@ public class RecoverySourceHandlerTests extends ESTestCase { } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu) { - return 0; + public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu, + ActionListener listener) { } @Override
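
Illustrative note (not part of the patch): the refactoring above follows one pattern throughout — a method that used to return the target's local checkpoint now reports it through an ActionListener, and the one remaining blocking caller (sendSnapshotBlockingly) bridges back to a synchronous call with a PlainActionFuture until the follow-up removes it. The sketch below shows that pattern in plain Java under stated assumptions; TranslogTarget, LocalTranslogTarget, and SendBatchSketch are hypothetical names used only for illustration, not Elasticsearch classes.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for the callback-style API introduced by the patch:
// the local checkpoint is delivered to a callback instead of being returned.
interface TranslogTarget {
    void indexOperations(List<String> ops, Consumer<Long> onCheckpoint, Consumer<Exception> onFailure);
}

// A trivial in-memory target: applies a batch and reports the new checkpoint.
class LocalTranslogTarget implements TranslogTarget {
    private long localCheckpoint = -1;

    @Override
    public void indexOperations(List<String> ops, Consumer<Long> onCheckpoint, Consumer<Exception> onFailure) {
        try {
            localCheckpoint += ops.size();        // pretend every op was indexed
            onCheckpoint.accept(localCheckpoint); // complete the "listener"
        } catch (Exception e) {
            onFailure.accept(e);
        }
    }
}

public class SendBatchSketch {
    public static void main(String[] args) {
        TranslogTarget target = new LocalTranslogTarget();
        // Until the follow-up lands, the sender still blocks per batch by bridging the
        // callback API back to a synchronous call with a future -- the same role
        // PlainActionFuture plays in sendSnapshotBlockingly in this patch.
        CompletableFuture<Long> checkpoint = new CompletableFuture<>();
        target.indexOperations(List.of("op-0", "op-1", "op-2"),
            checkpoint::complete, checkpoint::completeExceptionally);
        System.out.println("target local checkpoint: " + checkpoint.join());
    }
}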