Recovery should not indefinitely retry on mapping error (#41099)

The stuck peer recovery in #40913 revealed that we retry indefinitely on
new cluster states if indexing translog operations on the target hits a
mapper exception. We should not wait and retry if the mapping on the
target is at least as recent as the mapping that the primary used to
index the operations being replayed.

Relates #40913
Nhat Nguyen 2019-04-23 21:44:25 -04:00
parent 75283294f5
commit 615a0211f0
12 changed files with 148 additions and 31 deletions
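For context, the heart of the change is the comparison now made in PeerRecoveryTargetService when indexing a batch of translog operations fails on the target: the recovery only waits for a newer cluster state (and retries) while the target's mapping is strictly older than the mapping version the primary attached to the request. A minimal, hypothetical sketch of that decision follows; the class and method names are illustrative only, and the real check is inlined in the TRANSLOG_OPS handler shown in the diff below.

import org.elasticsearch.index.mapper.MapperException;

// Illustrative helper only; in the actual patch this check is inlined in
// PeerRecoveryTargetService's TranslogOperationsRequestHandler failure path.
final class MappingRetryDecision {

    /**
     * @param failure                 failure raised while replaying translog operations on the target
     * @param mappingVersionOnTarget  mapping version currently applied on the recovering shard
     * @param mappingVersionOnPrimary mapping version the primary used to index those operations,
     *                                now carried on RecoveryTranslogOperationsRequest
     */
    static boolean shouldWaitForNewMappingAndRetry(final Exception failure,
                                                   final long mappingVersionOnTarget,
                                                   final long mappingVersionOnPrimary) {
        // Retry only while the target's mapping is strictly older than the primary's;
        // once the target has caught up, waiting for another cluster state cannot help,
        // so the MapperException is propagated and fails the recovery instead of looping.
        return failure instanceof MapperException && mappingVersionOnTarget < mappingVersionOnPrimary;
    }
}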

View File

@ -3148,7 +3148,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, ActionListener)
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
assert seqNo != UNASSIGNED_SEQ_NO

View File

@ -33,6 +33,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -119,8 +120,8 @@ public class PeerRecoveryTargetService implements IndexEventListener {
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, ThreadPool.Names.GENERIC, RecoveryTranslogOperationsRequest::new,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(
@ -501,16 +502,21 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
});
};
final IndexMetaData indexMetaData = clusterService.state().metaData().index(request.shardId().getIndex());
final long mappingVersionOnTarget = indexMetaData != null ? indexMetaData.getMappingVersion() : 0L;
recoveryTarget.indexTranslogOperations(
request.operations(),
request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(),
request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
request.retentionLeases(),
request.mappingVersionOnPrimary(),
ActionListener.wrap(
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
e -> {
if (e instanceof MapperException) {
// do not retry if the mapping on the replica is at least as recent as the mapping
// that the primary used to index the operations in the request.
if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
retryOnMappingException.accept(e);
} else {
listener.onFailure(e);

View File

@ -219,8 +219,9 @@ public class RecoverySourceHandler {
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
final RetentionLeases retentionLeases = shard.getRetentionLeases();
final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetaData().getMappingVersion();
phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
retentionLeases, sendSnapshotStep);
retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
sendSnapshotStep.whenComplete(
r -> IOUtils.close(phase2Snapshot),
e -> {
@ -511,6 +512,7 @@ public class RecoverySourceHandler {
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<SendSnapshotResult> listener) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
@ -572,6 +574,7 @@ public class RecoverySourceHandler {
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
batchedListener);
}
@ -583,6 +586,7 @@ public class RecoverySourceHandler {
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) throws IOException {
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
final List<Translog.Operation> operations = nextBatch.get();
@ -595,6 +599,7 @@ public class RecoverySourceHandler {
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
ActionListener.wrap(
newCheckpoint -> {
sendBatch(
@ -605,6 +610,7 @@ public class RecoverySourceHandler {
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
listener);
},
listener::onFailure

View File

@ -320,6 +320,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) {
ActionListener.completeWith(listener, () -> {
final RecoveryState.Translog translog = state().getTranslog();
@ -351,7 +352,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
throw new MapperException("mapping updates are not allowed [" + operation + "]");
}
if (result.getFailure() != null) {
if (Assertions.ENABLED) {
if (Assertions.ENABLED && result.getFailure() instanceof MapperException == false) {
throw new AssertionError("unexpected failure while replicating translog entry", result.getFailure());
}
ExceptionsHelper.reThrowIfNotNull(result.getFailure());

View File

@ -65,6 +65,11 @@ public interface RecoveryTargetHandler {
* the primary shard when capturing these operations. This value is at least as high as the
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
* @param retentionLeases the retention leases on the primary
* @param mappingVersionOnPrimary the mapping version which is at least as up to date as the mapping version that the
* primary used to index translog {@code operations} in this request.
* If the mapping version on the replica is not older than this version, we should not retry on
* {@link org.elasticsearch.index.mapper.MapperException}; otherwise we should wait for a
* new mapping and then retry.
* @param listener a listener which will be notified with the local checkpoint on the target
* after these operations are successfully indexed on the target.
*/
@ -74,6 +79,7 @@ public interface RecoveryTargetHandler {
long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfUpdatesOrDeletesOnPrimary,
RetentionLeases retentionLeases,
long mappingVersionOnPrimary,
ActionListener<Long> listener);
/**

View File

@ -34,16 +34,14 @@ import java.util.List;
public class RecoveryTranslogOperationsRequest extends TransportRequest {
private long recoveryId;
private ShardId shardId;
private List<Translog.Operation> operations;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private long maxSeenAutoIdTimestampOnPrimary;
private long maxSeqNoOfUpdatesOrDeletesOnPrimary;
private RetentionLeases retentionLeases;
public RecoveryTranslogOperationsRequest() {
}
private final long recoveryId;
private final ShardId shardId;
private final List<Translog.Operation> operations;
private final int totalTranslogOps;
private final long maxSeenAutoIdTimestampOnPrimary;
private final long maxSeqNoOfUpdatesOrDeletesOnPrimary;
private final RetentionLeases retentionLeases;
private final long mappingVersionOnPrimary;
RecoveryTranslogOperationsRequest(
final long recoveryId,
@ -52,7 +50,8 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
final int totalTranslogOps,
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfUpdatesOrDeletesOnPrimary,
final RetentionLeases retentionLeases) {
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.operations = operations;
@ -60,6 +59,7 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary;
this.retentionLeases = retentionLeases;
this.mappingVersionOnPrimary = mappingVersionOnPrimary;
}
public long recoveryId() {
@ -90,8 +90,16 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
return retentionLeases;
}
@Override
public void readFrom(StreamInput in) throws IOException {
/**
* Returns the mapping version which is at least as up to date as the mapping version that the primary used to index
* the translog operations in this request. If the mapping version on the replica is not older than this version, we should not
* retry on {@link org.elasticsearch.index.mapper.MapperException}; otherwise we should wait for a new mapping and then retry.
*/
long mappingVersionOnPrimary() {
return mappingVersionOnPrimary;
}
RecoveryTranslogOperationsRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
@ -113,6 +121,11 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
} else {
retentionLeases = RetentionLeases.EMPTY;
}
if (in.getVersion().onOrAfter(Version.V_7_1_0)) {
mappingVersionOnPrimary = in.readVLong();
} else {
mappingVersionOnPrimary = Long.MAX_VALUE;
}
}
@Override
@ -131,5 +144,13 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
retentionLeases.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_1_0)) {
out.writeVLong(mappingVersionOnPrimary);
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}
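The wire-format part of the change follows the usual version-gated pattern: the new field is only written when the stream is on 7.1.0 or later, and a request read from an older stream defaults mappingVersionOnPrimary to Long.MAX_VALUE, which makes the target's mapping always appear out of date and therefore preserves the pre-7.1 behavior of retrying on every MapperException. A condensed, hypothetical sketch of that pattern follows; the class and method names are illustrative only, and the real logic is inlined in RecoveryTranslogOperationsRequest above.

import java.io.IOException;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

// Illustrative helpers only; the actual patch inlines this in the request's
// StreamInput constructor and writeTo method.
final class MappingVersionWireFormat {

    static void writeMappingVersion(final StreamOutput out, final long mappingVersionOnPrimary) throws IOException {
        if (out.getVersion().onOrAfter(Version.V_7_1_0)) {
            out.writeVLong(mappingVersionOnPrimary);
        }
        // older targets do not know about the field, so nothing is written to them
    }

    static long readMappingVersion(final StreamInput in) throws IOException {
        if (in.getVersion().onOrAfter(Version.V_7_1_0)) {
            return in.readVLong();
        }
        // an older primary did not send the field; Long.MAX_VALUE makes the target's mapping
        // always look older, keeping the old "always retry on MapperException" behavior
        return Long.MAX_VALUE;
    }
}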

View File

@ -112,6 +112,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) {
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
recoveryId,
@ -120,7 +121,8 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
totalTranslogOps,
maxSeenAutoIdTimestampOnPrimary,
maxSeqNoOfDeletesOrUpdatesOnPrimary,
retentionLeases);
retentionLeases,
mappingVersionOnPrimary);
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.localCheckpoint),
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));

View File

@ -564,6 +564,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
final long maxAutoIdTimestamp,
final long maxSeqNoOfUpdates,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener) {
// index a doc which is not part of the snapshot, but also does not complete on replica
replicaEngineFactory.latchIndexers(1);
@ -597,6 +598,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
maxAutoIdTimestamp,
maxSeqNoOfUpdates,
retentionLeases,
mappingVersion,
listener);
}
});
@ -845,11 +847,13 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
final long maxAutoIdTimestamp,
final long maxSeqNoOfUpdates,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener) {
if (hasBlocked() == false) {
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, listener);
super.indexTranslogOperations(
operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, mappingVersion, listener);
}
@Override

View File

@ -2472,6 +2472,7 @@ public class IndexShardTests extends IndexShardTestCase {
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener){
super.indexTranslogOperations(
operations,
@ -2479,6 +2480,7 @@ public class IndexShardTests extends IndexShardTestCase {
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
ActionListener.wrap(
r -> {
assertFalse(replica.isSyncNeeded());
@ -2594,6 +2596,7 @@ public class IndexShardTests extends IndexShardTestCase {
final long maxAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener){
super.indexTranslogOperations(
operations,
@ -2601,6 +2604,7 @@ public class IndexShardTests extends IndexShardTestCase {
maxAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
ActionListener.wrap(
checkpoint -> {
listener.onResponse(checkpoint);
@ -2659,6 +2663,7 @@ public class IndexShardTests extends IndexShardTestCase {
final long maxAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener) {
super.indexTranslogOperations(
operations,
@ -2666,6 +2671,7 @@ public class IndexShardTests extends IndexShardTestCase {
maxAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.recovery;
import org.apache.lucene.analysis.TokenStream;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -35,6 +36,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
@ -46,12 +48,18 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.BackgroundIndexer;
@ -78,10 +86,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -109,7 +119,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockFSIndexStore.TestPlugin.class,
RecoverySettingsChunkSizePlugin.class);
RecoverySettingsChunkSizePlugin.class, TestAnalysisPlugin.class);
}
@After
@ -863,4 +873,57 @@ public class IndexRecoveryIT extends ESIntegTestCase {
assertThat(recoveryStates.get(0).getIndex().totalFileCount(), is(0));
assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0));
}
public void testDoNotInfinitelyWaitForMapping() {
internalCluster().ensureAtLeastNumDataNodes(3);
createIndex("test", Settings.builder()
.put("index.analysis.analyzer.test_analyzer.type", "custom")
.put("index.analysis.analyzer.test_analyzer.tokenizer", "standard")
.putList("index.analysis.analyzer.test_analyzer.filter", "test_token_filter")
.put("index.number_of_replicas", 0).put("index.number_of_shards", 1).build());
client().admin().indices().preparePutMapping("test")
.setType("_doc").setSource("test_field", "type=text,analyzer=test_analyzer").get();
int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test", "_doc", "u" + i)
.setSource(singletonMap("test_field", Integer.toString(i)), XContentType.JSON).get();
}
Semaphore recoveryBlocked = new Semaphore(1);
for (DiscoveryNode node : clusterService().state().nodes()) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(
TransportService.class, node.getName());
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoverySourceService.Actions.START_RECOVERY)) {
if (recoveryBlocked.tryAcquire()) {
PluginsService pluginService = internalCluster().getInstance(PluginsService.class, node.getName());
for (TestAnalysisPlugin plugin : pluginService.filterPlugins(TestAnalysisPlugin.class)) {
plugin.throwParsingError.set(true);
}
}
}
connection.sendRequest(requestId, action, request, options);
});
}
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 1)).get();
ensureGreen("test");
client().admin().indices().prepareRefresh("test").get();
assertHitCount(client().prepareSearch().get(), numDocs);
}
public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin {
final AtomicBoolean throwParsingError = new AtomicBoolean();
@Override
public Map<String, AnalysisModule.AnalysisProvider<TokenFilterFactory>> getTokenFilters() {
return singletonMap("test_token_filter",
(indexSettings, environment, name, settings) -> new AbstractTokenFilterFactory(indexSettings, name, settings) {
@Override
public TokenStream create(TokenStream tokenStream) {
if (throwParsingError.get()) {
throw new MapperParsingException("simulate mapping parsing error");
}
return tokenStream;
}
});
}
}
}

View File

@ -237,7 +237,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu,
RetentionLeases retentionLeases, ActionListener<Long> listener) {
RetentionLeases retentionLeases, long mappingVersion, ActionListener<Long> listener) {
shippedOps.addAll(operations);
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
listener.onResponse(checkpointOnTarget.get());
}
@ -246,7 +246,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future);
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
RecoverySourceHandler.SendSnapshotResult result = future.actionGet();
assertThat(result.totalOperations, equalTo(expectedOps));
@ -272,7 +272,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
long msu, RetentionLeases retentionLeases, ActionListener<Long> listener) {
long msu, RetentionLeases retentionLeases, long mappingVersion,
ActionListener<Long> listener) {
if (randomBoolean()) {
listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED);
} else {
@ -287,7 +288,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future);
if (wasFailed.get()) {
assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index"));
}
@ -487,10 +488,10 @@ public class RecoverySourceHandlerTests extends ESTestCase {
@Override
void phase2(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases,
ActionListener<SendSnapshotResult> listener) throws IOException {
long mappingVersion, ActionListener<SendSnapshotResult> listener) throws IOException {
phase2Called.set(true);
super.phase2(startingSeqNo, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener);
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, listener);
}
};
@ -706,6 +707,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final long timestamp,
final long msu,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener) {
}

View File

@ -63,9 +63,9 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler {
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfDeletesOrUpdatesOnPrimary,
RetentionLeases retentionLeases, ActionListener<Long> listener) {
executor.execute(() -> target.indexTranslogOperations(
operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary, retentionLeases, listener));
RetentionLeases retentionLeases, long mappingVersionOnPrimary, ActionListener<Long> listener) {
executor.execute(() -> target.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary,
maxSeqNoOfDeletesOrUpdatesOnPrimary, retentionLeases, mappingVersionOnPrimary, listener));
}
@Override