Adds average document size to DocsStats (#27117)

This change is required in order to support a size based check for the
index rollover.

The index size is estimated by sampling the existing segments only. We
prefer using segments to StoreStats because StoreStats is not reliable
if indexing or merging operations are in progress.

Relates #27004
This commit is contained in:
Nhat 2017-10-28 12:47:08 -04:00 committed by GitHub
parent abaede2373
commit 07d270b45f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 174 additions and 15 deletions

View File

@ -19,11 +19,13 @@
package org.elasticsearch.index.shard;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.store.StoreStats;
import java.io.IOException;
@ -31,22 +33,25 @@ public class DocsStats implements Streamable, ToXContentFragment {
long count = 0;
long deleted = 0;
long totalSizeInBytes = 0;
public DocsStats() {
}
public DocsStats(long count, long deleted) {
public DocsStats(long count, long deleted, long totalSizeInBytes) {
this.count = count;
this.deleted = deleted;
this.totalSizeInBytes = totalSizeInBytes;
}
public void add(DocsStats docsStats) {
if (docsStats == null) {
public void add(DocsStats other) {
if (other == null) {
return;
}
count += docsStats.count;
deleted += docsStats.deleted;
this.totalSizeInBytes += other.totalSizeInBytes;
this.count += other.count;
this.deleted += other.deleted;
}
public long getCount() {
@ -57,16 +62,40 @@ public class DocsStats implements Streamable, ToXContentFragment {
return this.deleted;
}
/**
* Returns the total size in bytes of all documents in this stats.
* This value may be more reliable than {@link StoreStats#getSizeInBytes()} in estimating the index size.
*/
public long getTotalSizeInBytes() {
return totalSizeInBytes;
}
/**
* Returns the average size in bytes of all documents in this stats.
*/
public long getAverageSizeInBytes() {
long totalDocs = count + deleted;
return totalDocs == 0 ? 0 : totalSizeInBytes / totalDocs;
}
@Override
public void readFrom(StreamInput in) throws IOException {
count = in.readVLong();
deleted = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
totalSizeInBytes = in.readVLong();
} else {
totalSizeInBytes = -1;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
out.writeVLong(deleted);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(totalSizeInBytes);
}
}
@Override

View File

@ -880,9 +880,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
public DocsStats docStats() {
try (Engine.Searcher searcher = acquireSearcher("doc_stats")) {
return new DocsStats(searcher.reader().numDocs(), searcher.reader().numDeletedDocs());
long numDocs = 0;
long numDeletedDocs = 0;
long sizeInBytes = 0;
List<Segment> segments = segments(false);
for (Segment segment : segments) {
if (segment.search) {
numDocs += segment.getNumDocs();
numDeletedDocs += segment.getDeletedDocs();
sizeInBytes += segment.getSizeInBytes();
}
}
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
}
/**

View File

@ -82,12 +82,12 @@ public class TransportRolloverActionTests extends ESTestCase {
.settings(settings)
.build();
final HashSet<Condition> conditions = Sets.newHashSet(maxDocsCondition, maxAgeCondition);
Set<Condition.Result> results = evaluateConditions(conditions, new DocsStats(matchMaxDocs, 0L), metaData);
Set<Condition.Result> results = evaluateConditions(conditions, new DocsStats(matchMaxDocs, 0L, between(1, 10000)), metaData);
assertThat(results.size(), equalTo(2));
for (Condition.Result result : results) {
assertThat(result.matched, equalTo(true));
}
results = evaluateConditions(conditions, new DocsStats(notMatchMaxDocs, 0), metaData);
results = evaluateConditions(conditions, new DocsStats(notMatchMaxDocs, 0, between(1, 10000)), metaData);
assertThat(results.size(), equalTo(2));
for (Condition.Result result : results) {
if (result.condition instanceof MaxAgeCondition) {
@ -213,10 +213,10 @@ public class TransportRolloverActionTests extends ESTestCase {
private IndicesStatsResponse createIndecesStatResponse(long totalDocs, long primaryDocs) {
final CommonStats primaryStats = mock(CommonStats.class);
when(primaryStats.getDocs()).thenReturn(new DocsStats(primaryDocs, 0));
when(primaryStats.getDocs()).thenReturn(new DocsStats(primaryDocs, 0, between(1, 10000)));
final CommonStats totalStats = mock(CommonStats.class);
when(totalStats.getDocs()).thenReturn(new DocsStats(totalDocs, 0));
when(totalStats.getDocs()).thenReturn(new DocsStats(totalDocs, 0, between(1, 10000)));
final IndicesStatsResponse response = mock(IndicesStatsResponse.class);
when(response.getPrimaries()).thenReturn(primaryStats);

View File

@ -73,7 +73,7 @@ public class TransportShrinkActionTests extends ESTestCase {
assertTrue(
expectThrows(IllegalStateException.class, () ->
TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), state,
(i) -> new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY))
(i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), new IndexNameExpressionResolver(Settings.EMPTY))
).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));
@ -84,7 +84,7 @@ public class TransportShrinkActionTests extends ESTestCase {
ClusterState clusterState = createClusterState("source", 8, 1,
Settings.builder().put("index.blocks.write", true).build());
TransportShrinkAction.prepareCreateIndexRequest(req, clusterState,
(i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE/2, randomIntBetween(1, 1000)) : null,
(i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null,
new IndexNameExpressionResolver(Settings.EMPTY));
}
).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));
@ -106,7 +106,7 @@ public class TransportShrinkActionTests extends ESTestCase {
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState,
(i) -> new DocsStats(randomIntBetween(1, 1000), randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY));
(i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), new IndexNameExpressionResolver(Settings.EMPTY));
}
public void testShrinkIndexSettings() {
@ -128,7 +128,7 @@ public class TransportShrinkActionTests extends ESTestCase {
routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards();
DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000));
DocsStats stats = new DocsStats(between(0, (IndexWriter.MAX_DOCS) / numSourceShards), between(1, 1000), between(1, 10000));
ShrinkRequest target = new ShrinkRequest("target", indexName);
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
target.setWaitForActiveShards(activeShardCount);

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
public class DocsStatsTests extends ESTestCase {
public void testCalculateAverageDocSize() throws Exception {
DocsStats stats = new DocsStats(10, 2, 120);
assertThat(stats.getAverageSizeInBytes(), equalTo(10L));
stats.add(new DocsStats(0, 0, 0));
assertThat(stats.getAverageSizeInBytes(), equalTo(10L));
stats.add(new DocsStats(8, 30, 480));
assertThat(stats.getCount(), equalTo(18L));
assertThat(stats.getDeleted(), equalTo(32L));
assertThat(stats.getTotalSizeInBytes(), equalTo(600L));
assertThat(stats.getAverageSizeInBytes(), equalTo(12L));
}
public void testSerialize() throws Exception {
DocsStats originalStats = new DocsStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
try (BytesStreamOutput out = new BytesStreamOutput()) {
originalStats.writeTo(out);
BytesReference bytes = out.bytes();
try (StreamInput in = bytes.streamInput()) {
DocsStats cloneStats = new DocsStats();
cloneStats.readFrom(in);
assertThat(cloneStats.getCount(), equalTo(originalStats.getCount()));
assertThat(cloneStats.getDeleted(), equalTo(originalStats.getDeleted()));
assertThat(cloneStats.getAverageSizeInBytes(), equalTo(originalStats.getAverageSizeInBytes()));
}
}
}
}

View File

@ -67,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.VersionType;
@ -87,6 +88,7 @@ import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogTests;
import org.elasticsearch.indices.IndicesQueryCache;
@ -150,6 +152,7 @@ import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -2227,6 +2230,7 @@ public class IndexShardTests extends IndexShardTestCase {
final DocsStats docsStats = indexShard.docStats();
assertThat(docsStats.getCount(), equalTo(numDocs));
assertThat(docsStats.getDeleted(), equalTo(0L));
assertThat(docsStats.getAverageSizeInBytes(), greaterThan(0L));
}
final List<Integer> ids = randomSubsetOf(
@ -2263,12 +2267,70 @@ public class IndexShardTests extends IndexShardTestCase {
final DocsStats docStats = indexShard.docStats();
assertThat(docStats.getCount(), equalTo(numDocs));
assertThat(docStats.getDeleted(), equalTo(0L));
assertThat(docStats.getAverageSizeInBytes(), greaterThan(0L));
}
} finally {
closeShards(indexShard);
}
}
public void testEstimateTotalDocSize() throws Exception {
IndexShard indexShard = null;
try {
indexShard = newStartedShard(true);
int numDoc = randomIntBetween(100, 200);
for (int i = 0; i < numDoc; i++) {
String doc = XContentFactory.jsonBuilder()
.startObject()
.field("count", randomInt())
.field("point", randomFloat())
.field("description", randomUnicodeOfCodepointLength(100))
.endObject().string();
indexDoc(indexShard, "doc", Integer.toString(i), doc);
}
assertThat("Without flushing, segment sizes should be zero",
indexShard.docStats().getTotalSizeInBytes(), equalTo(0L));
indexShard.flush(new FlushRequest());
indexShard.refresh("test");
{
final DocsStats docsStats = indexShard.docStats();
final StoreStats storeStats = indexShard.storeStats();
assertThat(storeStats.sizeInBytes(), greaterThan(numDoc * 100L)); // A doc should be more than 100 bytes.
assertThat("Estimated total document size is too small compared with the stored size",
docsStats.getTotalSizeInBytes(), greaterThanOrEqualTo(storeStats.sizeInBytes() * 80/100));
assertThat("Estimated total document size is too large compared with the stored size",
docsStats.getTotalSizeInBytes(), lessThanOrEqualTo(storeStats.sizeInBytes() * 120/100));
}
// Do some updates and deletes, then recheck the correlation again.
for (int i = 0; i < numDoc / 2; i++) {
if (randomBoolean()) {
deleteDoc(indexShard, "doc", Integer.toString(i));
} else {
indexDoc(indexShard, "doc", Integer.toString(i), "{\"foo\": \"bar\"}");
}
}
indexShard.flush(new FlushRequest());
indexShard.refresh("test");
{
final DocsStats docsStats = indexShard.docStats();
final StoreStats storeStats = indexShard.storeStats();
assertThat("Estimated total document size is too small compared with the stored size",
docsStats.getTotalSizeInBytes(), greaterThanOrEqualTo(storeStats.sizeInBytes() * 80/100));
assertThat("Estimated total document size is too large compared with the stored size",
docsStats.getTotalSizeInBytes(), lessThanOrEqualTo(storeStats.sizeInBytes() * 120/100));
}
} finally {
closeShards(indexShard);
}
}
/**
* here we are simulating the scenario that happens when we do async shard fetching from GatewaySerivce while we are finishing
* a recovery and concurrently clean files. This should always be possible without any exception. Yet there was a bug where IndexShard