Use reader for doc stats
Today we pull doc stats from the index writer, but the writer does not give us a consistent view of its stats. Under heavy indexing the skew can be severe; in particular, the number of deleted docs can be reported as negative, and this leads to serialization issues. Instead, we should provide a consistent view of the stats by using an index reader.

Relates #22317
This commit is contained in:
parent 1243abfecc
commit 2713549533
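The race described in the commit message is in the arithmetic itself: IndexWriter#numDocs() and IndexWriter#maxDoc() are two independent reads of a moving target, so concurrent deletes and merges landing between the two calls can push maxDoc below the previously read numDocs, making the computed deleted count negative. A reader, by contrast, reports both figures from a single point-in-time snapshot. Below is a minimal standalone sketch against the Lucene 6-era API used by this codebase; it is illustrative only, not code from this change:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.RAMDirectory;

    public class DocStatsSketch {
        public static void main(String[] args) throws Exception {
            IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
            try (IndexWriter writer = new IndexWriter(new RAMDirectory(), config)) {
                writer.addDocument(new Document());

                // Writer-based (the removed approach): two separate reads of a
                // moving target. If concurrent deletes are merged away between
                // the two calls, maxDoc can drop below the numDocs value read a
                // moment earlier, and the difference goes negative.
                final int numDocs = writer.numDocs();
                final int maxDoc = writer.maxDoc();
                System.out.println("writer view: deleted = " + (maxDoc - numDocs));

                // Reader-based (the new approach): numDocs and numDeletedDocs
                // describe one point-in-time snapshot, so the deleted count can
                // never be negative.
                try (DirectoryReader reader = DirectoryReader.open(writer)) {
                    System.out.println("reader view: deleted = " + reader.numDeletedDocs());
                }
            }
        }
    }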
@@ -64,7 +64,6 @@ import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
-import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
@@ -1374,16 +1373,6 @@ public abstract class Engine implements Closeable {
         return this.lastWriteNanos;
     }
 
-    /**
-     * Returns the engines current document statistics
-     */
-    public DocsStats getDocStats() {
-        try (Engine.Searcher searcher = acquireSearcher("doc_stats")) {
-            IndexReader reader = searcher.reader();
-            return new DocsStats(reader.numDocs(), reader.numDeletedDocs());
-        }
-    }
-
     /**
      * Called for each new opened engine searcher to warm new segments
      *
@@ -64,7 +64,6 @@ import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
-import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
@@ -1620,14 +1619,6 @@ public class InternalEngine extends Engine {
         return seqNoService;
     }
 
-    @Override
-    public DocsStats getDocStats() {
-        final int numDocs = indexWriter.numDocs();
-        final int maxDoc = indexWriter.maxDoc();
-        return new DocsStats(numDocs, maxDoc - numDocs);
-    }
-
-
     /**
      * Returns the number of times a version was looked up either from the index.
      * Note this is only available if assertions are enabled
@@ -669,9 +669,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     public DocsStats docStats() {
-        readAllowed();
-        final Engine engine = getEngine();
-        return engine.getDocStats();
+        try (final Engine.Searcher searcher = acquireSearcher("doc_stats")) {
+            return new DocsStats(searcher.reader().numDocs(), searcher.reader().numDeletedDocs());
+        }
     }
 
     /**
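A note on the design choice in the new IndexShard#docStats() above: acquiring the searcher in a try-with-resources block pins a single point-in-time reader for the whole computation, so numDocs() and numDeletedDocs() describe the same snapshot, and the underlying reader is reliably released even if building the stats throws.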
@@ -2548,33 +2548,6 @@ public class InternalEngineTests extends ESTestCase {
 
     }
 
-    public void testDocStats() throws IOException {
-        final int numDocs = randomIntBetween(2, 10); // at least 2 documents otherwise we don't see any deletes below
-        for (int i = 0; i < numDocs; i++) {
-            ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
-            Engine.IndexResult indexResult = engine.index(firstIndexRequest);
-            assertThat(indexResult.getVersion(), equalTo(1L));
-        }
-        DocsStats docStats = engine.getDocStats();
-        assertEquals(numDocs, docStats.getCount());
-        assertEquals(0, docStats.getDeleted());
-        engine.forceMerge(randomBoolean(), 1, false, false, false);
-
-        ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, testDocument(), new BytesArray("{}"), null);
-        Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
-        Engine.IndexResult index = engine.index(firstIndexRequest);
-        assertThat(index.getVersion(), equalTo(2L));
-        engine.flush(); // flush - buffered deletes are not counted
-        docStats = engine.getDocStats();
-        assertEquals(1, docStats.getDeleted());
-        assertEquals(numDocs, docStats.getCount());
-        engine.forceMerge(randomBoolean(), 1, false, false, false);
-        docStats = engine.getDocStats();
-        assertEquals(0, docStats.getDeleted());
-        assertEquals(numDocs, docStats.getCount());
-    }
-
     public void testDoubleDelivery() throws IOException {
         final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
         Engine.Index operation = randomAppendOnly(1, doc, false);
@@ -984,33 +984,6 @@ public class ShadowEngineTests extends ESTestCase {
         }
     }
 
-    public void testDocStats() throws IOException {
-        final int numDocs = randomIntBetween(2, 10); // at least 2 documents otherwise we don't see any deletes below
-        for (int i = 0; i < numDocs; i++) {
-            ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
-            Engine.IndexResult indexResult = primaryEngine.index(firstIndexRequest);
-            assertThat(indexResult.getVersion(), equalTo(1L));
-        }
-        DocsStats docStats = primaryEngine.getDocStats();
-        assertEquals(numDocs, docStats.getCount());
-        assertEquals(0, docStats.getDeleted());
-
-        docStats = replicaEngine.getDocStats();
-        assertEquals(0, docStats.getCount());
-        assertEquals(0, docStats.getDeleted());
-        primaryEngine.flush();
-
-        docStats = replicaEngine.getDocStats();
-        assertEquals(0, docStats.getCount());
-        assertEquals(0, docStats.getDeleted());
-        replicaEngine.refresh("test");
-        docStats = replicaEngine.getDocStats();
-        assertEquals(numDocs, docStats.getCount());
-        assertEquals(0, docStats.getDeleted());
-        primaryEngine.forceMerge(randomBoolean(), 1, false, false, false);
-    }
-
     public void testRefreshListenersFails() throws IOException {
         EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(),
                 new RefreshListeners(null, null, null, logger));
@@ -33,6 +33,7 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.Constants;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
@@ -57,11 +58,13 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.fielddata.FieldDataStats;
@@ -73,6 +76,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.UidFieldMapper;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
@@ -112,13 +116,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
-import static org.elasticsearch.common.lucene.Lucene.readScoreDoc;
 import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -1357,6 +1363,91 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(sourceShard, targetShard);
     }
 
+    public void testDocStats() throws IOException {
+        IndexShard indexShard = null;
+        try {
+            indexShard = newStartedShard();
+            final long numDocs = randomIntBetween(2, 32); // at least two documents so we have docs to delete
+            final long numDocsToDelete = randomIntBetween(1, Math.toIntExact(numDocs));
+            for (int i = 0; i < numDocs; i++) {
+                final String id = Integer.toString(i);
+                final ParsedDocument doc =
+                        testParsedDocument(id, id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
+                final Engine.Index index =
+                        new Engine.Index(
+                                new Term("_uid", id),
+                                doc,
+                                SequenceNumbersService.UNASSIGNED_SEQ_NO,
+                                0,
+                                Versions.MATCH_ANY,
+                                VersionType.INTERNAL,
+                                PRIMARY,
+                                System.nanoTime(),
+                                -1,
+                                false);
+                final Engine.IndexResult result = indexShard.index(index);
+                assertThat(result.getVersion(), equalTo(1L));
+            }
+
+            indexShard.refresh("test");
+            {
+                final DocsStats docsStats = indexShard.docStats();
+                assertThat(docsStats.getCount(), equalTo(numDocs));
+                assertThat(docsStats.getDeleted(), equalTo(0L));
+            }
+
+            final List<Integer> ids = randomSubsetOf(
+                    Math.toIntExact(numDocsToDelete),
+                    IntStream.range(0, Math.toIntExact(numDocs)).boxed().collect(Collectors.toList()));
+            for (final Integer i : ids) {
+                final String id = Integer.toString(i);
+                final ParsedDocument doc = testParsedDocument(id, id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
+                final Engine.Index index =
+                        new Engine.Index(
+                                new Term("_uid", id),
+                                doc,
+                                SequenceNumbersService.UNASSIGNED_SEQ_NO,
+                                0,
+                                Versions.MATCH_ANY,
+                                VersionType.INTERNAL,
+                                PRIMARY,
+                                System.nanoTime(),
+                                -1,
+                                false);
+                final Engine.IndexResult result = indexShard.index(index);
+                assertThat(result.getVersion(), equalTo(2L));
+            }
+
+            // flush the buffered deletes
+            final FlushRequest flushRequest = new FlushRequest();
+            flushRequest.force(false);
+            flushRequest.waitIfOngoing(false);
+            indexShard.flush(flushRequest);
+
+            indexShard.refresh("test");
+            {
+                final DocsStats docStats = indexShard.docStats();
+                assertThat(docStats.getCount(), equalTo(numDocs));
+                assertThat(docStats.getDeleted(), equalTo(numDocsToDelete));
+            }
+
+            // merge them away
+            final ForceMergeRequest forceMergeRequest = new ForceMergeRequest();
+            forceMergeRequest.onlyExpungeDeletes(randomBoolean());
+            forceMergeRequest.maxNumSegments(1);
+            indexShard.forceMerge(forceMergeRequest);
+
+            indexShard.refresh("test");
+            {
+                final DocsStats docStats = indexShard.docStats();
+                assertThat(docStats.getCount(), equalTo(numDocs));
+                assertThat(docStats.getDeleted(), equalTo(0L));
+            }
+        } finally {
+            closeShards(indexShard);
+        }
+    }
+
     /** A dummy repository for testing which just needs restore overridden */
     private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
         private final String indexName;
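One subtlety in the new test above: the deletes are produced by re-indexing a subset of existing ids. A Lucene update is a delete plus an add, so each re-indexed document bumps its version to 2 and leaves a deleted document behind; the flush and refresh are needed before the deleted count becomes visible through the reader, and the subsequent force-merge down to one segment expunges the deletes again.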
@@ -21,16 +21,20 @@ package org.elasticsearch.indices.stats;
 
 import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -54,14 +58,27 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
+import static org.hamcrest.Matchers.emptyCollectionOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
@@ -1068,4 +1085,103 @@ public class IndexStatsIT extends ESIntegTestCase {
         assertThat(response.getTotal().queryCache.getMemorySizeInBytes(), equalTo(0L));
     }
 
+    /**
+     * Test that we can safely concurrently index and get stats. This test was inspired by a serialization issue that arose due to a race
+     * getting doc stats during heavy indexing. The race could lead to deleted docs being negative which would then be serialized as a
+     * variable-length long. Since serialization of negative longs using a variable-length format was unsupported
+     * ({@link org.elasticsearch.common.io.stream.StreamOutput#writeVLong(long)}), the stream would become corrupted. Here, we want to test
+     * that we can continue to get stats while indexing.
+     */
+    public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierException, InterruptedException, ExecutionException {
+        final AtomicInteger idGenerator = new AtomicInteger();
+        final int numberOfIndexingThreads = Runtime.getRuntime().availableProcessors();
+        final int numberOfStatsThreads = 4 * numberOfIndexingThreads;
+        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfIndexingThreads + numberOfStatsThreads);
+        final AtomicBoolean stop = new AtomicBoolean();
+        final List<Thread> threads = new ArrayList<>(numberOfIndexingThreads + numberOfIndexingThreads);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean failed = new AtomicBoolean();
+        final AtomicReference<List<ShardOperationFailedException>> shardFailures = new AtomicReference<>(new CopyOnWriteArrayList<>());
+        final AtomicReference<List<Exception>> executionFailures = new AtomicReference<>(new CopyOnWriteArrayList<>());
+
+        // increasing the number of shards increases the number of chances any one stats request will hit a race
+        final CreateIndexRequest createIndexRequest =
+                new CreateIndexRequest("test", Settings.builder().put("index.number_of_shards", 10).build());
+        client().admin().indices().create(createIndexRequest).get();
+
+        // start threads that will index concurrently with stats requests
+        for (int i = 0; i < numberOfIndexingThreads; i++) {
+            final Thread thread = new Thread(() -> {
+                try {
+                    barrier.await();
+                } catch (final BrokenBarrierException | InterruptedException e) {
+                    failed.set(true);
+                    executionFailures.get().add(e);
+                    latch.countDown();
+                }
+                while (!stop.get()) {
+                    final String id = Integer.toString(idGenerator.incrementAndGet());
+                    final IndexResponse response =
+                            client()
+                                    .prepareIndex("test", "type", id)
+                                    .setSource("{}")
+                                    .get();
+                    assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED));
+                }
+            });
+            thread.setName("indexing-" + i);
+            threads.add(thread);
+            thread.start();
+        }
+
+        // start threads that will get stats concurrently with indexing
+        for (int i = 0; i < numberOfStatsThreads; i++) {
+            final Thread thread = new Thread(() -> {
+                try {
+                    barrier.await();
+                } catch (final BrokenBarrierException | InterruptedException e) {
+                    failed.set(true);
+                    executionFailures.get().add(e);
+                    latch.countDown();
+                }
+                final IndicesStatsRequest request = new IndicesStatsRequest();
+                request.all();
+                request.indices(new String[0]);
+                while (!stop.get()) {
+                    try {
+                        final IndicesStatsResponse response = client().admin().indices().stats(request).get();
+                        if (response.getFailedShards() > 0) {
+                            failed.set(true);
+                            shardFailures.get().addAll(Arrays.asList(response.getShardFailures()));
+                            latch.countDown();
+                        }
+                    } catch (final ExecutionException | InterruptedException e) {
+                        failed.set(true);
+                        executionFailures.get().add(e);
+                        latch.countDown();
+                    }
+                }
+            });
+            thread.setName("stats-" + i);
+            threads.add(thread);
+            thread.start();
+        }
+
+        // release the hounds
+        barrier.await();
+
+        // wait for a failure, or for fifteen seconds to elapse
+        latch.await(15, TimeUnit.SECONDS);
+
+        // stop all threads and wait for them to complete
+        stop.set(true);
+        for (final Thread thread : threads) {
+            thread.join();
+        }
+
+        assertThat(shardFailures.get(), emptyCollectionOf(ShardOperationFailedException.class));
+        assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
+    }
+
 }
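For background on the corruption mechanism named in the Javadoc above, here is a generic sketch of base-128 variable-length encoding (illustrative only, not Elasticsearch's actual StreamOutput implementation): the encoder emits seven payload bits per byte until the remainder fits in seven bits. A negative long has its sign bit set, so the unsigned shift needs ten rounds to drain it, and a decoder written for the short, non-negative form mis-parses the extra bytes.

    public final class VLongSketch {
        // Returns how many bytes a base-128 varint encoding of value emits.
        public static int vLongSize(long value) {
            int bytes = 1;
            // Emit 7 bits per byte until the remaining value fits in 7 bits.
            while ((value & ~0x7FL) != 0) {
                bytes++;
                value >>>= 7; // unsigned shift: a set sign bit takes 10 rounds to drain
            }
            return bytes;
        }

        public static void main(String[] args) {
            System.out.println(vLongSize(5L));  // 1 byte
            System.out.println(vLongSize(-1L)); // 10 bytes: negative values balloon
        }
    }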
@@ -108,6 +108,8 @@ PUT logs_write/log/1
 "message": "a dummy log"
 }
 
+POST logs_write/_refresh
+
 # Wait for a day to pass
 
 POST /logs_write/_rollover <2>
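The _refresh added to the rollover example appears to be a consequence of this change: with doc stats served from a point-in-time reader, a document indexed since the last refresh is not yet reflected in the docs count that the rollover max_docs condition checks, so the example refreshes explicitly before rolling over.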