From 52fa4653003965283a299bc1872726b242fc7dbc Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 27 Feb 2020 10:01:24 +0000 Subject: [PATCH] Cache completion stats between refreshes (#52872) Computing the stats for completion fields may involve a significant amount of work since it walks every field of every segment looking for completion fields. Innocuous-looking APIs like `GET _stats` or `GET _cluster/stats` do this for every shard in the cluster. This repeated work is unnecessary since these stats do not change between refreshes; in many indices they remain constant for a long time. This commit introduces a cache for these stats which is invalidated on a refresh, allowing most stats calls to bypass the work needed to compute them on most shards. Closes #51915 Backport of #51991 --- .../indices.stats/40_updates_on_refresh.yml | 67 +++++ .../index/engine/CompletionStatsCache.java | 153 +++++++++++ .../elasticsearch/index/engine/Engine.java | 32 +-- .../index/engine/InternalEngine.java | 10 + .../index/engine/ReadOnlyEngine.java | 11 + .../elasticsearch/index/shard/IndexShard.java | 7 +- .../engine/CompletionStatsCacheTests.java | 241 ++++++++++++++++++ 7 files changed, 484 insertions(+), 37 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/40_updates_on_refresh.yml create mode 100644 server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java create mode 100644 server/src/test/java/org/elasticsearch/index/engine/CompletionStatsCacheTests.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/40_updates_on_refresh.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/40_updates_on_refresh.yml new file mode 100644 index 00000000000..73c58211c18 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/40_updates_on_refresh.yml @@ -0,0 +1,67 @@ +--- +setup: + + - do: + indices.create: + index: test1 + wait_for_active_shards: all + body: + settings: + # Limit the number of shards so that shards are unlikely + # to be relocated or being initialized between the test + # set up and the test execution + index.number_of_shards: 3 + index.number_of_replicas: 0 + mappings: + properties: + bar: + type: text + fielddata: true + fields: + completion: + type: completion + + - do: + cluster.health: + wait_for_no_relocating_shards: true + wait_for_events: languid + + - do: + index: + index: test1 + id: 1 + body: { "bar": "bar" } + + - do: + index: + index: test1 + id: 2 + body: { "bar": "foo" } + + - do: + indices.refresh: {} + +--- +"Completion stats": + - do: + indices.stats: { completion_fields: "*" } + + - match: { _shards.failed: 0} + - gt: { _all.total.completion.fields.bar\.completion.size_in_bytes: 0 } + - gt: { _all.total.completion.size_in_bytes: 0 } + - set: { _all.total.completion.size_in_bytes: original_size } + + - do: + index: + index: test1 + id: 3 + body: { "bar": "foo", "baz": "foo" } + + - do: + indices.refresh: {} + + - do: + indices.stats: { completion_fields: "*" } + + - match: { _shards.failed: 0} + - gt: { _all.total.completion.size_in_bytes: $original_size } diff --git a/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java b/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java new file mode 100644 index 00000000000..b1dfd9d2189 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java @@ -0,0 +1,153 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.engine; + +import com.carrotsearch.hppc.ObjectLongHashMap; +import com.carrotsearch.hppc.cursors.ObjectLongCursor; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.Terms; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.suggest.document.CompletionTerms; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.FieldMemoryStats; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.search.suggest.completion.CompletionStats; + +import java.util.function.Supplier; + +class CompletionStatsCache implements ReferenceManager.RefreshListener { + + private final Supplier searcherSupplier; + + /** + * Contains a future (i.e. non-null) if another thread is already computing stats, in which case wait for this computation to + * complete. Contains null otherwise, in which case compute the stats ourselves and save them here for other threads to use. + * Futures are eventually completed with stats that include all fields, requiring further filtering (see + * {@link CompletionStatsCache#filterCompletionStatsByFieldName}). + */ + @Nullable + private PlainActionFuture completionStatsFuture; + + /** + * Protects accesses to {@code completionStatsFuture} since we can't use {@link java.util.concurrent.atomic.AtomicReference} in JDK8. + */ + private final Object completionStatsFutureMutex = new Object(); + + CompletionStatsCache(Supplier searcherSupplier) { + this.searcherSupplier = searcherSupplier; + } + + CompletionStats get(String... fieldNamePatterns) { + final PlainActionFuture newFuture = new PlainActionFuture<>(); + + // final PlainActionFuture oldFuture = completionStatsFutureRef.compareAndExchange(null, newFuture); + // except JDK8 doesn't have compareAndExchange so we emulate it: + final PlainActionFuture oldFuture; + synchronized (completionStatsFutureMutex) { + if (completionStatsFuture == null) { + completionStatsFuture = newFuture; + oldFuture = null; + } else { + oldFuture = completionStatsFuture; + } + } + + if (oldFuture != null) { + // we lost the race, someone else is already computing stats, so we wait for that to finish + return filterCompletionStatsByFieldName(fieldNamePatterns, oldFuture.actionGet()); + } + + // we won the race, nobody else is already computing stats, so it's up to us + ActionListener.completeWith(newFuture, () -> { + long sizeInBytes = 0; + final ObjectLongHashMap completionFields = new ObjectLongHashMap<>(); + + try (Engine.Searcher currentSearcher = searcherSupplier.get()) { + for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) { + LeafReader atomicReader = atomicReaderContext.reader(); + for (FieldInfo info : atomicReader.getFieldInfos()) { + Terms terms = atomicReader.terms(info.name); + if (terms instanceof CompletionTerms) { + // TODO: currently we load up the suggester for reporting its size + final long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed(); + completionFields.addTo(info.name, fstSize); + sizeInBytes += fstSize; + } + } + } + } + + return new CompletionStats(sizeInBytes, new FieldMemoryStats(completionFields)); + }); + + boolean success = false; + final CompletionStats completionStats; + try { + completionStats = newFuture.actionGet(); + success = true; + } finally { + if (success == false) { + // invalidate the cache (if not already invalidated) so that future calls will retry + + // completionStatsFutureRef.compareAndSet(newFuture, null); except we're not using AtomicReference in JDK8 + synchronized (completionStatsFutureMutex) { + if (completionStatsFuture == newFuture) { + completionStatsFuture = null; + } + } + } + } + + return filterCompletionStatsByFieldName(fieldNamePatterns, completionStats); + } + + private static CompletionStats filterCompletionStatsByFieldName(String[] fieldNamePatterns, CompletionStats fullCompletionStats) { + final FieldMemoryStats fieldMemoryStats; + if (fieldNamePatterns != null && fieldNamePatterns.length > 0) { + final ObjectLongHashMap completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length); + for (ObjectLongCursor fieldCursor : fullCompletionStats.getFields()) { + if (Regex.simpleMatch(fieldNamePatterns, fieldCursor.key)) { + completionFields.addTo(fieldCursor.key, fieldCursor.value); + } + } + fieldMemoryStats = new FieldMemoryStats(completionFields); + } else { + fieldMemoryStats = null; + } + return new CompletionStats(fullCompletionStats.getSizeInBytes(), fieldMemoryStats); + } + + @Override + public void beforeRefresh() { + } + + @Override + public void afterRefresh(boolean didRefresh) { + if (didRefresh) { + // completionStatsFutureRef.set(null); except we're not using AtomicReference in JDK8 + synchronized (completionStatsFutureMutex) { + completionStatsFuture = null; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index daa2d4910d1..6a40d1dd578 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -19,27 +19,22 @@ package org.elasticsearch.index.engine; -import com.carrotsearch.hppc.ObjectLongHashMap; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.Term; -import org.apache.lucene.index.Terms; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.similarities.Similarity; -import org.apache.lucene.search.suggest.document.CompletionTerms; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -49,7 +44,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.CheckedRunnable; -import org.elasticsearch.common.FieldMemoryStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -65,7 +59,6 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -185,30 +178,7 @@ public abstract class Engine implements Closeable { /** * Returns the {@link CompletionStats} for this engine */ - public CompletionStats completionStats(String... fieldNamePatterns) throws IOException { - try (Searcher currentSearcher = acquireSearcher("completion_stats", SearcherScope.INTERNAL)) { - long sizeInBytes = 0; - ObjectLongHashMap completionFields = null; - if (fieldNamePatterns != null && fieldNamePatterns.length > 0) { - completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length); - } - for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) { - LeafReader atomicReader = atomicReaderContext.reader(); - for (FieldInfo info : atomicReader.getFieldInfos()) { - Terms terms = atomicReader.terms(info.name); - if (terms instanceof CompletionTerms) { - // TODO: currently we load up the suggester for reporting its size - long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed(); - if (Regex.simpleMatch(fieldNamePatterns, info.name)) { - completionFields.addTo(info.name, fstSize); - } - sizeInBytes += fstSize; - } - } - } - return new CompletionStats(sizeInBytes, completionFields == null ? null : new FieldMemoryStats(completionFields)); - } - } + public abstract CompletionStats completionStats(String... fieldNamePatterns); /** * Returns the {@link DocsStats} for this engine diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5fa7bd2c8c2..cb7eb3d5122 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -101,6 +101,7 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; @@ -180,6 +181,8 @@ public class InternalEngine extends Engine { private final SoftDeletesPolicy softDeletesPolicy; private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; + private final CompletionStatsCache completionStatsCache; + private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); private final KeyedLock noOpKeyedLock = new KeyedLock<>(); private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false); @@ -272,6 +275,8 @@ public class InternalEngine extends Engine { "failed to restore version map and local checkpoint tracker", e); } } + completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); + this.externalReaderManager.addListener(completionStatsCache); success = true; } finally { if (success == false) { @@ -312,6 +317,11 @@ public class InternalEngine extends Engine { engineConfig.retentionLeasesSupplier()); } + @Override + public CompletionStats completionStats(String... fieldNamePatterns) { + return completionStatsCache.get(fieldNamePatterns); + } + /** * This reference manager delegates all it's refresh calls to another (internal) ReaderManager * The main purpose for this is that if we have external refreshes happening we don't issue extra diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 06cb7804313..71780773a5b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -42,6 +42,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.search.suggest.completion.CompletionStats; import java.io.Closeable; import java.io.IOException; @@ -78,6 +79,7 @@ public class ReadOnlyEngine extends Engine { private final DocsStats docsStats; private final RamAccountingRefreshListener refreshListener; private final SafeCommitInfo safeCommitInfo; + private final CompletionStatsCache completionStatsCache; protected volatile TranslogStats translogStats; @@ -122,6 +124,10 @@ public class ReadOnlyEngine extends Engine { this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos); this.indexWriterLock = indexWriterLock; this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc()); + + completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); + // no need to register a refresh listener to invalidate completionStatsCache since this engine is readonly + success = true; } finally { if (success == false) { @@ -542,4 +548,9 @@ public class ReadOnlyEngine extends Engine { return reader; } } + + @Override + public CompletionStats completionStats(String... fieldNamePatterns) { + return completionStatsCache.get(fieldNamePatterns); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 36e9054586a..25f317a2894 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -154,7 +154,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; import java.io.PrintStream; -import java.io.UncheckedIOException; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -1062,11 +1061,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public CompletionStats completionStats(String... fields) { readAllowed(); - try { - return getEngine().completionStats(fields); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return getEngine().completionStats(fields); } public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/CompletionStatsCacheTests.java b/server/src/test/java/org/elasticsearch/index/engine/CompletionStatsCacheTests.java new file mode 100644 index 00000000000..3a68ae66c70 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/CompletionStatsCacheTests.java @@ -0,0 +1,241 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.engine; + +import org.apache.lucene.codecs.PostingsFormat; +import org.apache.lucene.codecs.lucene84.Lucene84Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.suggest.document.Completion84PostingsFormat; +import org.apache.lucene.search.suggest.document.SuggestField; +import org.apache.lucene.store.Directory; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +public class CompletionStatsCacheTests extends ESTestCase { + + public void testExceptionsAreNotCached() { + final AtomicInteger openCount = new AtomicInteger(); + final CompletionStatsCache completionStatsCache = new CompletionStatsCache(() -> { + throw new ElasticsearchException("simulated " + openCount.incrementAndGet()); + }); + + assertThat(expectThrows(ElasticsearchException.class, completionStatsCache::get).getMessage(), equalTo("simulated 1")); + assertThat(expectThrows(ElasticsearchException.class, completionStatsCache::get).getMessage(), equalTo("simulated 2")); + } + + public void testCompletionStatsCache() throws IOException, InterruptedException { + final IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); + final PostingsFormat postingsFormat = new Completion84PostingsFormat(); + indexWriterConfig.setCodec(new Lucene84Codec() { + @Override + public PostingsFormat getPostingsFormatForField(String field) { + return postingsFormat; // all fields are suggest fields + } + }); + + final QueryCachingPolicy queryCachingPolicy = new QueryCachingPolicy() { + @Override + public void onUse(Query query) { + } + + @Override + public boolean shouldCache(Query query) { + return false; + } + }; + + try (Directory directory = newDirectory(); + IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { + + final Document document = new Document(); + document.add(new SuggestField("suggest1", "val", 1)); + document.add(new SuggestField("suggest2", "val", 1)); + document.add(new SuggestField("suggest2", "anotherval", 1)); + document.add(new SuggestField("otherfield", "val", 1)); + document.add(new SuggestField("otherfield", "anotherval", 1)); + document.add(new SuggestField("otherfield", "yetmoreval", 1)); + indexWriter.addDocument(document); + + final OpenCloseCounter openCloseCounter = new OpenCloseCounter(); + final CompletionStatsCache completionStatsCache = new CompletionStatsCache(() -> { + openCloseCounter.countOpened(); + try { + final DirectoryReader directoryReader = DirectoryReader.open(indexWriter); + return new Engine.Searcher("test", directoryReader, null, null, queryCachingPolicy, () -> { + openCloseCounter.countClosed(); + IOUtils.close(directoryReader); + }); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + + final int threadCount = 6; + final TestHarness testHarness = new TestHarness(completionStatsCache, threadCount); + final Thread[] threads = new Thread[threadCount]; + threads[0] = new Thread(() -> testHarness.getStats(0, "*")); + threads[1] = new Thread(() -> testHarness.getStats(1, "suggest1", "suggest2")); + threads[2] = new Thread(() -> testHarness.getStats(2, "sug*")); + threads[3] = new Thread(() -> testHarness.getStats(3, "no match*")); + threads[4] = new Thread(() -> testHarness.getStats(4)); + threads[5] = new Thread(() -> testHarness.getStats(5, (String[]) null)); + + for (Thread thread : threads) { + thread.start(); + } + + testHarness.start(); + + for (Thread thread : threads) { + thread.join(); + } + + // 0: "*" should match all fields: + final long suggest1Size = testHarness.getResult(0).getFields().get("suggest1"); + final long suggest2Size = testHarness.getResult(0).getFields().get("suggest2"); + final long otherFieldSize = testHarness.getResult(0).getFields().get("otherfield"); + final long totalSizeInBytes = testHarness.getResult(0).getSizeInBytes(); + assertThat(suggest1Size, greaterThan(0L)); + assertThat(suggest2Size, greaterThan(0L)); + assertThat(otherFieldSize, greaterThan(0L)); + assertThat(totalSizeInBytes, equalTo(suggest1Size + suggest2Size + otherFieldSize)); + + // 1: enumerating fields omits the other ones + assertThat(testHarness.getResult(1).getSizeInBytes(), equalTo(totalSizeInBytes)); + assertThat(testHarness.getResult(1).getFields().get("suggest1"), equalTo(suggest1Size)); + assertThat(testHarness.getResult(1).getFields().get("suggest2"), equalTo(suggest2Size)); + assertFalse(testHarness.getResult(1).getFields().containsField("otherfield")); + + // 2: wildcards also exclude some fields + assertThat(testHarness.getResult(2).getSizeInBytes(), equalTo(totalSizeInBytes)); + assertThat(testHarness.getResult(2).getFields().get("suggest1"), equalTo(suggest1Size)); + assertThat(testHarness.getResult(2).getFields().get("suggest2"), equalTo(suggest2Size)); + assertFalse(testHarness.getResult(2).getFields().containsField("otherfield")); + + // 3: non-matching wildcard returns empty set of fields + assertThat(testHarness.getResult(3).getSizeInBytes(), equalTo(totalSizeInBytes)); + assertFalse(testHarness.getResult(3).getFields().containsField("suggest1")); + assertFalse(testHarness.getResult(3).getFields().containsField("suggest2")); + assertFalse(testHarness.getResult(3).getFields().containsField("otherfield")); + + // 4: no fields means per-fields stats is null + assertThat(testHarness.getResult(4).getSizeInBytes(), equalTo(totalSizeInBytes)); + assertNull(testHarness.getResult(4).getFields()); + + // 5: null fields means per-fields stats is null + assertThat(testHarness.getResult(5).getSizeInBytes(), equalTo(totalSizeInBytes)); + assertNull(testHarness.getResult(5).getFields()); + + // the stats were only computed once + openCloseCounter.assertCount(1); + + // the stats are not recomputed on a refresh + completionStatsCache.afterRefresh(true); + openCloseCounter.assertCount(1); + + // but they are recomputed on the next get + completionStatsCache.get(); + openCloseCounter.assertCount(2); + + // and they do update + final Document document2 = new Document(); + document2.add(new SuggestField("suggest1", "foo", 1)); + document2.add(new SuggestField("suggest2", "bar", 1)); + document2.add(new SuggestField("otherfield", "baz", 1)); + indexWriter.addDocument(document2); + completionStatsCache.afterRefresh(true); + final CompletionStats updatedStats = completionStatsCache.get(); + assertThat(updatedStats.getSizeInBytes(), greaterThan(totalSizeInBytes)); + openCloseCounter.assertCount(3); + + // beforeRefresh does not invalidate the cache + completionStatsCache.beforeRefresh(); + completionStatsCache.get(); + openCloseCounter.assertCount(3); + + // afterRefresh does not invalidate the cache if no refresh took place + completionStatsCache.afterRefresh(false); + completionStatsCache.get(); + openCloseCounter.assertCount(3); + } + } + + private static class OpenCloseCounter { + private final AtomicInteger openCount = new AtomicInteger(); + private final AtomicInteger closeCount = new AtomicInteger(); + + void countOpened() { + openCount.incrementAndGet(); + } + + void countClosed() { + closeCount.incrementAndGet(); + } + + void assertCount(int expectedCount) { + assertThat(openCount.get(), equalTo(expectedCount)); + assertThat(closeCount.get(), equalTo(expectedCount)); + } + } + + private static class TestHarness { + private final CompletionStatsCache completionStatsCache; + private final CyclicBarrier cyclicBarrier; + private final CompletionStats[] results; + + TestHarness(CompletionStatsCache completionStatsCache, int resultCount) { + this.completionStatsCache = completionStatsCache; + results = new CompletionStats[resultCount]; + cyclicBarrier = new CyclicBarrier(resultCount + 1); + } + + void getStats(int threadIndex, String... fieldPatterns) { + start(); + results[threadIndex] = completionStatsCache.get(fieldPatterns); + } + + void start() { + try { + cyclicBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new AssertionError(e); + } + } + + CompletionStats getResult(int index) { + return results[index]; + } + } + +}