Trim local translog in peer recovery (#44756)

Today, if an operation-based peer recovery occurs, we won't trim
translog but leave it as is. Some unacknowledged operations existing in
translog of that replica might suddenly reappear when it gets promoted.
With this change, we ensure trimming translog above the starting
sequence number of phase 2. This change can allow us to read translog
forward.
This commit is contained in:
Nhat Nguyen 2019-08-03 16:17:23 -04:00
parent 1cd464d675
commit 25c6102101
17 changed files with 202 additions and 46 deletions

View File

@ -1101,7 +1101,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Rolls the tranlog generation and cleans unneeded.
*/
private void rollTranslogGeneration() {
public void rollTranslogGeneration() {
final Engine engine = getEngine();
engine.rollTranslogGeneration();
}

View File

@ -63,6 +63,7 @@ final class MultiSnapshot implements Translog.Snapshot {
@Override
public Translog.Operation next() throws IOException {
// TODO: Read translog forward in 9.0+
for (; index >= 0; index--) {
final TranslogSnapshot current = translogs[index];
Translog.Operation op;

View File

@ -379,7 +379,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request);
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(),
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(),
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
}
}

View File

@ -28,27 +28,30 @@ import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
public class RecoveryFinalizeRecoveryRequest extends TransportRequest {
final class RecoveryFinalizeRecoveryRequest extends TransportRequest {
private long recoveryId;
private ShardId shardId;
private long globalCheckpoint;
private final long recoveryId;
private final ShardId shardId;
private final long globalCheckpoint;
private final long trimAboveSeqNo;
public RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
globalCheckpoint = in.readZLong();
globalCheckpoint = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
trimAboveSeqNo = in.readZLong();
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}
RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint) {
RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint, final long trimAboveSeqNo) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.globalCheckpoint = globalCheckpoint;
this.trimAboveSeqNo = trimAboveSeqNo;
}
public long recoveryId() {
@ -63,13 +66,18 @@ public class RecoveryFinalizeRecoveryRequest extends TransportRequest {
return globalCheckpoint;
}
public long trimAboveSeqNo() {
return trimAboveSeqNo;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(globalCheckpoint);
out.writeZLong(globalCheckpoint);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeZLong(trimAboveSeqNo);
}
}

View File

@ -327,7 +327,9 @@ public class RecoverySourceHandler {
}, onFailure);
sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
// Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2
final long trimAboveSeqNo = startingSeqNo - 1;
sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);
finalizeStep.whenComplete(r -> {
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
@ -750,7 +752,7 @@ public class RecoverySourceHandler {
}
}
void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
@ -767,7 +769,7 @@ public class RecoverySourceHandler {
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener));
finalizeListener.whenComplete(r -> {
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);

View File

@ -290,9 +290,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> listener) {
public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
final IndexShard indexShard = indexShard();
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
// Persist the global checkpoint.
indexShard.sync();
@ -300,6 +299,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
if (hasUncommittedOperations()) {
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
}
if (trimAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// We should erase all translog operations above trimAboveSeqNo as we have received either the same or a newer copy
// from the recovery source in phase2. Rolling a new translog generation is not strictly required here for we won't
// trim the current generation. It's merely to satisfy the assumption that the current generation does not have any
// operation that would be trimmed (see TranslogWriter#assertNoSeqAbove). This assumption does not hold for peer
// recovery because we could have received operations above startingSeqNo from the previous primary terms.
indexShard.rollTranslogGeneration();
indexShard.trimOperationOfPreviousPrimaryTerms(trimAboveSeqNo);
}
indexShard.finalizeRecovery();
return null;
});

View File

@ -42,9 +42,11 @@ public interface RecoveryTargetHandler {
* the global checkpoint.
*
* @param globalCheckpoint the global checkpoint on the recovery source
* @param trimAboveSeqNo The recovery target should erase its existing translog above this sequence number
* from the previous primary terms.
* @param listener the listener which will be notified when this method is completed
*/
void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);
void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener);
/**
* Handoff the primary context between the relocation source and the relocation target.

View File

@ -86,9 +86,9 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
@Override
public void finalizeRecovery(final long globalCheckpoint, final ActionListener<Void> listener) {
public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, final ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint, trimAboveSeqNo),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));

View File

@ -134,6 +134,7 @@ import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -184,6 +185,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANS
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
@ -5938,4 +5940,34 @@ public class InternalEngineTests extends EngineTestCase {
assertNotNull(engine.failedEngine.get());
}
}
/**
* We can trim translog on primary promotion and peer recovery based on the fact we add operations with either
* REPLICA or PEER_RECOVERY origin to translog although they already exist in the engine (i.e. hasProcessed() == true).
* If we decide not to add those already-processed operations to translog, we need to study carefully the consequence
* of the translog trimming in these two places.
*/
public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws Exception {
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
applyOperations(engine, operations);
Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos));
}
primaryTerm.set(randomLongBetween(primaryTerm.get(), Long.MAX_VALUE));
engine.rollTranslogGeneration();
engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertNull(snapshot.next());
}
applyOperations(engine, operations);
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size() * 2));
assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos));
}
}
}

View File

@ -504,9 +504,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
recoveryStart.countDown();
return new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
recoveryDone.set(true);
super.finalizeRecovery(globalCheckpoint, listener);
super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener);
}
};
});
@ -868,13 +868,13 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
if (hasBlocked() == false) {
// it maybe that not ops have been transferred, block now
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
blockIfNeeded(RecoveryState.Stage.FINALIZE);
super.finalizeRecovery(globalCheckpoint, listener);
super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener);
}
}

View File

@ -1065,7 +1065,7 @@ public class IndexShardTests extends IndexShardTestCase {
onFailureException.get(), hasToString(containsString("operation primary term [" + oldPrimaryTerm + "] is too old")));
}
closeShards(indexShard);
closeShard(indexShard, false); // skip asserting translog and Lucene as we rolled back Lucene but did not execute resync
}
public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception {
@ -2767,8 +2767,8 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
super.finalizeRecovery(globalCheckpoint,
public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo,
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices.recovery;
import org.apache.lucene.analysis.TokenStream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -33,6 +34,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
@ -75,6 +77,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.flush.SyncedFlushUtil;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.Plugin;
@ -101,6 +104,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -117,6 +121,8 @@ import java.util.stream.StreamSupport;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -125,6 +131,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -1429,4 +1436,56 @@ public class IndexRecoveryIT extends ESIntegTestCase {
assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
}
}
public void testPeerRecoveryTrimsLocalTranslog() throws Exception {
internalCluster().startNode();
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
String indexName = "test-index";
createIndex(indexName, Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.put("index.routing.allocation.include._name", String.join(",", dataNodes)).build());
ensureGreen(indexName);
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
DiscoveryNode nodeWithOldPrimary = clusterState.nodes().get(clusterState.routingTable()
.index(indexName).shard(0).primaryShard().currentNodeId());
MockTransportService transportService = (MockTransportService) internalCluster()
.getInstance(TransportService.class, nodeWithOldPrimary.getName());
CountDownLatch readyToRestartNode = new CountDownLatch(1);
AtomicBoolean stopped = new AtomicBoolean();
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals("indices:data/write/bulk[s][r]") && randomInt(100) < 5) {
throw new NodeClosedException(nodeWithOldPrimary);
}
// prevent the primary from marking the replica as stale so the replica can get promoted.
if (action.equals("internal:cluster/shard/failure")) {
stopped.set(true);
readyToRestartNode.countDown();
throw new NodeClosedException(nodeWithOldPrimary);
}
connection.sendRequest(requestId, action, request, options);
});
Thread[] indexers = new Thread[randomIntBetween(1, 8)];
for (int i = 0; i < indexers.length; i++) {
indexers[i] = new Thread(() -> {
while (stopped.get() == false) {
try {
IndexResponse response = client().prepareIndex(indexName, "_doc")
.setSource(Collections.singletonMap("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
.get();
assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
} catch (ElasticsearchException ignored) {
}
}
});
}
for (Thread indexer : indexers) {
indexer.start();
}
readyToRestartNode.await();
transportService.clearAllRules();
internalCluster().restartNode(nodeWithOldPrimary.getName(), new InternalTestCluster.RestartCallback());
for (Thread indexer : indexers) {
indexer.join();
}
ensureGreen(indexName);
}
}

View File

@ -782,7 +782,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
}
@Override

View File

@ -30,6 +30,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -41,6 +42,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
@ -461,4 +463,44 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
closeShards(replica);
}
}
public void testRecoveryTrimsLocalTranslog() throws Exception {
try (ReplicationGroup shards = createGroup(between(1, 2))) {
shards.startAll();
IndexShard oldPrimary = shards.getPrimary();
shards.indexDocs(scaledRandomIntBetween(1, 100));
if (randomBoolean()) {
shards.flush();
}
int inflightDocs = scaledRandomIntBetween(1, 100);
for (int i = 0; i < inflightDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i).source("{}", XContentType.JSON);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
for (IndexShard replica : randomSubsetOf(shards.getReplicas())) {
indexOnReplica(bulkShardRequest, shards, replica);
}
if (rarely()) {
shards.flush();
}
}
shards.syncGlobalCheckpoint();
shards.promoteReplicaToPrimary(randomFrom(shards.getReplicas())).get();
oldPrimary.close("demoted", false);
oldPrimary.store().close();
oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId());
shards.recoverReplica(oldPrimary);
for (IndexShard shard : shards) {
assertConsistentHistoryBetweenTranslogAndLucene(shard);
}
final List<DocIdSeqNoAndSource> docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary());
for (IndexShard shard : shards.getReplicas()) {
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
}
shards.promoteReplicaToPrimary(oldPrimary).get();
for (IndexShard shard : shards) {
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
assertConsistentHistoryBetweenTranslogAndLucene(shard);
}
}
}
}

View File

@ -115,7 +115,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -132,11 +131,13 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static java.util.Collections.shuffle;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
public abstract class EngineTestCase extends ESTestCase {
@ -850,14 +851,15 @@ public abstract class EngineTestCase extends ESTestCase {
switch (opType) {
case INDEX:
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, startTime, -1, true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
i, null, randomFrom(REPLICA, PEER_RECOVERY), startTime, -1, true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
break;
case DELETE:
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, startTime, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
i, null, randomFrom(REPLICA, PEER_RECOVERY), startTime, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
break;
case NO_OP:
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, startTime, "test-" + i));
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(),
randomFrom(REPLICA, PEER_RECOVERY), startTime, "test-" + i));
break;
default:
throw new IllegalStateException("Unknown operation type [" + opType + "]");
@ -1056,8 +1058,7 @@ public abstract class EngineTestCase extends ESTestCase {
*/
public static List<Translog.Operation> readAllOperationsInLucene(Engine engine, MapperService mapper) throws IOException {
final List<Translog.Operation> operations = new ArrayList<>();
long maxSeqNo = Math.max(0, ((InternalEngine)engine).getLocalCheckpointTracker().getMaxSeqNo());
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, maxSeqNo, false)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) {
Translog.Operation op;
while ((op = snapshot.next()) != null){
operations.add(op);
@ -1075,18 +1076,19 @@ public abstract class EngineTestCase extends ESTestCase {
return;
}
final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();
if (maxSeqNo < 0) {
return; // nothing to check
}
final Map<Long, Translog.Operation> translogOps = new HashMap<>();
final List<Translog.Operation> translogOps = new ArrayList<>();
try (Translog.Snapshot snapshot = EngineTestCase.getTranslog(engine).newSnapshot()) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
translogOps.put(op.seqNo(), op);
assertThat("translog operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
translogOps.add(op);
}
}
final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper).stream()
.collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
for (Translog.Operation op : luceneOps.values()) {
assertThat("lucene operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
}
final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint();
final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
final long seqNoForRecovery;
@ -1094,10 +1096,10 @@ public abstract class EngineTestCase extends ESTestCase {
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
}
final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
for (Translog.Operation translogOp : translogOps.values()) {
for (Translog.Operation translogOp : translogOps) {
final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo());
if (luceneOp == null) {
if (minSeqNoToRetain <= translogOp.seqNo() && translogOp.seqNo() <= maxSeqNo) {
if (minSeqNoToRetain <= translogOp.seqNo()) {
fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " +
"retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]");
} else {

View File

@ -814,7 +814,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
/**
* indexes the given requests on the supplied primary, modifying it for replicas
*/
BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
public BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
return executeReplicationRequestOnPrimary(primary, request);
}
@ -828,7 +828,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
/**
* indexes the given requests on the supplied replica shard
*/
void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
public void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
indexOnReplica(request, group, replica, group.primary.getPendingPrimaryTerm());
}

View File

@ -50,8 +50,8 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler {
}
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
executor.execute(() -> target.finalizeRecovery(globalCheckpoint, listener));
public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
executor.execute(() -> target.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener));
}
@Override