Propagate auto_id_timestamp in primary-replica resync (#33964)

A follow-up to #33693 that propagates max_seen_auto_id_timestamp during a
primary-replica resync.

Relates #33693
Nhat Nguyen 2018-09-22 11:40:10 -04:00 committed by GitHub
parent b89551c452
commit e7ae2f9d36
6 changed files with 86 additions and 12 deletions
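
For context, the sketch below is a minimal, hypothetical model of the append-only optimization this timestamp gates; the class and method names are invented and this is not the Elasticsearch engine code. The idea: a plain append that skips the duplicate check is only allowed for auto-ID requests strictly newer than anything that may already have been indexed, which is why a promoted primary has to hand its max seen timestamp to the replicas during a resync.

// Illustrative toy, not Elasticsearch code: the gate that max_seen_auto_id_timestamp feeds.
final class AppendOnlyGate {

    // Highest auto_id_timestamp of any request that may already be in the index
    // (retried requests, or anything covered by a resync). Single-threaded toy;
    // the real engine maintains this value under its own synchronization.
    private long maxUnsafeAutoIdTimestamp = -1L;

    // Called on a replica when the resync delivers the primary's max seen timestamp.
    void bootstrap(long maxSeenAutoIdTimestampOnPrimary) {
        maxUnsafeAutoIdTimestamp = Math.max(maxUnsafeAutoIdTimestamp, maxSeenAutoIdTimestampOnPrimary);
    }

    // A plain append (no duplicate lookup) is only safe for first-time requests that are
    // strictly newer than anything that might already have been indexed.
    boolean mayUsePlainAppend(long autoIdTimestamp, boolean isRetry) {
        return isRetry == false && autoIdTimestamp > maxUnsafeAutoIdTimestamp;
    }
}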

File: ResyncReplicationRequest.java

@ -19,6 +19,7 @@
package org.elasticsearch.action.resync;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,6 +29,7 @@ import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
/**
* Represents a batch of operations sent from the primary to its replicas during the primary-replica resync.
@ -36,15 +38,17 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
private long trimAboveSeqNo;
private Translog.Operation[] operations;
private long maxSeenAutoIdTimestampOnPrimary;
ResyncReplicationRequest() {
super();
}
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo,
final Translog.Operation[] operations) {
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo, final long maxSeenAutoIdTimestampOnPrimary,
final Translog.Operation[] operations) {
super(shardId);
this.trimAboveSeqNo = trimAboveSeqNo;
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
this.operations = operations;
}
@ -52,6 +56,10 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
return trimAboveSeqNo;
}
public long getMaxSeenAutoIdTimestampOnPrimary() {
return maxSeenAutoIdTimestampOnPrimary;
}
public Translog.Operation[] getOperations() {
return operations;
}
@ -73,6 +81,11 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
} else {
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
} else {
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}
@ -82,6 +95,9 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
out.writeZLong(trimAboveSeqNo);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
}
out.writeArray(Translog.Operation::writeOperation, operations);
}
@ -90,13 +106,13 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return trimAboveSeqNo == that.trimAboveSeqNo
return trimAboveSeqNo == that.trimAboveSeqNo && maxSeenAutoIdTimestampOnPrimary == that.maxSeenAutoIdTimestampOnPrimary
&& Arrays.equals(operations, that.operations);
}
@Override
public int hashCode() {
return Long.hashCode(trimAboveSeqNo) + 31 * Arrays.hashCode(operations);
return Objects.hash(trimAboveSeqNo, maxSeenAutoIdTimestampOnPrimary, operations);
}
@Override
@ -106,6 +122,7 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
", timeout=" + timeout +
", index='" + index + '\'' +
", trimAboveSeqNo=" + trimAboveSeqNo +
", maxSeenAutoIdTimestampOnPrimary=" + maxSeenAutoIdTimestampOnPrimary +
", ops=" + operations.length +
"}";
}
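
A simplified view of how the extended constructor is intended to be called; this is a sketch rather than a quote from the commit, and primary, shardId, trimAboveSeqNo, and operations stand in for the values the real call site in PrimaryReplicaSyncer (further down) already has in scope:

// Sketch only: mirrors the call added to PrimaryReplicaSyncer later in this commit.
final long maxSeenAutoIdTimestamp = primary.getMaxSeenAutoIdTimestamp();
final ResyncReplicationRequest request =
    new ResyncReplicationRequest(shardId, trimAboveSeqNo, maxSeenAutoIdTimestamp, operations);

On the wire the new field is only exchanged with nodes on V_7_0_0_alpha1 or later; a stream from an older node is read back as IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, as the version checks above show.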

File: TransportResyncReplicationAction.java

@ -119,6 +119,12 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
/*
* Operations received during a resync do not carry an auto_id_timestamp individually, so we bootstrap the replica's
* max_seen_timestamp with the primary's value, which is at least the highest timestamp of any of these operations.
* This keeps the append-only optimization disabled when the original append-only requests (the sources of these
* operations) are later delivered through regular replication; otherwise we may end up with duplicates.
*/
replica.updateMaxUnsafeAutoIdTimestamp(request.getMaxSeenAutoIdTimestampOnPrimary());
for (Translog.Operation operation : request.getOperations()) {
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
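
The block comment in performOnReplica above is the core of the change. As a self-contained toy (names and numbers are invented; it only mirrors the scenario exercised by testTransferMaxSeenAutoIdTimestampOnResync further down in this commit), here is the duplicate it prevents:

import java.util.HashSet;
import java.util.Set;

// Toy replica engine, not Elasticsearch code: same gate as in the sketch near the top of this commit.
final class ResyncDuplicateSketch {

    static final class ReplicaEngine {
        final Set<String> ids = new HashSet<>();
        long maxUnsafeAutoIdTimestamp = -1L;
        int docs = 0;

        void index(String autoId, long autoIdTimestamp, boolean isRetry) {
            final boolean plainAppend = isRetry == false && autoIdTimestamp > maxUnsafeAutoIdTimestamp;
            if (plainAppend) {
                docs++;                                  // addDocument-style path: no duplicate lookup
            } else if (ids.add(autoId)) {
                docs++;                                  // update-style path: dedups on the auto-ID
            }
            ids.add(autoId);
        }
    }

    public static void main(String[] args) {
        final long unset = -1L;                          // resync operations carry no per-op timestamp

        // Without the bootstrap: the resync replays doc-1, then a replicated copy of the original
        // request (same auto-ID, timestamp 7, not flagged as a retry) takes the plain-append path,
        // so the document ends up indexed twice.
        ReplicaEngine unsafeReplica = new ReplicaEngine();
        unsafeReplica.index("doc-1", unset, false);      // delivered via the resync batch
        unsafeReplica.index("doc-1", 7L, false);         // delivered via regular replication
        System.out.println("without bootstrap: docs = " + unsafeReplica.docs);   // 2

        // With the primary's max seen timestamp bootstrapped before the batch, the duplicate check runs.
        ReplicaEngine safeReplica = new ReplicaEngine();
        safeReplica.maxUnsafeAutoIdTimestamp = 7L;       // what updateMaxUnsafeAutoIdTimestamp(...) achieves
        safeReplica.index("doc-1", unset, false);
        safeReplica.index("doc-1", 7L, false);
        System.out.println("with bootstrap: docs = " + safeReplica.docs);        // 1
    }
}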

File: PrimaryReplicaSyncer.java

@ -135,9 +135,11 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
}
}
};
// We must capture the timestamp only after taking the snapshot of operations to make sure
// that the auto_id_timestamp of every operation in the snapshot is at most this value.
final long maxSeenAutoIdTimestamp = indexShard.getMaxSeenAutoIdTimestamp();
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPendingPrimaryTerm(), wrappedSnapshot,
startingSeqNo, maxSeqNo, resyncListener);
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, resyncListener);
} catch (Exception e) {
try {
IOUtils.close(snapshot);
@ -150,7 +152,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
}
private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener<ResyncTask> listener) {
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
@ -170,7 +172,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
};
try {
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
startingSeqNo, maxSeqNo, wrappedListener).run();
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
}
@ -191,6 +193,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
private final Translog.Snapshot snapshot;
private final long startingSeqNo;
private final long maxSeqNo;
private final long maxSeenAutoIdTimestamp;
private final int chunkSizeInBytes;
private final ActionListener<Void> listener;
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
@ -199,7 +202,8 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
private AtomicBoolean closed = new AtomicBoolean();
SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo,
long maxSeenAutoIdTimestamp, ActionListener<Void> listener) {
this.logger = logger;
this.syncAction = syncAction;
this.task = task;
@ -210,6 +214,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
this.chunkSizeInBytes = chunkSizeInBytes;
this.startingSeqNo = startingSeqNo;
this.maxSeqNo = maxSeqNo;
this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
this.listener = listener;
task.setTotalOperations(snapshot.totalOperations());
}
@ -260,7 +265,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
task.setPhase("sending_ops");
ResyncReplicationRequest request =
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, maxSeenAutoIdTimestamp, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
firstMessage.set(false);
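
The ordering note in PrimaryReplicaSyncer ("capture the timestamp only after taking the snapshot") can be made concrete with a small toy model; the names below are invented and this is not Elasticsearch code:

import java.util.ArrayList;
import java.util.List;

// Toy model of the ordering requirement: take the snapshot first, read the timestamp second,
// so the advertised bound covers every operation the replicas will replay.
final class SnapshotOrderingSketch {

    static final class ToyShard {
        final List<Long> translog = new ArrayList<>();   // auto_id_timestamps of appended ops
        long maxSeenAutoIdTimestamp = -1L;

        void append(long autoIdTimestamp) {
            translog.add(autoIdTimestamp);
            maxSeenAutoIdTimestamp = Math.max(maxSeenAutoIdTimestamp, autoIdTimestamp);
        }
    }

    public static void main(String[] args) {
        ToyShard shard = new ToyShard();
        shard.append(10L);

        // Correct order: snapshot first, timestamp second.
        List<Long> snapshot = new ArrayList<>(shard.translog);
        long bound = shard.maxSeenAutoIdTimestamp;

        // A write that lands after this point is not in the snapshot, so the bound still covers
        // everything the replicas will replay. Capturing the bound before the snapshot would
        // leave a window where such a write sneaks into the snapshot with a timestamp higher
        // than the bound sent to the replicas.
        shard.append(20L);

        for (long ts : snapshot) {
            assert ts <= bound : "snapshot contains an op newer than the advertised bound";
        }
        System.out.println("every snapshotted op has timestamp <= " + bound);
    }
}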

File: ResyncReplicationRequestTests.java

@ -38,7 +38,7 @@ public class ResyncReplicationRequestTests extends ESTestCase {
final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(),
randomNonNegativeLong(), bytes, null, -1);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index});
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, 100, new Translog.Operation[]{index});
final BytesStreamOutput out = new BytesStreamOutput();
before.writeTo(out);

File: RecoveryDuringReplicationTests.java

@ -22,6 +22,7 @@ package org.elasticsearch.index.replication;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
@ -633,6 +634,49 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
}
public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception {
try (ReplicationGroup shards = createGroup(2)) {
shards.startAll();
IndexShard primary = shards.getPrimary();
IndexShard replica1 = shards.getReplicas().get(0);
IndexShard replica2 = shards.getReplicas().get(1);
long maxTimestampOnReplica1 = -1;
long maxTimestampOnReplica2 = -1;
List<IndexRequest> replicationRequests = new ArrayList<>();
for (int numDocs = between(1, 10), i = 0; i < numDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
indexRequest.process(Version.CURRENT, null, index.getName());
final IndexRequest copyRequest;
if (randomBoolean()) {
copyRequest = copyIndexRequest(indexRequest);
indexRequest.onRetry();
} else {
copyRequest = copyIndexRequest(indexRequest);
copyRequest.onRetry();
}
replicationRequests.add(copyRequest);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, primary);
if (randomBoolean()) {
indexOnReplica(bulkShardRequest, shards, replica1);
maxTimestampOnReplica1 = Math.max(maxTimestampOnReplica1, indexRequest.getAutoGeneratedTimestamp());
} else {
indexOnReplica(bulkShardRequest, shards, replica2);
maxTimestampOnReplica2 = Math.max(maxTimestampOnReplica2, indexRequest.getAutoGeneratedTimestamp());
}
}
assertThat(replica1.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1));
assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica2));
shards.promoteReplicaToPrimary(replica1).get();
assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1));
for (IndexRequest request : replicationRequests) {
shards.index(request); // deliver via normal replication
}
for (IndexShard shard : shards) {
assertThat(shard.getMaxSeenAutoIdTimestamp(), equalTo(Math.max(maxTimestampOnReplica1, maxTimestampOnReplica2)));
}
}
}
public static class BlockingTarget extends RecoveryTarget {
private final CountDownLatch recoveryBlocked;

File: PrimaryReplicaSyncerTests.java

@ -76,7 +76,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
// Index doc but not advance local checkpoint.
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true);
}
long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
@ -105,6 +105,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
.findFirst()
.isPresent(),
is(false));
assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
}
if (syncNeeded && globalCheckPoint < numDocs - 1) {
if (shard.indexSettings.isSoftDeleteEnabled()) {