Remove LocalCheckpointTracker#resetCheckpoint (#34667)

In #34474, we added a new assertion to ensure that the
LocalCheckpointTracker is always consistent with Lucene index. However,
we reset LocalCheckpoinTracker in testDedupByPrimaryTerm cause this
assertion to be violated.

This commit removes resetCheckpoint from LocalCheckpointTracker and
rewrites testDedupByPrimaryTerm without resetting the local checkpoint.

Relates #34474
This commit is contained in:
Nhat Nguyen 2018-12-07 12:22:20 -05:00 committed by GitHub
parent cec788deea
commit f2df0a5be4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 65 additions and 99 deletions

View File

@ -103,19 +103,6 @@ public class LocalCheckpointTracker {
}
}
/**
* Resets the checkpoint to the specified value.
*
* @param checkpoint the local checkpoint to reset this tracker to
*/
public synchronized void resetCheckpoint(final long checkpoint) {
// TODO: remove this method as after we restore the local history on promotion.
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
this.checkpoint = checkpoint;
}
/**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*

View File

@ -5463,7 +5463,7 @@ public class InternalEngineTests extends EngineTestCase {
final List<DocIdSeqNoAndTerm> docs;
try (InternalEngine engine = createEngine(
config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) {
List<Engine.Operation> ops = generateReplicaHistory(between(1, 100), randomBoolean());
List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
applyOperations(engine, ops);
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.engine;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.translog.SnapshotMatchers;
@ -32,7 +31,6 @@ import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -150,35 +148,35 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
}
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34667")
public void testDedupByPrimaryTerm() throws Exception {
Map<Long, Long> latestOperations = new HashMap<>();
List<Integer> terms = Arrays.asList(between(1, 1000), between(1000, 2000));
/**
* If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation
* into Lucene (only the first copy is non-stale; others are stale and soft-deleted). Moreover, a nested document is indexed into
* Lucene as multiple documents (only the root document has both seq_no and term, non-root docs only have seq_no). This test verifies
* that {@link LuceneChangesSnapshot} returns exactly one operation per seq_no, and skip non-root nested documents or stale copies.
*/
public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception {
Map<Long, Long> seqNoToTerm = new HashMap<>();
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
int totalOps = 0;
for (long term : terms) {
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE), term, 2, 20, "1");
primaryTerm.set(Math.max(primaryTerm.get(), term));
engine.rollTranslogGeneration();
for (Engine.Operation op : ops) {
// We need to simulate a rollback here as only ops after local checkpoint get into the engine
if (op.seqNo() <= engine.getLocalCheckpointTracker().getCheckpoint()) {
engine.getLocalCheckpointTracker().resetCheckpoint(randomLongBetween(-1, op.seqNo() - 1));
engine.rollTranslogGeneration();
}
for (Engine.Operation op : operations) {
// Engine skips deletes or indexes below the local checkpoint
if (engine.getLocalCheckpoint() < op.seqNo() || op instanceof Engine.NoOp) {
seqNoToTerm.put(op.seqNo(), op.primaryTerm());
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else if (op instanceof Engine.Delete) {
engine.delete((Engine.Delete) op);
totalOps += ((Engine.Index) op).docs().size();
} else {
totalOps++;
}
latestOperations.put(op.seqNo(), op.primaryTerm());
if (rarely()) {
engine.refresh("test");
}
if (rarely()) {
engine.flush();
}
totalOps++;
}
applyOperation(engine, op);
if (rarely()) {
engine.refresh("test");
}
if (rarely()) {
engine.rollTranslogGeneration();
}
if (rarely()) {
engine.flush();
}
}
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
@ -188,9 +186,9 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
searcher = null;
Translog.Operation op;
while ((op = snapshot.next()) != null) {
assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo())));
assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo())));
}
assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size()));
assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size()));
} finally {
IOUtils.close(searcher);
}

View File

@ -19,13 +19,10 @@
package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.LongObjectHashMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Before;
import java.util.ArrayList;
@ -266,35 +263,6 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
thread.join();
}
public void testResetCheckpoint() {
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int maxSeqNo = Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED);
for (int i = 0; i < operations; i++) {
if (!rarely()) {
tracker.markSeqNoAsCompleted(i);
maxSeqNo = i;
}
}
final int localCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<CountedBitSet>>() {
@Override
public boolean matches(Object item) {
return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());
}
@Override
public void describeTo(Description description) {
description.appendText("empty");
}
});
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}
public void testContains() {
final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100);
final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);

View File

@ -49,7 +49,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
@ -316,18 +316,17 @@ public abstract class EngineTestCase extends ESTestCase {
mappingUpdate);
}
public static CheckedFunction<String, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
public static CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
final MapperService mapperService = createMapperService("type");
final String nestedMapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("properties").startObject("nested_field").field("type", "nested").endObject().endObject()
.endObject().endObject());
final DocumentMapper nestedMapper = mapperService.documentMapperParser().parse("type", new CompressedXContent(nestedMapping));
return docId -> {
return (docId, nestedFieldValues) -> {
final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value");
final int nestedValues = between(0, 3);
if (nestedValues > 0) {
if (nestedFieldValues > 0) {
XContentBuilder nestedField = source.startObject("nested_field");
for (int i = 0; i < nestedValues; i++) {
for (int i = 0; i < nestedFieldValues; i++) {
nestedField.field("field-" + i, "value-" + i);
}
source.endObject();
@ -705,22 +704,36 @@ public abstract class EngineTestCase extends ESTestCase {
return ops;
}
public List<Engine.Operation> generateReplicaHistory(int numOps, boolean allowGapInSeqNo) {
public List<Engine.Operation> generateHistoryOnReplica(int numOps, boolean allowGapInSeqNo, boolean allowDuplicate,
boolean includeNestedDocs) throws Exception {
long seqNo = 0;
List<Engine.Operation> operations = new ArrayList<>(numOps);
final int maxIdValue = randomInt(numOps * 2);
final List<Engine.Operation> operations = new ArrayList<>(numOps);
CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedParsedDocFactory = nestedParsedDocFactory();
for (int i = 0; i < numOps; i++) {
String id = Integer.toString(between(1, 100));
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(),
-1, true));
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
threadPool.relativeTimeInMillis(), "test-" + i));
final String id = Integer.toString(randomInt(maxIdValue));
final Engine.Operation.TYPE opType = randomFrom(Engine.Operation.TYPE.values());
final boolean isNestedDoc = includeNestedDocs && opType == Engine.Operation.TYPE.INDEX && randomBoolean();
final int nestedValues = between(0, 3);
final long startTime = threadPool.relativeTimeInMillis();
final int copies = allowDuplicate && rarely() ? between(2, 4) : 1;
for (int copy = 0; copy < copies; copy++) {
final ParsedDocument doc = isNestedDoc ? nestedParsedDocFactory.apply(id, nestedValues) : createParsedDoc(id, null);
switch (opType) {
case INDEX:
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, startTime, -1, true));
break;
case DELETE:
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, startTime));
break;
case NO_OP:
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, startTime, "test-" + i));
break;
default:
throw new IllegalStateException("Unknown operation type [" + opType + "]");
}
}
seqNo++;
if (allowGapInSeqNo && rarely()) {

View File

@ -14,7 +14,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -567,12 +567,12 @@ public class FollowingEngineTests extends ESTestCase {
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
final CheckedFunction<String, ParsedDocument, IOException> nestedDocFactory = EngineTestCase.nestedParsedDocFactory();
final CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedDocFunc = EngineTestCase.nestedParsedDocFactory();
int numOps = between(10, 100);
List<Engine.Operation> operations = new ArrayList<>(numOps);
for (int i = 0; i < numOps; i++) {
String docId = Integer.toString(between(1, 100));
ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFactory.apply(docId);
ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFunc.apply(docId, randomInt(3));
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));