With this change, we will verify the consistency of version and source (besides id, seq_no, and term) of live documents between shard copies at the end of disruption tests.
This commit is contained in:
parent
24484ae227
commit
c7924014fa
|
@ -66,6 +66,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
|
||||
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
|
||||
|
@ -134,6 +135,7 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
|
|||
final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();
|
||||
|
||||
final ConflictMode conflictMode = ConflictMode.randomMode();
|
||||
final List<String> fieldNames = IntStream.rangeClosed(0, randomInt(10)).mapToObj(n -> "f" + n).collect(Collectors.toList());
|
||||
|
||||
logger.info("starting indexers using conflict mode " + conflictMode);
|
||||
try {
|
||||
|
@ -156,7 +158,7 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
|
|||
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
|
||||
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
|
||||
IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test", "type", id)
|
||||
.setSource("{}", XContentType.JSON)
|
||||
.setSource(Collections.singletonMap(randomFrom(fieldNames), randomNonNegativeLong()), XContentType.JSON)
|
||||
.setTimeout(timeout);
|
||||
|
||||
if (conflictMode == ConflictMode.external) {
|
||||
|
@ -459,7 +461,9 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
|
|||
while (stopped.get() == false && docID.get() < 5000) {
|
||||
String id = Integer.toString(docID.incrementAndGet());
|
||||
try {
|
||||
IndexResponse response = client().prepareIndex(index, "_doc", id).setSource("{}", XContentType.JSON).get();
|
||||
IndexResponse response = client().prepareIndex(index, "_doc", id)
|
||||
.setSource(Collections.singletonMap("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
|
||||
.get();
|
||||
assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
|
||||
logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo());
|
||||
ackedDocs.add(response.getId());
|
||||
|
|
|
@ -4384,7 +4384,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
Randomness.shuffle(seqNos);
|
||||
final EngineConfig engineConfig;
|
||||
final SeqNoStats prevSeqNoStats;
|
||||
final List<DocIdSeqNoAndTerm> prevDocs;
|
||||
final List<DocIdSeqNoAndSource> prevDocs;
|
||||
final int totalTranslogOps;
|
||||
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
|
||||
engineConfig = engine.config();
|
||||
|
@ -5491,7 +5491,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
commits.add(new ArrayList<>());
|
||||
try (Store store = createStore()) {
|
||||
EngineConfig config = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
|
||||
final List<DocIdSeqNoAndTerm> docs;
|
||||
final List<DocIdSeqNoAndSource> docs;
|
||||
try (InternalEngine engine = createEngine(config)) {
|
||||
List<Engine.Operation> flushedOperations = new ArrayList<>();
|
||||
for (Engine.Operation op : operations) {
|
||||
|
@ -5538,7 +5538,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final IndexSettings softDeletesEnabled = IndexSettingsModule.newIndexSettings(
|
||||
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder().
|
||||
put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)).build());
|
||||
final List<DocIdSeqNoAndTerm> docs;
|
||||
final List<DocIdSeqNoAndSource> docs;
|
||||
try (InternalEngine engine = createEngine(
|
||||
config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) {
|
||||
List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
|
||||
|
|
|
@ -36,6 +36,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -274,7 +275,14 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
|
|||
pullOperations(engine);
|
||||
}
|
||||
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
|
||||
assertThat(getDocIds(engine, true), equalTo(getDocIds(leader, true)));
|
||||
// have to verify without source since we are randomly testing without _source
|
||||
List<DocIdSeqNoAndSource> docsWithoutSourceOnFollower = getDocIds(engine, true).stream()
|
||||
.map(d -> new DocIdSeqNoAndSource(d.getId(), null, d.getSeqNo(), d.getPrimaryTerm(), d.getVersion()))
|
||||
.collect(Collectors.toList());
|
||||
List<DocIdSeqNoAndSource> docsWithoutSourceOnLeader = getDocIds(leader, true).stream()
|
||||
.map(d -> new DocIdSeqNoAndSource(d.getId(), null, d.getSeqNo(), d.getPrimaryTerm(), d.getVersion()))
|
||||
.collect(Collectors.toList());
|
||||
assertThat(docsWithoutSourceOnFollower, equalTo(docsWithoutSourceOnLeader));
|
||||
} catch (Exception ex) {
|
||||
throw new AssertionError(ex);
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
|
|||
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
|
||||
int numDocs = scaledRandomIntBetween(10, 1000);
|
||||
final SeqNoStats lastSeqNoStats;
|
||||
final List<DocIdSeqNoAndTerm> lastDocIds;
|
||||
final List<DocIdSeqNoAndSource> lastDocIds;
|
||||
try (InternalEngine engine = createEngine(config)) {
|
||||
Engine.Get get = null;
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
|
|
|
@ -40,7 +40,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
|
@ -770,7 +770,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
}
|
||||
}
|
||||
shards.refresh("test");
|
||||
List<DocIdSeqNoAndTerm> docsBelowGlobalCheckpoint = EngineTestCase.getDocIds(getEngine(newPrimary), randomBoolean())
|
||||
List<DocIdSeqNoAndSource> docsBelowGlobalCheckpoint = EngineTestCase.getDocIds(getEngine(newPrimary), randomBoolean())
|
||||
.stream().filter(doc -> doc.getSeqNo() <= newPrimary.getGlobalCheckpoint()).collect(Collectors.toList());
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicBoolean done = new AtomicBoolean();
|
||||
|
@ -780,7 +780,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
latch.countDown();
|
||||
while (done.get() == false) {
|
||||
try {
|
||||
List<DocIdSeqNoAndTerm> exposedDocs = EngineTestCase.getDocIds(getEngine(randomFrom(replicas)), randomBoolean());
|
||||
List<DocIdSeqNoAndSource> exposedDocs = EngineTestCase.getDocIds(getEngine(randomFrom(replicas)), randomBoolean());
|
||||
assertThat(docsBelowGlobalCheckpoint, everyItem(isIn(exposedDocs)));
|
||||
assertThat(randomFrom(replicas).getLocalCheckpoint(), greaterThanOrEqualTo(initDocs - 1L));
|
||||
} catch (AlreadyClosedException ignored) {
|
||||
|
|
|
@ -79,7 +79,7 @@ import org.elasticsearch.env.NodeEnvironment;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.CommitStats;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.Engine.DeleteResult;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
|
@ -3664,7 +3664,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
while (done.get() == false) {
|
||||
try {
|
||||
List<String> exposedDocIds = EngineTestCase.getDocIds(getEngine(shard), rarely())
|
||||
.stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toList());
|
||||
.stream().map(DocIdSeqNoAndSource::getId).collect(Collectors.toList());
|
||||
assertThat("every operations before the global checkpoint must be reserved",
|
||||
docBelowGlobalCheckpoint, everyItem(isIn(exposedDocIds)));
|
||||
} catch (AlreadyClosedException ignored) {
|
||||
|
|
|
@ -20,24 +20,34 @@
|
|||
package org.elasticsearch.index.engine;
|
||||
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/** A tuple of document id, sequence number and primary term of a document */
|
||||
public final class DocIdSeqNoAndTerm {
|
||||
/** A tuple of document id, sequence number, primary term, source and version of a document */
|
||||
public final class DocIdSeqNoAndSource {
|
||||
private final String id;
|
||||
private final BytesRef source;
|
||||
private final long seqNo;
|
||||
private final long primaryTerm;
|
||||
private final long version;
|
||||
|
||||
public DocIdSeqNoAndTerm(String id, long seqNo, long primaryTerm) {
|
||||
public DocIdSeqNoAndSource(String id, BytesRef source, long seqNo, long primaryTerm, long version) {
|
||||
this.id = id;
|
||||
this.source = source;
|
||||
this.seqNo = seqNo;
|
||||
this.primaryTerm = primaryTerm;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public BytesRef getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public long getSeqNo() {
|
||||
return seqNo;
|
||||
}
|
||||
|
@ -46,21 +56,27 @@ public final class DocIdSeqNoAndTerm {
|
|||
return primaryTerm;
|
||||
}
|
||||
|
||||
public long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
DocIdSeqNoAndTerm that = (DocIdSeqNoAndTerm) o;
|
||||
return Objects.equals(id, that.id) && seqNo == that.seqNo && primaryTerm == that.primaryTerm;
|
||||
DocIdSeqNoAndSource that = (DocIdSeqNoAndSource) o;
|
||||
return Objects.equals(id, that.id) && Objects.equals(source, that.source)
|
||||
&& seqNo == that.seqNo && primaryTerm == that.primaryTerm && version == that.version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, seqNo, primaryTerm);
|
||||
return Objects.hash(id, source, seqNo, primaryTerm, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DocIdSeqNoAndTerm{" + "id='" + id + " seqNo=" + seqNo + " primaryTerm=" + primaryTerm + "}";
|
||||
return "doc{" + "id='" + id + " seqNo=" + seqNo + " primaryTerm=" + primaryTerm
|
||||
+ " version=" + version + " source= " + (source != null ? source.utf8ToString() : null) + "}";
|
||||
}
|
||||
}
|
|
@ -63,6 +63,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
|
@ -995,16 +996,17 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
/**
|
||||
* Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine.
|
||||
*/
|
||||
public static List<DocIdSeqNoAndTerm> getDocIds(Engine engine, boolean refresh) throws IOException {
|
||||
public static List<DocIdSeqNoAndSource> getDocIds(Engine engine, boolean refresh) throws IOException {
|
||||
if (refresh) {
|
||||
engine.refresh("test_get_doc_ids");
|
||||
}
|
||||
try (Engine.Searcher searcher = engine.acquireSearcher("test_get_doc_ids")) {
|
||||
List<DocIdSeqNoAndTerm> docs = new ArrayList<>();
|
||||
List<DocIdSeqNoAndSource> docs = new ArrayList<>();
|
||||
for (LeafReaderContext leafContext : searcher.reader().leaves()) {
|
||||
LeafReader reader = leafContext.reader();
|
||||
NumericDocValues seqNoDocValues = reader.getNumericDocValues(SeqNoFieldMapper.NAME);
|
||||
NumericDocValues primaryTermDocValues = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
|
||||
NumericDocValues versionDocValues = reader.getNumericDocValues(VersionFieldMapper.NAME);
|
||||
Bits liveDocs = reader.getLiveDocs();
|
||||
for (int i = 0; i < reader.maxDoc(); i++) {
|
||||
if (liveDocs == null || liveDocs.get(i)) {
|
||||
|
@ -1013,20 +1015,25 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
continue;
|
||||
}
|
||||
final long primaryTerm = primaryTermDocValues.longValue();
|
||||
Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME));
|
||||
BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME);
|
||||
Document doc = reader.document(i, Sets.newHashSet(IdFieldMapper.NAME, SourceFieldMapper.NAME));
|
||||
BytesRef binaryID = doc.getBinaryValue(IdFieldMapper.NAME);
|
||||
String id = Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length));
|
||||
final BytesRef source = doc.getBinaryValue(SourceFieldMapper.NAME);
|
||||
if (seqNoDocValues.advanceExact(i) == false) {
|
||||
throw new AssertionError("seqNoDocValues not found for doc[" + i + "] id[" + id + "]");
|
||||
}
|
||||
final long seqNo = seqNoDocValues.longValue();
|
||||
docs.add(new DocIdSeqNoAndTerm(id, seqNo, primaryTerm));
|
||||
if (versionDocValues.advanceExact(i) == false) {
|
||||
throw new AssertionError("versionDocValues not found for doc[" + i + "] id[" + id + "]");
|
||||
}
|
||||
final long version = versionDocValues.longValue();
|
||||
docs.add(new DocIdSeqNoAndSource(id, source, seqNo, primaryTerm, version));
|
||||
}
|
||||
}
|
||||
}
|
||||
docs.sort(Comparator.comparingLong(DocIdSeqNoAndTerm::getSeqNo)
|
||||
.thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)
|
||||
.thenComparing((DocIdSeqNoAndTerm::getId)));
|
||||
docs.sort(Comparator.comparingLong(DocIdSeqNoAndSource::getSeqNo)
|
||||
.thenComparingLong(DocIdSeqNoAndSource::getPrimaryTerm)
|
||||
.thenComparing((DocIdSeqNoAndSource::getId)));
|
||||
return docs;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.index.engine.InternalEngineFactory;
|
||||
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
|
||||
|
@ -479,7 +479,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
if (closed == false) {
|
||||
closed = true;
|
||||
try {
|
||||
final List<DocIdSeqNoAndTerm> docsOnPrimary = getDocIdAndSeqNos(primary);
|
||||
final List<DocIdSeqNoAndSource> docsOnPrimary = getDocIdAndSeqNos(primary);
|
||||
for (IndexShard replica : replicas) {
|
||||
assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp()));
|
||||
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(primary.getMaxSeqNoOfUpdatesOrDeletes()));
|
||||
|
|
|
@ -51,7 +51,7 @@ import org.elasticsearch.index.MapperTestUtils;
|
|||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.cache.query.DisabledQueryCache;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.index.engine.EngineTestCase;
|
||||
|
@ -703,10 +703,10 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public static Set<String> getShardDocUIDs(final IndexShard shard) throws IOException {
|
||||
return getDocIdAndSeqNos(shard).stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toSet());
|
||||
return getDocIdAndSeqNos(shard).stream().map(DocIdSeqNoAndSource::getId).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
public static List<DocIdSeqNoAndTerm> getDocIdAndSeqNos(final IndexShard shard) throws IOException {
|
||||
public static List<DocIdSeqNoAndSource> getDocIdAndSeqNos(final IndexShard shard) throws IOException {
|
||||
return EngineTestCase.getDocIds(shard.getEngine(), true);
|
||||
}
|
||||
|
||||
|
|
|
@ -85,7 +85,7 @@ import org.elasticsearch.http.HttpServerTransport;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.CommitStats;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineTestCase;
|
||||
import org.elasticsearch.index.engine.InternalEngine;
|
||||
|
@ -1455,7 +1455,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
if (primaryShard == null) {
|
||||
continue;
|
||||
}
|
||||
final List<DocIdSeqNoAndTerm> docsOnPrimary;
|
||||
final List<DocIdSeqNoAndSource> docsOnPrimary;
|
||||
try {
|
||||
docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard);
|
||||
} catch (AlreadyClosedException ex) {
|
||||
|
@ -1466,7 +1466,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
if (replicaShard == null) {
|
||||
continue;
|
||||
}
|
||||
final List<DocIdSeqNoAndTerm> docsOnReplica;
|
||||
final List<DocIdSeqNoAndSource> docsOnReplica;
|
||||
try {
|
||||
docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard);
|
||||
} catch (AlreadyClosedException ex) {
|
||||
|
|
|
@ -49,7 +49,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
|
|||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
|
@ -490,13 +490,13 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
|||
protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String followerIndex) throws Exception {
|
||||
logger.info("--> asserting <<docId,seqNo>> between {} and {}", leaderIndex, followerIndex);
|
||||
assertBusy(() -> {
|
||||
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
|
||||
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnLeader = getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex);
|
||||
Map<Integer, Set<DocIdSeqNoAndTerm>> mismatchedDocs = new HashMap<>();
|
||||
for (Map.Entry<Integer, List<DocIdSeqNoAndTerm>> fe : docsOnFollower.entrySet()) {
|
||||
Set<DocIdSeqNoAndTerm> d1 = Sets.difference(
|
||||
Map<Integer, List<DocIdSeqNoAndSource>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
|
||||
Map<Integer, List<DocIdSeqNoAndSource>> docsOnLeader = getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex);
|
||||
Map<Integer, Set<DocIdSeqNoAndSource>> mismatchedDocs = new HashMap<>();
|
||||
for (Map.Entry<Integer, List<DocIdSeqNoAndSource>> fe : docsOnFollower.entrySet()) {
|
||||
Set<DocIdSeqNoAndSource> d1 = Sets.difference(
|
||||
Sets.newHashSet(fe.getValue()), Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())));
|
||||
Set<DocIdSeqNoAndTerm> d2 = Sets.difference(
|
||||
Set<DocIdSeqNoAndSource> d2 = Sets.difference(
|
||||
Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())), Sets.newHashSet(fe.getValue()));
|
||||
if (d1.isEmpty() == false || d2.isEmpty() == false) {
|
||||
mismatchedDocs.put(fe.getKey(), Sets.union(d1, d2));
|
||||
|
@ -525,11 +525,11 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
|||
}, 120, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private Map<Integer, List<DocIdSeqNoAndTerm>> getDocIdAndSeqNos(InternalTestCluster cluster, String index) throws IOException {
|
||||
private Map<Integer, List<DocIdSeqNoAndSource>> getDocIdAndSeqNos(InternalTestCluster cluster, String index) throws IOException {
|
||||
final ClusterState state = cluster.client().admin().cluster().prepareState().get().getState();
|
||||
List<ShardRouting> shardRoutings = state.routingTable().allShards(index);
|
||||
Randomness.shuffle(shardRoutings);
|
||||
final Map<Integer, List<DocIdSeqNoAndTerm>> docs = new HashMap<>();
|
||||
final Map<Integer, List<DocIdSeqNoAndSource>> docs = new HashMap<>();
|
||||
for (ShardRouting shardRouting : shardRoutings) {
|
||||
if (shardRouting == null || shardRouting.assignedToNode() == false) {
|
||||
continue;
|
||||
|
@ -537,14 +537,14 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
|||
IndexShard indexShard = cluster.getInstance(IndicesService.class, state.nodes().get(shardRouting.currentNodeId()).getName())
|
||||
.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id());
|
||||
try {
|
||||
final List<DocIdSeqNoAndTerm> docsOnShard = IndexShardTestCase.getDocIdAndSeqNos(indexShard);
|
||||
final List<DocIdSeqNoAndSource> docsOnShard = IndexShardTestCase.getDocIdAndSeqNos(indexShard);
|
||||
logger.info("--> shard {} docs {} seq_no_stats {}", shardRouting, docsOnShard, indexShard.seqNoStats());
|
||||
docs.put(shardRouting.shardId().id(), docsOnShard.stream()
|
||||
// normalize primary term as the follower use its own term
|
||||
.map(d -> new DocIdSeqNoAndTerm(d.getId(), d.getSeqNo(), 1L))
|
||||
.map(d -> new DocIdSeqNoAndSource(d.getId(), d.getSource(), d.getSeqNo(), 1L, d.getVersion()))
|
||||
.collect(Collectors.toList()));
|
||||
} catch (AlreadyClosedException e) {
|
||||
// Ignore this exception and try getting List<DocIdSeqNoAndTerm> from other IndexShard instance.
|
||||
// Ignore this exception and try getting List<DocIdSeqNoAndSource> from other IndexShard instance.
|
||||
}
|
||||
}
|
||||
return docs;
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
|
||||
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.engine.EngineTestCase;
|
||||
|
@ -621,7 +621,7 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo())));
|
||||
}
|
||||
}
|
||||
for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) {
|
||||
for (DocIdSeqNoAndSource docId : getDocIds(followingEngine, true)) {
|
||||
assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo())));
|
||||
}
|
||||
// Replica should accept duplicates
|
||||
|
@ -633,7 +633,7 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
Engine.Result result = applyOperation(followingEngine, op, newTerm, nonPrimary);
|
||||
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
|
||||
}
|
||||
for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) {
|
||||
for (DocIdSeqNoAndSource docId : getDocIds(followingEngine, true)) {
|
||||
assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo())));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue