Track Lucene operations in engine explicitly (#29357)
Today we reply on `IndexWriter#hasDeletions` to check if an index contains "update" operations. However, this check considers both deletes and updates. This commit replaces that check by tracking and checking Lucene operations explicitly. This would provide us stronger assertions.
This commit is contained in:
parent
7c6d5cbf1f
commit
8e2f2be249
|
@ -136,6 +136,11 @@ public class InternalEngine extends Engine {
|
|||
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
private final CounterMetric numVersionLookups = new CounterMetric();
|
||||
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
|
||||
// Lucene operations since this engine was opened - not include operations from existing segments.
|
||||
private final CounterMetric numDocDeletes = new CounterMetric();
|
||||
private final CounterMetric numDocAppends = new CounterMetric();
|
||||
private final CounterMetric numDocUpdates = new CounterMetric();
|
||||
|
||||
/**
|
||||
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
|
||||
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
|
||||
|
@ -907,11 +912,11 @@ public class InternalEngine extends Engine {
|
|||
index.parsedDoc().version().setLongValue(plan.versionForIndexing);
|
||||
try {
|
||||
if (plan.useLuceneUpdateDocument) {
|
||||
update(index.uid(), index.docs(), indexWriter);
|
||||
updateDocs(index.uid(), index.docs(), indexWriter);
|
||||
} else {
|
||||
// document does not exists, we can optimize for create, but double check if assertions are running
|
||||
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
|
||||
index(index.docs(), indexWriter);
|
||||
addDocs(index.docs(), indexWriter);
|
||||
}
|
||||
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
|
||||
} catch (Exception ex) {
|
||||
|
@ -968,12 +973,13 @@ public class InternalEngine extends Engine {
|
|||
return maxSeqNoOfNonAppendOnlyOperations.get();
|
||||
}
|
||||
|
||||
private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
|
||||
private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
|
||||
if (docs.size() > 1) {
|
||||
indexWriter.addDocuments(docs);
|
||||
} else {
|
||||
indexWriter.addDocument(docs.get(0));
|
||||
}
|
||||
numDocAppends.inc(docs.size());
|
||||
}
|
||||
|
||||
private static final class IndexingStrategy {
|
||||
|
@ -1054,12 +1060,13 @@ public class InternalEngine extends Engine {
|
|||
return true;
|
||||
}
|
||||
|
||||
private static void update(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
|
||||
private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
|
||||
if (docs.size() > 1) {
|
||||
indexWriter.updateDocuments(uid, docs);
|
||||
} else {
|
||||
indexWriter.updateDocument(uid, docs.get(0));
|
||||
}
|
||||
numDocUpdates.inc(docs.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1188,6 +1195,7 @@ public class InternalEngine extends Engine {
|
|||
// any exception that comes from this is a either an ACE or a fatal exception there
|
||||
// can't be any document failures coming from this
|
||||
indexWriter.deleteDocuments(delete.uid());
|
||||
numDocDeletes.inc();
|
||||
}
|
||||
versionMap.putUnderLock(delete.uid().bytes(),
|
||||
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
|
||||
|
@ -2205,13 +2213,28 @@ public class InternalEngine extends Engine {
|
|||
return versionMap.isSafeAccessRequired();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of documents have been deleted since this engine was opened.
|
||||
* This count does not include the deletions from the existing segments before opening engine.
|
||||
*/
|
||||
long getNumDocDeletes() {
|
||||
return numDocDeletes.count();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> iff the index writer has any deletions either buffered in memory or
|
||||
* in the index.
|
||||
* Returns the number of documents have been appended since this engine was opened.
|
||||
* This count does not include the appends from the existing segments before opening engine.
|
||||
*/
|
||||
boolean indexWriterHasDeletions() {
|
||||
return indexWriter.hasDeletions();
|
||||
long getNumDocAppends() {
|
||||
return numDocAppends.count();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of documents have been updated since this engine was opened.
|
||||
* This count does not include the updates from the existing segments before opening engine.
|
||||
*/
|
||||
long getNumDocUpdates() {
|
||||
return numDocUpdates.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -2939,21 +2939,21 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
Engine.Index retry = appendOnlyPrimary(doc, true, 1);
|
||||
if (randomBoolean()) {
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(indexResult.getTranslogLocation());
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertTrue(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 1, 0);
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
|
||||
} else {
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertTrue(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 0, 1, 0);
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertTrue(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 0, 2, 0);
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
|
||||
|
@ -3000,23 +3000,23 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0;
|
||||
if (randomBoolean()) {
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(indexResult.getTranslogLocation());
|
||||
engine.delete(delete);
|
||||
assertEquals(1, engine.getNumVersionLookups());
|
||||
assertTrue(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 1);
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
|
||||
} else {
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
assertEquals(1, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
engine.delete(delete);
|
||||
assertTrue(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 1);
|
||||
assertEquals(2, engine.getNumVersionLookups());
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertEquals(belowLckp ? 2 : 3, engine.getNumVersionLookups());
|
||||
|
@ -3041,21 +3041,29 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0;
|
||||
if (randomBoolean()) {
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(indexResult.getTranslogLocation());
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertEquals(retry.seqNo() > operation.seqNo(), engine.indexWriterHasDeletions());
|
||||
if (retry.seqNo() > operation.seqNo()) {
|
||||
assertLuceneOperations(engine, 1, 1, 0);
|
||||
} else {
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
}
|
||||
assertEquals(belowLckp ? 0 : 1, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
|
||||
} else {
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
assertEquals(1, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertEquals(operation.seqNo() > retry.seqNo(), engine.indexWriterHasDeletions());
|
||||
if (operation.seqNo() > retry.seqNo()) {
|
||||
assertLuceneOperations(engine, 1, 1, 0);
|
||||
} else {
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
}
|
||||
assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
|
||||
|
@ -3096,27 +3104,27 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
Engine.Index duplicate = replicaIndexForDoc(doc, 1, 20, true);
|
||||
if (randomBoolean()) {
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
assertEquals(1, engine.getNumVersionLookups());
|
||||
assertNotNull(indexResult.getTranslogLocation());
|
||||
if (randomBoolean()) {
|
||||
engine.refresh("test");
|
||||
}
|
||||
Engine.IndexResult retryResult = engine.index(duplicate);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
assertEquals(2, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
|
||||
} else {
|
||||
Engine.IndexResult retryResult = engine.index(duplicate);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
assertEquals(1, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
if (randomBoolean()) {
|
||||
engine.refresh("test");
|
||||
}
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, 1, 0, 0);
|
||||
assertEquals(2, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
|
||||
|
@ -3278,10 +3286,11 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
if (primary) {
|
||||
// primaries rely on lucene dedup and may index the same document twice
|
||||
assertTrue(engine.indexWriterHasDeletions());
|
||||
assertThat(engine.getNumDocUpdates(), greaterThanOrEqualTo((long) numDocs));
|
||||
assertThat(engine.getNumDocAppends() + engine.getNumDocUpdates(), equalTo(numDocs * 2L));
|
||||
} else {
|
||||
// replicas rely on seq# based dedup and in this setup (same seq#) should never rely on lucene
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertLuceneOperations(engine, numDocs, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3377,8 +3386,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertEquals(0, engine.getNumIndexVersionsLookups());
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
|
||||
assertLuceneOperations(engine, numDocs, 0, 0);
|
||||
}
|
||||
|
||||
public static long getNumVersionLookups(InternalEngine engine) { // for other tests to access this
|
||||
|
@ -4659,4 +4667,13 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
|
||||
}
|
||||
|
||||
void assertLuceneOperations(InternalEngine engine, long expectedAppends, long expectedUpdates, long expectedDeletes) {
|
||||
String message = "Lucene operations mismatched;" +
|
||||
" appends [actual:" + engine.getNumDocAppends() + ", expected:" + expectedAppends + "]," +
|
||||
" updates [actual:" + engine.getNumDocUpdates() + ", expected:" + expectedUpdates + "]," +
|
||||
" deletes [actual:" + engine.getNumDocDeletes() + ", expected:" + expectedDeletes + "]";
|
||||
assertThat(message, engine.getNumDocAppends(), equalTo(expectedAppends));
|
||||
assertThat(message, engine.getNumDocUpdates(), equalTo(expectedUpdates));
|
||||
assertThat(message, engine.getNumDocDeletes(), equalTo(expectedDeletes));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue