Cache completion stats between refreshes (#52872)
Computing the stats for completion fields may involve a significant amount of work since it walks every field of every segment looking for completion fields. Innocuous-looking APIs like `GET _stats` or `GET _cluster/stats` do this for every shard in the cluster. This repeated work is unnecessary since these stats do not change between refreshes; in many indices they remain constant for a long time. This commit introduces a cache for these stats which is invalidated on a refresh, allowing most stats calls to bypass the work needed to compute them on most shards. Closes #51915 Backport of #51991
This commit is contained in:
parent
40bc06f6ad
commit
52fa465300
|
@ -0,0 +1,67 @@
|
||||||
|
---
|
||||||
|
setup:
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.create:
|
||||||
|
index: test1
|
||||||
|
wait_for_active_shards: all
|
||||||
|
body:
|
||||||
|
settings:
|
||||||
|
# Limit the number of shards so that shards are unlikely
|
||||||
|
# to be relocated or being initialized between the test
|
||||||
|
# set up and the test execution
|
||||||
|
index.number_of_shards: 3
|
||||||
|
index.number_of_replicas: 0
|
||||||
|
mappings:
|
||||||
|
properties:
|
||||||
|
bar:
|
||||||
|
type: text
|
||||||
|
fielddata: true
|
||||||
|
fields:
|
||||||
|
completion:
|
||||||
|
type: completion
|
||||||
|
|
||||||
|
- do:
|
||||||
|
cluster.health:
|
||||||
|
wait_for_no_relocating_shards: true
|
||||||
|
wait_for_events: languid
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test1
|
||||||
|
id: 1
|
||||||
|
body: { "bar": "bar" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test1
|
||||||
|
id: 2
|
||||||
|
body: { "bar": "foo" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.refresh: {}
|
||||||
|
|
||||||
|
---
|
||||||
|
"Completion stats":
|
||||||
|
- do:
|
||||||
|
indices.stats: { completion_fields: "*" }
|
||||||
|
|
||||||
|
- match: { _shards.failed: 0}
|
||||||
|
- gt: { _all.total.completion.fields.bar\.completion.size_in_bytes: 0 }
|
||||||
|
- gt: { _all.total.completion.size_in_bytes: 0 }
|
||||||
|
- set: { _all.total.completion.size_in_bytes: original_size }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test1
|
||||||
|
id: 3
|
||||||
|
body: { "bar": "foo", "baz": "foo" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.refresh: {}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.stats: { completion_fields: "*" }
|
||||||
|
|
||||||
|
- match: { _shards.failed: 0}
|
||||||
|
- gt: { _all.total.completion.size_in_bytes: $original_size }
|
|
@ -0,0 +1,153 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.index.engine;
|
||||||
|
|
||||||
|
import com.carrotsearch.hppc.ObjectLongHashMap;
|
||||||
|
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
|
||||||
|
import org.apache.lucene.index.FieldInfo;
|
||||||
|
import org.apache.lucene.index.LeafReader;
|
||||||
|
import org.apache.lucene.index.LeafReaderContext;
|
||||||
|
import org.apache.lucene.index.Terms;
|
||||||
|
import org.apache.lucene.search.ReferenceManager;
|
||||||
|
import org.apache.lucene.search.suggest.document.CompletionTerms;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
|
import org.elasticsearch.common.FieldMemoryStats;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
|
import org.elasticsearch.common.regex.Regex;
|
||||||
|
import org.elasticsearch.search.suggest.completion.CompletionStats;
|
||||||
|
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
class CompletionStatsCache implements ReferenceManager.RefreshListener {
|
||||||
|
|
||||||
|
private final Supplier<Engine.Searcher> searcherSupplier;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Contains a future (i.e. non-null) if another thread is already computing stats, in which case wait for this computation to
|
||||||
|
* complete. Contains null otherwise, in which case compute the stats ourselves and save them here for other threads to use.
|
||||||
|
* Futures are eventually completed with stats that include all fields, requiring further filtering (see
|
||||||
|
* {@link CompletionStatsCache#filterCompletionStatsByFieldName}).
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
private PlainActionFuture<CompletionStats> completionStatsFuture;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Protects accesses to {@code completionStatsFuture} since we can't use {@link java.util.concurrent.atomic.AtomicReference} in JDK8.
|
||||||
|
*/
|
||||||
|
private final Object completionStatsFutureMutex = new Object();
|
||||||
|
|
||||||
|
CompletionStatsCache(Supplier<Engine.Searcher> searcherSupplier) {
|
||||||
|
this.searcherSupplier = searcherSupplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletionStats get(String... fieldNamePatterns) {
|
||||||
|
final PlainActionFuture<CompletionStats> newFuture = new PlainActionFuture<>();
|
||||||
|
|
||||||
|
// final PlainActionFuture<CompletionStats> oldFuture = completionStatsFutureRef.compareAndExchange(null, newFuture);
|
||||||
|
// except JDK8 doesn't have compareAndExchange so we emulate it:
|
||||||
|
final PlainActionFuture<CompletionStats> oldFuture;
|
||||||
|
synchronized (completionStatsFutureMutex) {
|
||||||
|
if (completionStatsFuture == null) {
|
||||||
|
completionStatsFuture = newFuture;
|
||||||
|
oldFuture = null;
|
||||||
|
} else {
|
||||||
|
oldFuture = completionStatsFuture;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (oldFuture != null) {
|
||||||
|
// we lost the race, someone else is already computing stats, so we wait for that to finish
|
||||||
|
return filterCompletionStatsByFieldName(fieldNamePatterns, oldFuture.actionGet());
|
||||||
|
}
|
||||||
|
|
||||||
|
// we won the race, nobody else is already computing stats, so it's up to us
|
||||||
|
ActionListener.completeWith(newFuture, () -> {
|
||||||
|
long sizeInBytes = 0;
|
||||||
|
final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>();
|
||||||
|
|
||||||
|
try (Engine.Searcher currentSearcher = searcherSupplier.get()) {
|
||||||
|
for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
|
||||||
|
LeafReader atomicReader = atomicReaderContext.reader();
|
||||||
|
for (FieldInfo info : atomicReader.getFieldInfos()) {
|
||||||
|
Terms terms = atomicReader.terms(info.name);
|
||||||
|
if (terms instanceof CompletionTerms) {
|
||||||
|
// TODO: currently we load up the suggester for reporting its size
|
||||||
|
final long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
|
||||||
|
completionFields.addTo(info.name, fstSize);
|
||||||
|
sizeInBytes += fstSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new CompletionStats(sizeInBytes, new FieldMemoryStats(completionFields));
|
||||||
|
});
|
||||||
|
|
||||||
|
boolean success = false;
|
||||||
|
final CompletionStats completionStats;
|
||||||
|
try {
|
||||||
|
completionStats = newFuture.actionGet();
|
||||||
|
success = true;
|
||||||
|
} finally {
|
||||||
|
if (success == false) {
|
||||||
|
// invalidate the cache (if not already invalidated) so that future calls will retry
|
||||||
|
|
||||||
|
// completionStatsFutureRef.compareAndSet(newFuture, null); except we're not using AtomicReference in JDK8
|
||||||
|
synchronized (completionStatsFutureMutex) {
|
||||||
|
if (completionStatsFuture == newFuture) {
|
||||||
|
completionStatsFuture = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return filterCompletionStatsByFieldName(fieldNamePatterns, completionStats);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static CompletionStats filterCompletionStatsByFieldName(String[] fieldNamePatterns, CompletionStats fullCompletionStats) {
|
||||||
|
final FieldMemoryStats fieldMemoryStats;
|
||||||
|
if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
|
||||||
|
final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
|
||||||
|
for (ObjectLongCursor<String> fieldCursor : fullCompletionStats.getFields()) {
|
||||||
|
if (Regex.simpleMatch(fieldNamePatterns, fieldCursor.key)) {
|
||||||
|
completionFields.addTo(fieldCursor.key, fieldCursor.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fieldMemoryStats = new FieldMemoryStats(completionFields);
|
||||||
|
} else {
|
||||||
|
fieldMemoryStats = null;
|
||||||
|
}
|
||||||
|
return new CompletionStats(fullCompletionStats.getSizeInBytes(), fieldMemoryStats);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeRefresh() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterRefresh(boolean didRefresh) {
|
||||||
|
if (didRefresh) {
|
||||||
|
// completionStatsFutureRef.set(null); except we're not using AtomicReference in JDK8
|
||||||
|
synchronized (completionStatsFutureMutex) {
|
||||||
|
completionStatsFuture = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,27 +19,22 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.engine;
|
package org.elasticsearch.index.engine;
|
||||||
|
|
||||||
import com.carrotsearch.hppc.ObjectLongHashMap;
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.lucene.index.DirectoryReader;
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
import org.apache.lucene.index.FieldInfo;
|
|
||||||
import org.apache.lucene.index.IndexCommit;
|
import org.apache.lucene.index.IndexCommit;
|
||||||
import org.apache.lucene.index.IndexFileNames;
|
import org.apache.lucene.index.IndexFileNames;
|
||||||
import org.apache.lucene.index.IndexReader;
|
import org.apache.lucene.index.IndexReader;
|
||||||
import org.apache.lucene.index.LeafReader;
|
|
||||||
import org.apache.lucene.index.LeafReaderContext;
|
import org.apache.lucene.index.LeafReaderContext;
|
||||||
import org.apache.lucene.index.SegmentCommitInfo;
|
import org.apache.lucene.index.SegmentCommitInfo;
|
||||||
import org.apache.lucene.index.SegmentInfos;
|
import org.apache.lucene.index.SegmentInfos;
|
||||||
import org.apache.lucene.index.SegmentReader;
|
import org.apache.lucene.index.SegmentReader;
|
||||||
import org.apache.lucene.index.Term;
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.index.Terms;
|
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.QueryCache;
|
import org.apache.lucene.search.QueryCache;
|
||||||
import org.apache.lucene.search.QueryCachingPolicy;
|
import org.apache.lucene.search.QueryCachingPolicy;
|
||||||
import org.apache.lucene.search.ReferenceManager;
|
import org.apache.lucene.search.ReferenceManager;
|
||||||
import org.apache.lucene.search.similarities.Similarity;
|
import org.apache.lucene.search.similarities.Similarity;
|
||||||
import org.apache.lucene.search.suggest.document.CompletionTerms;
|
|
||||||
import org.apache.lucene.store.AlreadyClosedException;
|
import org.apache.lucene.store.AlreadyClosedException;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.IOContext;
|
import org.apache.lucene.store.IOContext;
|
||||||
|
@ -49,7 +44,6 @@ import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.common.CheckedRunnable;
|
import org.elasticsearch.common.CheckedRunnable;
|
||||||
import org.elasticsearch.common.FieldMemoryStats;
|
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
|
@ -65,7 +59,6 @@ import org.elasticsearch.common.lucene.uid.Versions;
|
||||||
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
|
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
|
||||||
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
|
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
|
||||||
import org.elasticsearch.common.metrics.CounterMetric;
|
import org.elasticsearch.common.metrics.CounterMetric;
|
||||||
import org.elasticsearch.common.regex.Regex;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||||
|
@ -185,30 +178,7 @@ public abstract class Engine implements Closeable {
|
||||||
/**
|
/**
|
||||||
* Returns the {@link CompletionStats} for this engine
|
* Returns the {@link CompletionStats} for this engine
|
||||||
*/
|
*/
|
||||||
public CompletionStats completionStats(String... fieldNamePatterns) throws IOException {
|
public abstract CompletionStats completionStats(String... fieldNamePatterns);
|
||||||
try (Searcher currentSearcher = acquireSearcher("completion_stats", SearcherScope.INTERNAL)) {
|
|
||||||
long sizeInBytes = 0;
|
|
||||||
ObjectLongHashMap<String> completionFields = null;
|
|
||||||
if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
|
|
||||||
completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
|
|
||||||
}
|
|
||||||
for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
|
|
||||||
LeafReader atomicReader = atomicReaderContext.reader();
|
|
||||||
for (FieldInfo info : atomicReader.getFieldInfos()) {
|
|
||||||
Terms terms = atomicReader.terms(info.name);
|
|
||||||
if (terms instanceof CompletionTerms) {
|
|
||||||
// TODO: currently we load up the suggester for reporting its size
|
|
||||||
long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
|
|
||||||
if (Regex.simpleMatch(fieldNamePatterns, info.name)) {
|
|
||||||
completionFields.addTo(info.name, fstSize);
|
|
||||||
}
|
|
||||||
sizeInBytes += fstSize;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return new CompletionStats(sizeInBytes, completionFields == null ? null : new FieldMemoryStats(completionFields));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the {@link DocsStats} for this engine
|
* Returns the {@link DocsStats} for this engine
|
||||||
|
|
|
@ -101,6 +101,7 @@ import org.elasticsearch.index.translog.TranslogConfig;
|
||||||
import org.elasticsearch.index.translog.TranslogCorruptedException;
|
import org.elasticsearch.index.translog.TranslogCorruptedException;
|
||||||
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
|
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
|
||||||
import org.elasticsearch.index.translog.TranslogStats;
|
import org.elasticsearch.index.translog.TranslogStats;
|
||||||
|
import org.elasticsearch.search.suggest.completion.CompletionStats;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
@ -180,6 +181,8 @@ public class InternalEngine extends Engine {
|
||||||
private final SoftDeletesPolicy softDeletesPolicy;
|
private final SoftDeletesPolicy softDeletesPolicy;
|
||||||
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
|
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
|
||||||
|
|
||||||
|
private final CompletionStatsCache completionStatsCache;
|
||||||
|
|
||||||
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
|
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
|
||||||
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
|
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
|
||||||
private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
|
private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
|
||||||
|
@ -272,6 +275,8 @@ public class InternalEngine extends Engine {
|
||||||
"failed to restore version map and local checkpoint tracker", e);
|
"failed to restore version map and local checkpoint tracker", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
|
||||||
|
this.externalReaderManager.addListener(completionStatsCache);
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (success == false) {
|
if (success == false) {
|
||||||
|
@ -312,6 +317,11 @@ public class InternalEngine extends Engine {
|
||||||
engineConfig.retentionLeasesSupplier());
|
engineConfig.retentionLeasesSupplier());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletionStats completionStats(String... fieldNamePatterns) {
|
||||||
|
return completionStatsCache.get(fieldNamePatterns);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This reference manager delegates all it's refresh calls to another (internal) ReaderManager
|
* This reference manager delegates all it's refresh calls to another (internal) ReaderManager
|
||||||
* The main purpose for this is that if we have external refreshes happening we don't issue extra
|
* The main purpose for this is that if we have external refreshes happening we don't issue extra
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.index.translog.TranslogConfig;
|
import org.elasticsearch.index.translog.TranslogConfig;
|
||||||
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
|
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
|
||||||
import org.elasticsearch.index.translog.TranslogStats;
|
import org.elasticsearch.index.translog.TranslogStats;
|
||||||
|
import org.elasticsearch.search.suggest.completion.CompletionStats;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -78,6 +79,7 @@ public class ReadOnlyEngine extends Engine {
|
||||||
private final DocsStats docsStats;
|
private final DocsStats docsStats;
|
||||||
private final RamAccountingRefreshListener refreshListener;
|
private final RamAccountingRefreshListener refreshListener;
|
||||||
private final SafeCommitInfo safeCommitInfo;
|
private final SafeCommitInfo safeCommitInfo;
|
||||||
|
private final CompletionStatsCache completionStatsCache;
|
||||||
|
|
||||||
protected volatile TranslogStats translogStats;
|
protected volatile TranslogStats translogStats;
|
||||||
|
|
||||||
|
@ -122,6 +124,10 @@ public class ReadOnlyEngine extends Engine {
|
||||||
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
|
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
|
||||||
this.indexWriterLock = indexWriterLock;
|
this.indexWriterLock = indexWriterLock;
|
||||||
this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());
|
this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());
|
||||||
|
|
||||||
|
completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
|
||||||
|
// no need to register a refresh listener to invalidate completionStatsCache since this engine is readonly
|
||||||
|
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (success == false) {
|
if (success == false) {
|
||||||
|
@ -542,4 +548,9 @@ public class ReadOnlyEngine extends Engine {
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletionStats completionStats(String... fieldNamePatterns) {
|
||||||
|
return completionStatsCache.get(fieldNamePatterns);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,7 +154,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.io.UncheckedIOException;
|
|
||||||
import java.nio.channels.ClosedByInterruptException;
|
import java.nio.channels.ClosedByInterruptException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -1062,11 +1061,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
|
|
||||||
public CompletionStats completionStats(String... fields) {
|
public CompletionStats completionStats(String... fields) {
|
||||||
readAllowed();
|
readAllowed();
|
||||||
try {
|
|
||||||
return getEngine().completionStats(fields);
|
return getEngine().completionStats(fields);
|
||||||
} catch (IOException e) {
|
|
||||||
throw new UncheckedIOException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
|
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
|
||||||
|
|
|
@ -0,0 +1,241 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.index.engine;
|
||||||
|
|
||||||
|
import org.apache.lucene.codecs.PostingsFormat;
|
||||||
|
import org.apache.lucene.codecs.lucene84.Lucene84Codec;
|
||||||
|
import org.apache.lucene.document.Document;
|
||||||
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
|
import org.apache.lucene.index.IndexWriter;
|
||||||
|
import org.apache.lucene.index.IndexWriterConfig;
|
||||||
|
import org.apache.lucene.search.Query;
|
||||||
|
import org.apache.lucene.search.QueryCachingPolicy;
|
||||||
|
import org.apache.lucene.search.suggest.document.Completion84PostingsFormat;
|
||||||
|
import org.apache.lucene.search.suggest.document.SuggestField;
|
||||||
|
import org.apache.lucene.store.Directory;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
|
import org.elasticsearch.search.suggest.completion.CompletionStats;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
|
|
||||||
|
public class CompletionStatsCacheTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testExceptionsAreNotCached() {
|
||||||
|
final AtomicInteger openCount = new AtomicInteger();
|
||||||
|
final CompletionStatsCache completionStatsCache = new CompletionStatsCache(() -> {
|
||||||
|
throw new ElasticsearchException("simulated " + openCount.incrementAndGet());
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(expectThrows(ElasticsearchException.class, completionStatsCache::get).getMessage(), equalTo("simulated 1"));
|
||||||
|
assertThat(expectThrows(ElasticsearchException.class, completionStatsCache::get).getMessage(), equalTo("simulated 2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCompletionStatsCache() throws IOException, InterruptedException {
|
||||||
|
final IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
|
||||||
|
final PostingsFormat postingsFormat = new Completion84PostingsFormat();
|
||||||
|
indexWriterConfig.setCodec(new Lucene84Codec() {
|
||||||
|
@Override
|
||||||
|
public PostingsFormat getPostingsFormatForField(String field) {
|
||||||
|
return postingsFormat; // all fields are suggest fields
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final QueryCachingPolicy queryCachingPolicy = new QueryCachingPolicy() {
|
||||||
|
@Override
|
||||||
|
public void onUse(Query query) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean shouldCache(Query query) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
try (Directory directory = newDirectory();
|
||||||
|
IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) {
|
||||||
|
|
||||||
|
final Document document = new Document();
|
||||||
|
document.add(new SuggestField("suggest1", "val", 1));
|
||||||
|
document.add(new SuggestField("suggest2", "val", 1));
|
||||||
|
document.add(new SuggestField("suggest2", "anotherval", 1));
|
||||||
|
document.add(new SuggestField("otherfield", "val", 1));
|
||||||
|
document.add(new SuggestField("otherfield", "anotherval", 1));
|
||||||
|
document.add(new SuggestField("otherfield", "yetmoreval", 1));
|
||||||
|
indexWriter.addDocument(document);
|
||||||
|
|
||||||
|
final OpenCloseCounter openCloseCounter = new OpenCloseCounter();
|
||||||
|
final CompletionStatsCache completionStatsCache = new CompletionStatsCache(() -> {
|
||||||
|
openCloseCounter.countOpened();
|
||||||
|
try {
|
||||||
|
final DirectoryReader directoryReader = DirectoryReader.open(indexWriter);
|
||||||
|
return new Engine.Searcher("test", directoryReader, null, null, queryCachingPolicy, () -> {
|
||||||
|
openCloseCounter.countClosed();
|
||||||
|
IOUtils.close(directoryReader);
|
||||||
|
});
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new AssertionError(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final int threadCount = 6;
|
||||||
|
final TestHarness testHarness = new TestHarness(completionStatsCache, threadCount);
|
||||||
|
final Thread[] threads = new Thread[threadCount];
|
||||||
|
threads[0] = new Thread(() -> testHarness.getStats(0, "*"));
|
||||||
|
threads[1] = new Thread(() -> testHarness.getStats(1, "suggest1", "suggest2"));
|
||||||
|
threads[2] = new Thread(() -> testHarness.getStats(2, "sug*"));
|
||||||
|
threads[3] = new Thread(() -> testHarness.getStats(3, "no match*"));
|
||||||
|
threads[4] = new Thread(() -> testHarness.getStats(4));
|
||||||
|
threads[5] = new Thread(() -> testHarness.getStats(5, (String[]) null));
|
||||||
|
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
testHarness.start();
|
||||||
|
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 0: "*" should match all fields:
|
||||||
|
final long suggest1Size = testHarness.getResult(0).getFields().get("suggest1");
|
||||||
|
final long suggest2Size = testHarness.getResult(0).getFields().get("suggest2");
|
||||||
|
final long otherFieldSize = testHarness.getResult(0).getFields().get("otherfield");
|
||||||
|
final long totalSizeInBytes = testHarness.getResult(0).getSizeInBytes();
|
||||||
|
assertThat(suggest1Size, greaterThan(0L));
|
||||||
|
assertThat(suggest2Size, greaterThan(0L));
|
||||||
|
assertThat(otherFieldSize, greaterThan(0L));
|
||||||
|
assertThat(totalSizeInBytes, equalTo(suggest1Size + suggest2Size + otherFieldSize));
|
||||||
|
|
||||||
|
// 1: enumerating fields omits the other ones
|
||||||
|
assertThat(testHarness.getResult(1).getSizeInBytes(), equalTo(totalSizeInBytes));
|
||||||
|
assertThat(testHarness.getResult(1).getFields().get("suggest1"), equalTo(suggest1Size));
|
||||||
|
assertThat(testHarness.getResult(1).getFields().get("suggest2"), equalTo(suggest2Size));
|
||||||
|
assertFalse(testHarness.getResult(1).getFields().containsField("otherfield"));
|
||||||
|
|
||||||
|
// 2: wildcards also exclude some fields
|
||||||
|
assertThat(testHarness.getResult(2).getSizeInBytes(), equalTo(totalSizeInBytes));
|
||||||
|
assertThat(testHarness.getResult(2).getFields().get("suggest1"), equalTo(suggest1Size));
|
||||||
|
assertThat(testHarness.getResult(2).getFields().get("suggest2"), equalTo(suggest2Size));
|
||||||
|
assertFalse(testHarness.getResult(2).getFields().containsField("otherfield"));
|
||||||
|
|
||||||
|
// 3: non-matching wildcard returns empty set of fields
|
||||||
|
assertThat(testHarness.getResult(3).getSizeInBytes(), equalTo(totalSizeInBytes));
|
||||||
|
assertFalse(testHarness.getResult(3).getFields().containsField("suggest1"));
|
||||||
|
assertFalse(testHarness.getResult(3).getFields().containsField("suggest2"));
|
||||||
|
assertFalse(testHarness.getResult(3).getFields().containsField("otherfield"));
|
||||||
|
|
||||||
|
// 4: no fields means per-fields stats is null
|
||||||
|
assertThat(testHarness.getResult(4).getSizeInBytes(), equalTo(totalSizeInBytes));
|
||||||
|
assertNull(testHarness.getResult(4).getFields());
|
||||||
|
|
||||||
|
// 5: null fields means per-fields stats is null
|
||||||
|
assertThat(testHarness.getResult(5).getSizeInBytes(), equalTo(totalSizeInBytes));
|
||||||
|
assertNull(testHarness.getResult(5).getFields());
|
||||||
|
|
||||||
|
// the stats were only computed once
|
||||||
|
openCloseCounter.assertCount(1);
|
||||||
|
|
||||||
|
// the stats are not recomputed on a refresh
|
||||||
|
completionStatsCache.afterRefresh(true);
|
||||||
|
openCloseCounter.assertCount(1);
|
||||||
|
|
||||||
|
// but they are recomputed on the next get
|
||||||
|
completionStatsCache.get();
|
||||||
|
openCloseCounter.assertCount(2);
|
||||||
|
|
||||||
|
// and they do update
|
||||||
|
final Document document2 = new Document();
|
||||||
|
document2.add(new SuggestField("suggest1", "foo", 1));
|
||||||
|
document2.add(new SuggestField("suggest2", "bar", 1));
|
||||||
|
document2.add(new SuggestField("otherfield", "baz", 1));
|
||||||
|
indexWriter.addDocument(document2);
|
||||||
|
completionStatsCache.afterRefresh(true);
|
||||||
|
final CompletionStats updatedStats = completionStatsCache.get();
|
||||||
|
assertThat(updatedStats.getSizeInBytes(), greaterThan(totalSizeInBytes));
|
||||||
|
openCloseCounter.assertCount(3);
|
||||||
|
|
||||||
|
// beforeRefresh does not invalidate the cache
|
||||||
|
completionStatsCache.beforeRefresh();
|
||||||
|
completionStatsCache.get();
|
||||||
|
openCloseCounter.assertCount(3);
|
||||||
|
|
||||||
|
// afterRefresh does not invalidate the cache if no refresh took place
|
||||||
|
completionStatsCache.afterRefresh(false);
|
||||||
|
completionStatsCache.get();
|
||||||
|
openCloseCounter.assertCount(3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class OpenCloseCounter {
|
||||||
|
private final AtomicInteger openCount = new AtomicInteger();
|
||||||
|
private final AtomicInteger closeCount = new AtomicInteger();
|
||||||
|
|
||||||
|
void countOpened() {
|
||||||
|
openCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
void countClosed() {
|
||||||
|
closeCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
void assertCount(int expectedCount) {
|
||||||
|
assertThat(openCount.get(), equalTo(expectedCount));
|
||||||
|
assertThat(closeCount.get(), equalTo(expectedCount));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestHarness {
|
||||||
|
private final CompletionStatsCache completionStatsCache;
|
||||||
|
private final CyclicBarrier cyclicBarrier;
|
||||||
|
private final CompletionStats[] results;
|
||||||
|
|
||||||
|
TestHarness(CompletionStatsCache completionStatsCache, int resultCount) {
|
||||||
|
this.completionStatsCache = completionStatsCache;
|
||||||
|
results = new CompletionStats[resultCount];
|
||||||
|
cyclicBarrier = new CyclicBarrier(resultCount + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void getStats(int threadIndex, String... fieldPatterns) {
|
||||||
|
start();
|
||||||
|
results[threadIndex] = completionStatsCache.get(fieldPatterns);
|
||||||
|
}
|
||||||
|
|
||||||
|
void start() {
|
||||||
|
try {
|
||||||
|
cyclicBarrier.await();
|
||||||
|
} catch (InterruptedException | BrokenBarrierException e) {
|
||||||
|
throw new AssertionError(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletionStats getResult(int index) {
|
||||||
|
return results[index];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue