Ensure `doc_stats` are changing even if refresh is disabled (#27505)
Today if refresh is disabled the doc stats are not updated anymore. In a bulk index scenario this might cause confusion since even if we refresh internal readers etc. doc stats are never advancing. This change cuts over to the internal reader that is refreshed outside of the external readers refresh interval but always equally `fresh` or `fresher` which will cause less confusion.
This commit is contained in:
parent
0b6448726c
commit
a29dc20c26
|
@ -30,16 +30,19 @@ import org.apache.lucene.codecs.PostingsFormat;
|
|||
import org.apache.lucene.document.LatLonDocValuesField;
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.FilterLeafReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexFileNames;
|
||||
import org.apache.lucene.index.IndexFormatTooNewException;
|
||||
import org.apache.lucene.index.IndexFormatTooOldException;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.NoMergePolicy;
|
||||
import org.apache.lucene.index.SegmentCommitInfo;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.index.SegmentReader;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.search.Explanation;
|
||||
import org.apache.lucene.search.FieldDoc;
|
||||
|
@ -650,6 +653,21 @@ public class Lucene {
|
|||
return LenientParser.parse(toParse, defaultValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to extract a segment reader from the given index reader.
|
||||
* If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
|
||||
*/
|
||||
public static SegmentReader segmentReader(LeafReader reader) {
|
||||
if (reader instanceof SegmentReader) {
|
||||
return (SegmentReader) reader;
|
||||
} else if (reader instanceof FilterLeafReader) {
|
||||
final FilterLeafReader fReader = (FilterLeafReader) reader;
|
||||
return segmentReader(FilterLeafReader.unwrap(fReader));
|
||||
}
|
||||
// hard fail - we can't get a SegmentReader
|
||||
throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "Version#parseLeniently() used in a central place")
|
||||
private static final class LenientParser {
|
||||
public static Version parse(String toParse, Version defaultValue) {
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.FilterLeafReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexFileNames;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
|
@ -143,27 +142,12 @@ public abstract class Engine implements Closeable {
|
|||
return a.ramBytesUsed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to extract a segment reader from the given index reader.
|
||||
* If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
|
||||
*/
|
||||
protected static SegmentReader segmentReader(LeafReader reader) {
|
||||
if (reader instanceof SegmentReader) {
|
||||
return (SegmentReader) reader;
|
||||
} else if (reader instanceof FilterLeafReader) {
|
||||
final FilterLeafReader fReader = (FilterLeafReader) reader;
|
||||
return segmentReader(FilterLeafReader.unwrap(fReader));
|
||||
}
|
||||
// hard fail - we can't get a SegmentReader
|
||||
throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether a leaf reader comes from a merge (versus flush or addIndexes).
|
||||
*/
|
||||
protected static boolean isMergedSegment(LeafReader reader) {
|
||||
// We expect leaves to be segment readers
|
||||
final Map<String, String> diagnostics = segmentReader(reader).getSegmentInfo().info.getDiagnostics();
|
||||
final Map<String, String> diagnostics = Lucene.segmentReader(reader).getSegmentInfo().info.getDiagnostics();
|
||||
final String source = diagnostics.get(IndexWriter.SOURCE);
|
||||
assert Arrays.asList(IndexWriter.SOURCE_ADDINDEXES_READERS, IndexWriter.SOURCE_FLUSH,
|
||||
IndexWriter.SOURCE_MERGE).contains(source) : "Unknown source " + source;
|
||||
|
@ -611,7 +595,7 @@ public abstract class Engine implements Closeable {
|
|||
try (Searcher searcher = acquireSearcher("segments_stats")) {
|
||||
SegmentsStats stats = new SegmentsStats();
|
||||
for (LeafReaderContext reader : searcher.reader().leaves()) {
|
||||
final SegmentReader segmentReader = segmentReader(reader.reader());
|
||||
final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
|
||||
stats.add(1, segmentReader.ramBytesUsed());
|
||||
stats.addTermsMemoryInBytes(guardedRamBytesUsed(segmentReader.getPostingsReader()));
|
||||
stats.addStoredFieldsMemoryInBytes(guardedRamBytesUsed(segmentReader.getFieldsReader()));
|
||||
|
@ -718,7 +702,7 @@ public abstract class Engine implements Closeable {
|
|||
// first, go over and compute the search ones...
|
||||
try (Searcher searcher = acquireSearcher("segments")){
|
||||
for (LeafReaderContext reader : searcher.reader().leaves()) {
|
||||
final SegmentReader segmentReader = segmentReader(reader.reader());
|
||||
final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
|
||||
SegmentCommitInfo info = segmentReader.getSegmentInfo();
|
||||
assert !segments.containsKey(info.info.name);
|
||||
Segment segment = new Segment(info.info.name);
|
||||
|
|
|
@ -21,10 +21,14 @@ package org.elasticsearch.index.shard;
|
|||
|
||||
import com.carrotsearch.hppc.ObjectLongMap;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.index.CheckIndex;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexOptions;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.SegmentCommitInfo;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.index.SegmentReader;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.QueryCachingPolicy;
|
||||
import org.apache.lucene.search.ReferenceManager;
|
||||
|
@ -58,7 +62,6 @@ import org.elasticsearch.common.lease.Releasables;
|
|||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.metrics.MeanMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
@ -151,7 +154,6 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -856,15 +858,27 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
|
||||
public DocsStats docStats() {
|
||||
// we calculate the doc stats based on the internal reader that is more up-to-date and not subject
|
||||
// to external refreshes. For instance we don't refresh an external reader if we flush and indices with
|
||||
// index.refresh_interval=-1 won't see any doc stats updates at all. This change will give more accurate statistics
|
||||
// when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refresh as well so we are
|
||||
// safe here.
|
||||
long numDocs = 0;
|
||||
long numDeletedDocs = 0;
|
||||
long sizeInBytes = 0;
|
||||
List<Segment> segments = segments(false);
|
||||
for (Segment segment : segments) {
|
||||
if (segment.search) {
|
||||
numDocs += segment.getNumDocs();
|
||||
numDeletedDocs += segment.getDeletedDocs();
|
||||
sizeInBytes += segment.getSizeInBytes();
|
||||
try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
|
||||
for (LeafReaderContext reader : searcher.reader().leaves()) {
|
||||
// we go on the segment level here to get accurate numbers
|
||||
final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
|
||||
SegmentCommitInfo info = segmentReader.getSegmentInfo();
|
||||
numDocs += reader.reader().numDocs();
|
||||
numDeletedDocs += reader.reader().numDeletedDocs();
|
||||
try {
|
||||
sizeInBytes += info.sizeInBytes();
|
||||
} catch (IOException e) {
|
||||
logger.trace((org.apache.logging.log4j.util.Supplier<?>)
|
||||
() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
|
||||
|
|
|
@ -2269,11 +2269,17 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
final String id = Integer.toString(i);
|
||||
indexDoc(indexShard, "test", id);
|
||||
}
|
||||
|
||||
indexShard.refresh("test");
|
||||
if (randomBoolean()) {
|
||||
indexShard.refresh("test");
|
||||
} else {
|
||||
indexShard.flush(new FlushRequest());
|
||||
}
|
||||
{
|
||||
final DocsStats docsStats = indexShard.docStats();
|
||||
assertThat(docsStats.getCount(), equalTo(numDocs));
|
||||
try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
|
||||
assertTrue(searcher.reader().numDocs() <= docsStats.getCount());
|
||||
}
|
||||
assertThat(docsStats.getDeleted(), equalTo(0L));
|
||||
assertThat(docsStats.getAverageSizeInBytes(), greaterThan(0L));
|
||||
}
|
||||
|
@ -2293,9 +2299,14 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
flushRequest.waitIfOngoing(false);
|
||||
indexShard.flush(flushRequest);
|
||||
|
||||
indexShard.refresh("test");
|
||||
if (randomBoolean()) {
|
||||
indexShard.refresh("test");
|
||||
}
|
||||
{
|
||||
final DocsStats docStats = indexShard.docStats();
|
||||
try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
|
||||
assertTrue(searcher.reader().numDocs() <= docStats.getCount());
|
||||
}
|
||||
assertThat(docStats.getCount(), equalTo(numDocs));
|
||||
// Lucene will delete a segment if all docs are deleted from it; this means that we lose the deletes when deleting all docs
|
||||
assertThat(docStats.getDeleted(), equalTo(numDocsToDelete == numDocs ? 0 : numDocsToDelete));
|
||||
|
@ -2307,7 +2318,11 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
forceMergeRequest.maxNumSegments(1);
|
||||
indexShard.forceMerge(forceMergeRequest);
|
||||
|
||||
indexShard.refresh("test");
|
||||
if (randomBoolean()) {
|
||||
indexShard.refresh("test");
|
||||
} else {
|
||||
indexShard.flush(new FlushRequest());
|
||||
}
|
||||
{
|
||||
final DocsStats docStats = indexShard.docStats();
|
||||
assertThat(docStats.getCount(), equalTo(numDocs));
|
||||
|
@ -2338,8 +2353,11 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
assertThat("Without flushing, segment sizes should be zero",
|
||||
indexShard.docStats().getTotalSizeInBytes(), equalTo(0L));
|
||||
|
||||
indexShard.flush(new FlushRequest());
|
||||
indexShard.refresh("test");
|
||||
if (randomBoolean()) {
|
||||
indexShard.flush(new FlushRequest());
|
||||
} else {
|
||||
indexShard.refresh("test");
|
||||
}
|
||||
{
|
||||
final DocsStats docsStats = indexShard.docStats();
|
||||
final StoreStats storeStats = indexShard.storeStats();
|
||||
|
@ -2359,9 +2377,11 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
indexDoc(indexShard, "doc", Integer.toString(i), "{\"foo\": \"bar\"}");
|
||||
}
|
||||
}
|
||||
|
||||
indexShard.flush(new FlushRequest());
|
||||
indexShard.refresh("test");
|
||||
if (randomBoolean()) {
|
||||
indexShard.flush(new FlushRequest());
|
||||
} else {
|
||||
indexShard.refresh("test");
|
||||
}
|
||||
{
|
||||
final DocsStats docsStats = indexShard.docStats();
|
||||
final StoreStats storeStats = indexShard.storeStats();
|
||||
|
|
Loading…
Reference in New Issue