diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index b081d76f3c7..663a10791b6 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -64,7 +64,6 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.seqno.SequenceNumbersService; -import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -1374,16 +1373,6 @@ public abstract class Engine implements Closeable { return this.lastWriteNanos; } - /** - * Returns the engines current document statistics - */ - public DocsStats getDocStats() { - try (Engine.Searcher searcher = acquireSearcher("doc_stats")) { - IndexReader reader = searcher.reader(); - return new DocsStats(reader.numDocs(), reader.numDeletedDocs()); - } - } - /** * Called for each new opened engine searcher to warm new segments * diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 058ed0a19fc..a18ca7f280e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -64,7 +64,6 @@ import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; -import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; @@ -1620,14 +1619,6 @@ public class InternalEngine extends Engine { return seqNoService; } - @Override - public DocsStats getDocStats() { - final int numDocs = indexWriter.numDocs(); - final int maxDoc = indexWriter.maxDoc(); - return new DocsStats(numDocs, maxDoc-numDocs); - } - - /** * Returns the number of times a version was looked up either from the index. * Note this is only available if assertions are enabled diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fc6eac196d8..b9eb50545da 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -669,9 +669,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } public DocsStats docStats() { - readAllowed(); - final Engine engine = getEngine(); - return engine.getDocStats(); + try (final Engine.Searcher searcher = acquireSearcher("doc_stats")) { + return new DocsStats(searcher.reader().numDocs(), searcher.reader().numDeletedDocs()); + } } /** diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 065a6d74f07..a7620901826 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2548,33 +2548,6 @@ public class InternalEngineTests extends ESTestCase { } - public void testDocStats() throws IOException { - final int numDocs = randomIntBetween(2, 10); // at least 2 documents otherwise we don't see any deletes below - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - Engine.IndexResult indexResult = engine.index(firstIndexRequest); - assertThat(indexResult.getVersion(), equalTo(1L)); - } - DocsStats docStats = engine.getDocStats(); - assertEquals(numDocs, docStats.getCount()); - assertEquals(0, docStats.getDeleted()); - engine.forceMerge(randomBoolean(), 1, false, false, false); - - ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - Engine.IndexResult index = engine.index(firstIndexRequest); - assertThat(index.getVersion(), equalTo(2L)); - engine.flush(); // flush - buffered deletes are not counted - docStats = engine.getDocStats(); - assertEquals(1, docStats.getDeleted()); - assertEquals(numDocs, docStats.getCount()); - engine.forceMerge(randomBoolean(), 1, false, false, false); - docStats = engine.getDocStats(); - assertEquals(0, docStats.getDeleted()); - assertEquals(numDocs, docStats.getCount()); - } - public void testDoubleDelivery() throws IOException { final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); Engine.Index operation = randomAppendOnly(1, doc, false); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index a3c58f25ea9..a7470666d63 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -984,33 +984,6 @@ public class ShadowEngineTests extends ESTestCase { } } - public void testDocStats() throws IOException { - final int numDocs = randomIntBetween(2, 10); // at least 2 documents otherwise we don't see any deletes below - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - Engine.IndexResult indexResult = primaryEngine.index(firstIndexRequest); - assertThat(indexResult.getVersion(), equalTo(1L)); - } - DocsStats docStats = primaryEngine.getDocStats(); - assertEquals(numDocs, docStats.getCount()); - assertEquals(0, docStats.getDeleted()); - - docStats = replicaEngine.getDocStats(); - assertEquals(0, docStats.getCount()); - assertEquals(0, docStats.getDeleted()); - primaryEngine.flush(); - - docStats = replicaEngine.getDocStats(); - assertEquals(0, docStats.getCount()); - assertEquals(0, docStats.getDeleted()); - replicaEngine.refresh("test"); - docStats = replicaEngine.getDocStats(); - assertEquals(numDocs, docStats.getCount()); - assertEquals(0, docStats.getDeleted()); - primaryEngine.forceMerge(randomBoolean(), 1, false, false, false); - } - public void testRefreshListenersFails() throws IOException { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), new RefreshListeners(null, null, null, logger)); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 036faa9b903..135d77a34ab 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -33,6 +33,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Constants; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; @@ -57,11 +58,13 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.fielddata.FieldDataStats; @@ -73,6 +76,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -112,13 +116,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; -import static org.elasticsearch.common.lucene.Lucene.readScoreDoc; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -1357,6 +1363,91 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(sourceShard, targetShard); } + public void testDocStats() throws IOException { + IndexShard indexShard = null; + try { + indexShard = newStartedShard(); + final long numDocs = randomIntBetween(2, 32); // at least two documents so we have docs to delete + final long numDocsToDelete = randomIntBetween(1, Math.toIntExact(numDocs)); + for (int i = 0; i < numDocs; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = + testParsedDocument(id, id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null); + final Engine.Index index = + new Engine.Index( + new Term("_uid", id), + doc, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + 0, + Versions.MATCH_ANY, + VersionType.INTERNAL, + PRIMARY, + System.nanoTime(), + -1, + false); + final Engine.IndexResult result = indexShard.index(index); + assertThat(result.getVersion(), equalTo(1L)); + } + + indexShard.refresh("test"); + { + final DocsStats docsStats = indexShard.docStats(); + assertThat(docsStats.getCount(), equalTo(numDocs)); + assertThat(docsStats.getDeleted(), equalTo(0L)); + } + + final List ids = randomSubsetOf( + Math.toIntExact(numDocsToDelete), + IntStream.range(0, Math.toIntExact(numDocs)).boxed().collect(Collectors.toList())); + for (final Integer i : ids) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null); + final Engine.Index index = + new Engine.Index( + new Term("_uid", id), + doc, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + 0, + Versions.MATCH_ANY, + VersionType.INTERNAL, + PRIMARY, + System.nanoTime(), + -1, + false); + final Engine.IndexResult result = indexShard.index(index); + assertThat(result.getVersion(), equalTo(2L)); + } + + // flush the buffered deletes + final FlushRequest flushRequest = new FlushRequest(); + flushRequest.force(false); + flushRequest.waitIfOngoing(false); + indexShard.flush(flushRequest); + + indexShard.refresh("test"); + { + final DocsStats docStats = indexShard.docStats(); + assertThat(docStats.getCount(), equalTo(numDocs)); + assertThat(docStats.getDeleted(), equalTo(numDocsToDelete)); + } + + // merge them away + final ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); + forceMergeRequest.onlyExpungeDeletes(randomBoolean()); + forceMergeRequest.maxNumSegments(1); + indexShard.forceMerge(forceMergeRequest); + + indexShard.refresh("test"); + { + final DocsStats docStats = indexShard.docStats(); + assertThat(docStats.getCount(), equalTo(numDocs)); + assertThat(docStats.getDeleted(), equalTo(0L)); + } + } finally { + closeShards(indexShard); + } + } + /** A dummy repository for testing which just needs restore overridden */ private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { private final String indexName; diff --git a/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index 3d9e66755eb..519df00c06c 100644 --- a/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -21,16 +21,20 @@ package org.elasticsearch.indices.stats; import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; @@ -54,14 +58,27 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; +import java.util.List; import java.util.Random; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.emptyCollectionOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -1068,4 +1085,103 @@ public class IndexStatsIT extends ESIntegTestCase { assertThat(response.getTotal().queryCache.getMemorySizeInBytes(), equalTo(0L)); } + /** + * Test that we can safely concurrently index and get stats. This test was inspired by a serialization issue that arose due to a race + * getting doc stats during heavy indexing. The race could lead to deleted docs being negative which would then be serialized as a + * variable-length long. Since serialization of negative longs using a variable-length format was unsupported + * ({@link org.elasticsearch.common.io.stream.StreamOutput#writeVLong(long)}), the stream would become corrupted. Here, we want to test + * that we can continue to get stats while indexing. + */ + public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierException, InterruptedException, ExecutionException { + final AtomicInteger idGenerator = new AtomicInteger(); + final int numberOfIndexingThreads = Runtime.getRuntime().availableProcessors(); + final int numberOfStatsThreads = 4 * numberOfIndexingThreads; + final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfIndexingThreads + numberOfStatsThreads); + final AtomicBoolean stop = new AtomicBoolean(); + final List threads = new ArrayList<>(numberOfIndexingThreads + numberOfIndexingThreads); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean failed = new AtomicBoolean(); + final AtomicReference> shardFailures = new AtomicReference<>(new CopyOnWriteArrayList<>()); + final AtomicReference> executionFailures = new AtomicReference<>(new CopyOnWriteArrayList<>()); + + // increasing the number of shards increases the number of chances any one stats request will hit a race + final CreateIndexRequest createIndexRequest = + new CreateIndexRequest("test", Settings.builder().put("index.number_of_shards", 10).build()); + client().admin().indices().create(createIndexRequest).get(); + + // start threads that will index concurrently with stats requests + for (int i = 0; i < numberOfIndexingThreads; i++) { + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + failed.set(true); + executionFailures.get().add(e); + latch.countDown(); + } + while (!stop.get()) { + final String id = Integer.toString(idGenerator.incrementAndGet()); + final IndexResponse response = + client() + .prepareIndex("test", "type", id) + .setSource("{}") + .get(); + assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + } + }); + thread.setName("indexing-" + i); + threads.add(thread); + thread.start(); + } + + // start threads that will get stats concurrently with indexing + for (int i = 0; i < numberOfStatsThreads; i++) { + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + failed.set(true); + executionFailures.get().add(e); + latch.countDown(); + } + final IndicesStatsRequest request = new IndicesStatsRequest(); + request.all(); + request.indices(new String[0]); + while (!stop.get()) { + try { + final IndicesStatsResponse response = client().admin().indices().stats(request).get(); + if (response.getFailedShards() > 0) { + failed.set(true); + shardFailures.get().addAll(Arrays.asList(response.getShardFailures())); + latch.countDown(); + } + } catch (final ExecutionException | InterruptedException e) { + failed.set(true); + executionFailures.get().add(e); + latch.countDown(); + } + } + }); + thread.setName("stats-" + i); + threads.add(thread); + thread.start(); + } + + // release the hounds + barrier.await(); + + // wait for a failure, or for fifteen seconds to elapse + latch.await(15, TimeUnit.SECONDS); + + // stop all threads and wait for them to complete + stop.set(true); + for (final Thread thread : threads) { + thread.join(); + } + + assertThat(shardFailures.get(), emptyCollectionOf(ShardOperationFailedException.class)); + assertThat(executionFailures.get(), emptyCollectionOf(Exception.class)); + } + } diff --git a/docs/reference/indices/rollover-index.asciidoc b/docs/reference/indices/rollover-index.asciidoc index db78104be12..9ae8c72a93c 100644 --- a/docs/reference/indices/rollover-index.asciidoc +++ b/docs/reference/indices/rollover-index.asciidoc @@ -108,6 +108,8 @@ PUT logs_write/log/1 "message": "a dummy log" } +POST logs_write/_refresh + # Wait for a day to pass POST /logs_write/_rollover <2>