Move DocsStats into Engine (#33835)
By moving DocStats into the engine we can easily cache the stats for read-only engines if necessary. It also moves the responsibility out of IndexShard which has quiet some complexity already.
This commit is contained in:
parent
6f3b3338ba
commit
0c77f45dc6
|
@ -66,6 +66,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
|
||||||
import org.elasticsearch.index.merge.MergeStats;
|
import org.elasticsearch.index.merge.MergeStats;
|
||||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||||
|
import org.elasticsearch.index.shard.DocsStats;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
|
@ -175,6 +176,41 @@ public abstract class Engine implements Closeable {
|
||||||
/** Returns how many bytes we are currently moving from heap to disk */
|
/** Returns how many bytes we are currently moving from heap to disk */
|
||||||
public abstract long getWritingBytes();
|
public abstract long getWritingBytes();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the {@link DocsStats} for this engine
|
||||||
|
*/
|
||||||
|
public DocsStats docStats() {
|
||||||
|
// we calculate the doc stats based on the internal reader that is more up-to-date and not subject
|
||||||
|
// to external refreshes. For instance we don't refresh an external reader if we flush and indices with
|
||||||
|
// index.refresh_interval=-1 won't see any doc stats updates at all. This change will give more accurate statistics
|
||||||
|
// when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refresh as well so we are
|
||||||
|
// safe here.
|
||||||
|
try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
|
||||||
|
return docsStats(searcher.reader());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final DocsStats docsStats(IndexReader indexReader) {
|
||||||
|
long numDocs = 0;
|
||||||
|
long numDeletedDocs = 0;
|
||||||
|
long sizeInBytes = 0;
|
||||||
|
// we don't wait for a pending refreshes here since it's a stats call instead we mark it as accessed only which will cause
|
||||||
|
// the next scheduled refresh to go through and refresh the stats as well
|
||||||
|
for (LeafReaderContext readerContext : indexReader.leaves()) {
|
||||||
|
// we go on the segment level here to get accurate numbers
|
||||||
|
final SegmentReader segmentReader = Lucene.segmentReader(readerContext.reader());
|
||||||
|
SegmentCommitInfo info = segmentReader.getSegmentInfo();
|
||||||
|
numDocs += readerContext.reader().numDocs();
|
||||||
|
numDeletedDocs += readerContext.reader().numDeletedDocs();
|
||||||
|
try {
|
||||||
|
sizeInBytes += info.sizeInBytes();
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A throttling class that can be activated, causing the
|
* A throttling class that can be activated, causing the
|
||||||
* {@code acquireThrottle} method to block on a lock when throttling
|
* {@code acquireThrottle} method to block on a lock when throttling
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.index.mapper.MapperService;
|
import org.elasticsearch.index.mapper.MapperService;
|
||||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||||
|
import org.elasticsearch.index.shard.DocsStats;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.index.translog.Translog;
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.index.translog.TranslogStats;
|
import org.elasticsearch.index.translog.TranslogStats;
|
||||||
|
@ -63,6 +64,7 @@ public final class ReadOnlyEngine extends Engine {
|
||||||
private final SearcherManager searcherManager;
|
private final SearcherManager searcherManager;
|
||||||
private final IndexCommit indexCommit;
|
private final IndexCommit indexCommit;
|
||||||
private final Lock indexWriterLock;
|
private final Lock indexWriterLock;
|
||||||
|
private final DocsStats docsStats;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
|
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
|
||||||
|
@ -101,6 +103,7 @@ public final class ReadOnlyEngine extends Engine {
|
||||||
this.indexCommit = reader.getIndexCommit();
|
this.indexCommit = reader.getIndexCommit();
|
||||||
this.searcherManager = new SearcherManager(reader,
|
this.searcherManager = new SearcherManager(reader,
|
||||||
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
|
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
|
||||||
|
this.docsStats = docsStats(reader);
|
||||||
this.indexWriterLock = indexWriterLock;
|
this.indexWriterLock = indexWriterLock;
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -365,4 +368,9 @@ public final class ReadOnlyEngine extends Engine {
|
||||||
@Override
|
@Override
|
||||||
public void maybePruneDeletes() {
|
public void maybePruneDeletes() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DocsStats docStats() {
|
||||||
|
return docsStats;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,13 +21,9 @@ package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
import com.carrotsearch.hppc.ObjectLongMap;
|
import com.carrotsearch.hppc.ObjectLongMap;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
||||||
import org.apache.lucene.index.CheckIndex;
|
import org.apache.lucene.index.CheckIndex;
|
||||||
import org.apache.lucene.index.IndexCommit;
|
import org.apache.lucene.index.IndexCommit;
|
||||||
import org.apache.lucene.index.LeafReaderContext;
|
|
||||||
import org.apache.lucene.index.SegmentCommitInfo;
|
|
||||||
import org.apache.lucene.index.SegmentInfos;
|
import org.apache.lucene.index.SegmentInfos;
|
||||||
import org.apache.lucene.index.SegmentReader;
|
|
||||||
import org.apache.lucene.index.Term;
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.search.Query;
|
import org.apache.lucene.search.Query;
|
||||||
import org.apache.lucene.search.QueryCachingPolicy;
|
import org.apache.lucene.search.QueryCachingPolicy;
|
||||||
|
@ -879,32 +875,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
}
|
}
|
||||||
|
|
||||||
public DocsStats docStats() {
|
public DocsStats docStats() {
|
||||||
// we calculate the doc stats based on the internal reader that is more up-to-date and not subject
|
DocsStats docsStats = getEngine().docStats();
|
||||||
// to external refreshes. For instance we don't refresh an external reader if we flush and indices with
|
|
||||||
// index.refresh_interval=-1 won't see any doc stats updates at all. This change will give more accurate statistics
|
|
||||||
// when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refresh as well so we are
|
|
||||||
// safe here.
|
|
||||||
long numDocs = 0;
|
|
||||||
long numDeletedDocs = 0;
|
|
||||||
long sizeInBytes = 0;
|
|
||||||
try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
|
|
||||||
// we don't wait for a pending refreshes here since it's a stats call instead we mark it as accessed only which will cause
|
|
||||||
// the next scheduled refresh to go through and refresh the stats as well
|
|
||||||
markSearcherAccessed();
|
markSearcherAccessed();
|
||||||
for (LeafReaderContext reader : searcher.reader().leaves()) {
|
return docsStats;
|
||||||
// we go on the segment level here to get accurate numbers
|
|
||||||
final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
|
|
||||||
SegmentCommitInfo info = segmentReader.getSegmentInfo();
|
|
||||||
numDocs += reader.reader().numDocs();
|
|
||||||
numDeletedDocs += reader.reader().numDeletedDocs();
|
|
||||||
try {
|
|
||||||
sizeInBytes += info.sizeInBytes();
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -2438,7 +2438,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||||
closeShards(sourceShard, targetShard);
|
closeShards(sourceShard, targetShard);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDocStats() throws IOException, InterruptedException {
|
public void testDocStats() throws Exception {
|
||||||
IndexShard indexShard = null;
|
IndexShard indexShard = null;
|
||||||
try {
|
try {
|
||||||
indexShard = newStartedShard(
|
indexShard = newStartedShard(
|
||||||
|
@ -2455,7 +2455,14 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||||
indexShard.flush(new FlushRequest());
|
indexShard.flush(new FlushRequest());
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
|
IndexShard shard = indexShard;
|
||||||
|
assertBusy(() -> {
|
||||||
|
ThreadPool threadPool = shard.getThreadPool();
|
||||||
|
assertThat(threadPool.relativeTimeInMillis(), greaterThan(shard.getLastSearcherAccess()));
|
||||||
|
});
|
||||||
|
long prevAccessTime = shard.getLastSearcherAccess();
|
||||||
final DocsStats docsStats = indexShard.docStats();
|
final DocsStats docsStats = indexShard.docStats();
|
||||||
|
assertThat("searcher was not marked as accessed", shard.getLastSearcherAccess(), greaterThan(prevAccessTime));
|
||||||
assertThat(docsStats.getCount(), equalTo(numDocs));
|
assertThat(docsStats.getCount(), equalTo(numDocs));
|
||||||
try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
|
try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
|
||||||
assertTrue(searcher.reader().numDocs() <= docsStats.getCount());
|
assertTrue(searcher.reader().numDocs() <= docsStats.getCount());
|
||||||
|
@ -3412,4 +3419,9 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||||
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));
|
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));
|
||||||
closeShard(shard, false);
|
closeShard(shard, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Settings threadPoolSettings() {
|
||||||
|
return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue