Follow engine should not fill gaps upon promotion and recovery (#31751)
Closes #31318
This commit is contained in:
parent
05b4517f2f
commit
ac654cbc10
|
@ -58,6 +58,12 @@ public final class FollowingEngine extends InternalEngine {
|
|||
return planDeletionAsNonPrimary(delete);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int fillSeqNoGaps(long primaryTerm) throws IOException {
|
||||
// a noop implementation, because follow shard does not own the history but the leader shard does.
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
|
||||
// sequence number should be set when operation origin is primary
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.index.engine;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class FollowEngineIndexShardTests extends IndexShardTestCase {
|
||||
|
||||
public void testDoNotFillGaps() throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
|
||||
.build();
|
||||
final IndexShard indexShard = newStartedShard(false, settings, new FollowingEngineFactory());
|
||||
|
||||
long seqNo = -1;
|
||||
for (int i = 0; i < 8; i++) {
|
||||
final String id = Long.toString(i);
|
||||
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", id,
|
||||
new BytesArray("{}"), XContentType.JSON);
|
||||
indexShard.applyIndexOperationOnReplica(++seqNo,
|
||||
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
|
||||
}
|
||||
long seqNoBeforeGap = seqNo;
|
||||
seqNo += 8;
|
||||
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", "9",
|
||||
new BytesArray("{}"), XContentType.JSON);
|
||||
indexShard.applyIndexOperationOnReplica(seqNo,
|
||||
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
|
||||
|
||||
// promote the replica to primary:
|
||||
final ShardRouting replicaRouting = indexShard.routingEntry();
|
||||
final ShardRouting primaryRouting =
|
||||
newShardRouting(
|
||||
replicaRouting.shardId(),
|
||||
replicaRouting.currentNodeId(),
|
||||
null,
|
||||
true,
|
||||
ShardRoutingState.STARTED,
|
||||
replicaRouting.allocationId());
|
||||
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
|
||||
0L, Collections.singleton(primaryRouting.allocationId().getId()),
|
||||
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
ActionListener<Releasable> actionListener = ActionListener.wrap(releasable -> {
|
||||
releasable.close();
|
||||
latch.countDown();
|
||||
}, e -> {assert false : "expected no exception, but got [" + e.getMessage() + "]";});
|
||||
indexShard.acquirePrimaryOperationPermit(actionListener, ThreadPool.Names.GENERIC, "");
|
||||
latch.await();
|
||||
assertThat(indexShard.getLocalCheckpoint(), equalTo(seqNoBeforeGap));
|
||||
indexShard.refresh("test");
|
||||
assertThat(indexShard.docStats().getCount(), equalTo(9L));
|
||||
closeShards(indexShard);
|
||||
}
|
||||
|
||||
}
|
|
@ -150,50 +150,7 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
|
||||
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
|
||||
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
|
||||
final String id = "id";
|
||||
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
|
||||
final String type = "type";
|
||||
final Field versionField = new NumericDocValuesField("_version", 0);
|
||||
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
|
||||
final ParseContext.Document document = new ParseContext.Document();
|
||||
document.add(uidField);
|
||||
document.add(versionField);
|
||||
document.add(seqID.seqNo);
|
||||
document.add(seqID.seqNoDocValue);
|
||||
document.add(seqID.primaryTerm);
|
||||
final BytesReference source = new BytesArray(new byte[]{1});
|
||||
final ParsedDocument parsedDocument = new ParsedDocument(
|
||||
versionField,
|
||||
seqID,
|
||||
id,
|
||||
type,
|
||||
"routing",
|
||||
Collections.singletonList(document),
|
||||
source,
|
||||
XContentType.JSON,
|
||||
null);
|
||||
|
||||
final long version;
|
||||
final long autoGeneratedIdTimestamp;
|
||||
if (randomBoolean()) {
|
||||
version = 1;
|
||||
autoGeneratedIdTimestamp = System.currentTimeMillis();
|
||||
} else {
|
||||
version = randomNonNegativeLong();
|
||||
autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
|
||||
}
|
||||
final Engine.Index index = new Engine.Index(
|
||||
new Term("_id", parsedDocument.id()),
|
||||
parsedDocument,
|
||||
seqNo,
|
||||
primaryTerm.get(),
|
||||
version,
|
||||
VersionType.EXTERNAL,
|
||||
origin,
|
||||
System.currentTimeMillis(),
|
||||
autoGeneratedIdTimestamp,
|
||||
randomBoolean());
|
||||
|
||||
final Engine.Index index = createIndexOp("id", seqNo, origin);
|
||||
consumer.accept(followingEngine, index);
|
||||
}
|
||||
}
|
||||
|
@ -243,6 +200,26 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testDoNotFillSeqNoGaps() throws Exception {
|
||||
final Settings settings =
|
||||
Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
.put("index.xpack.ccr.following_index", true)
|
||||
.build();
|
||||
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
|
||||
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
|
||||
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
|
||||
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
|
||||
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
|
||||
followingEngine.index(createIndexOp("id", 128, Engine.Operation.Origin.PRIMARY));
|
||||
int addedNoops = followingEngine.fillSeqNoGaps(primaryTerm.get());
|
||||
assertThat(addedNoops, equalTo(0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private EngineConfig engineConfig(
|
||||
final ShardId shardId,
|
||||
final IndexSettings indexSettings,
|
||||
|
@ -307,4 +284,49 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
return followingEngine;
|
||||
}
|
||||
|
||||
private Engine.Index createIndexOp(String id, long seqNo, Engine.Operation.Origin origin) {
|
||||
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
|
||||
final String type = "type";
|
||||
final Field versionField = new NumericDocValuesField("_version", 0);
|
||||
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
|
||||
final ParseContext.Document document = new ParseContext.Document();
|
||||
document.add(uidField);
|
||||
document.add(versionField);
|
||||
document.add(seqID.seqNo);
|
||||
document.add(seqID.seqNoDocValue);
|
||||
document.add(seqID.primaryTerm);
|
||||
final BytesReference source = new BytesArray(new byte[]{1});
|
||||
final ParsedDocument parsedDocument = new ParsedDocument(
|
||||
versionField,
|
||||
seqID,
|
||||
id,
|
||||
type,
|
||||
"routing",
|
||||
Collections.singletonList(document),
|
||||
source,
|
||||
XContentType.JSON,
|
||||
null);
|
||||
|
||||
final long version;
|
||||
final long autoGeneratedIdTimestamp;
|
||||
if (randomBoolean()) {
|
||||
version = 1;
|
||||
autoGeneratedIdTimestamp = System.currentTimeMillis();
|
||||
} else {
|
||||
version = randomNonNegativeLong();
|
||||
autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
|
||||
}
|
||||
return new Engine.Index(
|
||||
new Term("_id", parsedDocument.id()),
|
||||
parsedDocument,
|
||||
seqNo,
|
||||
primaryTerm.get(),
|
||||
version,
|
||||
VersionType.EXTERNAL,
|
||||
origin,
|
||||
System.currentTimeMillis(),
|
||||
autoGeneratedIdTimestamp,
|
||||
randomBoolean());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue