diff --git a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index c30dfd360a0..5b20b848f0b 100644 --- a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -33,8 +33,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.internal.AliasFilter; @@ -86,6 +88,19 @@ public class TransportExplainAction extends TransportSingleShardAction listener) throws IOException { + IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + indexShard.awaitShardSearchActive(b -> { + try { + super.asyncShardOperation(request, shardId, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } + }); + } + @Override protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws IOException { ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId, diff --git a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 884af4a3af9..d14db67744d 100644 --- a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -19,13 +19,13 @@ package org.elasticsearch.action.get; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -38,6 +38,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + /** * Performs the get operation. */ @@ -76,6 +78,23 @@ public class TransportGetAction extends TransportSingleShardAction listener) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + if (request.realtime()) { // we are not tied to a refresh cycle here anyway + listener.onResponse(shardOperation(request, shardId)); + } else { + indexShard.awaitShardSearchActive(b -> { + try { + super.asyncShardOperation(request, shardId, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } + }); + } + } + @Override protected GetResponse shardOperation(GetRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); diff --git a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 811dcbed3dc..f2b2090dc28 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -47,6 +48,8 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.concurrent.Executor; import java.util.function.Supplier; import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; @@ -78,7 +81,7 @@ public abstract class TransportSingleShardAction listener) throws IOException { + threadPool.executor(this.executor).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + listener.onResponse(shardOperation(request, shardId)); + } + }); + } protected abstract Response newResponse(); protected abstract boolean resolveIndex(Request request); @@ -291,11 +307,27 @@ public abstract class TransportSingleShardAction() { + @Override + public void onResponse(Response response) { + try { + channel.sendResponse(response); + } catch (IOException e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + }); } } - /** * Internal request class that gets built on each node. Holds the original request plus additional info. */ diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index 5ff55a6fa55..289f40f1a34 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.termvectors; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; @@ -37,6 +38,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + /** * Performs the get operation. */ @@ -82,6 +85,23 @@ public class TransportTermVectorsAction extends TransportSingleShardAction listener) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles + listener.onResponse(shardOperation(request, shardId)); + } else { + indexShard.awaitShardSearchActive(b -> { + try { + super.asyncShardOperation(request, shardId, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } + }); + } + } + @Override protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index b3b0fdb8cf8..63c5e4d5ab5 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -36,7 +36,6 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.index.store.Store; @@ -135,6 +134,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, + IndexSettings.INDEX_SEARCH_IDLE_AFTER, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index d192e8781d6..78489965e39 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.NodeEnvironment; @@ -624,6 +625,27 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust } } if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { + // once we change the refresh interval we schedule yet another refresh + // to ensure we are in a clean and predictable state. + // it doesn't matter if we move from or to -1 in both cases we want + // docs to become visible immediately. This also flushes all pending indexing / search reqeusts + // that are waiting for a refresh. + threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("forced refresh failed after interval change", e); + } + + @Override + protected void doRun() throws Exception { + maybeRefreshEngine(true); + } + + @Override + public boolean isForceExecution() { + return true; + } + }); rescheduleRefreshTasks(); } final Translog.Durability durability = indexSettings.getTranslogDurability(); @@ -686,17 +708,13 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust } } - private void maybeRefreshEngine() { - if (indexSettings.getRefreshInterval().millis() > 0) { + private void maybeRefreshEngine(boolean force) { + if (indexSettings.getRefreshInterval().millis() > 0 || force) { for (IndexShard shard : this.shards.values()) { - if (shard.isReadAllowed()) { - try { - if (shard.isRefreshNeeded()) { - shard.refresh("schedule"); - } - } catch (IndexShardClosedException | AlreadyClosedException ex) { - // fine - continue; - } + try { + shard.scheduledRefresh(); + } catch (IndexShardClosedException | AlreadyClosedException ex) { + // fine - continue; } } } @@ -896,7 +914,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust @Override protected void runInternal() { - indexService.maybeRefreshEngine(); + indexService.maybeRefreshEngine(false); } @Override diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 9e390fb5b22..bf498d3d07d 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -62,6 +62,9 @@ public final class IndexSettings { public static final Setting INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100), Property.IndexScope); + public static final Setting INDEX_SEARCH_IDLE_AFTER = + Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30), + TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic); public static final Setting INDEX_TRANSLOG_DURABILITY_SETTING = new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(), (value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), Property.Dynamic, Property.IndexScope); @@ -262,6 +265,8 @@ public final class IndexSettings { private volatile int maxNgramDiff; private volatile int maxShingleDiff; private volatile boolean TTLPurgeDisabled; + private volatile TimeValue searchIdleAfter; + /** * The maximum number of refresh listeners allows on this shard. */ @@ -371,6 +376,7 @@ public final class IndexSettings { maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL); this.mergePolicyConfig = new MergePolicyConfig(logger, this); this.indexSortConfig = new IndexSortConfig(this); + searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); singleType = INDEX_MAPPING_SINGLE_TYPE_SETTING.get(indexMetaData.getSettings()); // get this from metadata - it's not registered if ((singleType || version.before(Version.V_6_0_0_alpha1)) == false) { throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: [" @@ -411,8 +417,11 @@ public final class IndexSettings { scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); + scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); } + private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } + private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { this.flushThresholdSize = byteSizeValue; } @@ -752,4 +761,16 @@ public final class IndexSettings { } public IndexScopedSettings getScopedSettings() { return scopedSettings;} + + /** + * Returns true iff the refresh setting exists or in other words is explicitly set. + */ + public boolean isExplicitRefresh() { + return INDEX_REFRESH_INTERVAL_SETTING.exists(settings); + } + + /** + * Returns the time that an index shard becomes search idle unless it's accessed in between + */ + public TimeValue getSearchIdleAfter() { return searchIdleAfter; } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index df4f77b5224..dc144c13d50 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -154,6 +154,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -236,6 +237,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ private final RefreshListeners refreshListeners; + private final AtomicLong lastSearcherAccess = new AtomicLong(); + private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); + public IndexShard( ShardRouting shardRouting, IndexSettings indexSettings, @@ -300,6 +304,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl searcherWrapper = indexSearcherWrapper; primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); refreshListeners = buildRefreshListeners(); + lastSearcherAccess.set(threadPool.relativeTimeInMillis()); persistMetadata(path, indexSettings, shardRouting, null, logger); } @@ -867,6 +872,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl long numDeletedDocs = 0; long sizeInBytes = 0; try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) { + // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accesssed only which will cause + // the next scheduled refresh to go through and refresh the stats as well + markSearcherAccessed(); for (LeafReaderContext reader : searcher.reader().leaves()) { // we go on the segment level here to get accurate numbers final SegmentReader segmentReader = Lucene.segmentReader(reader.reader()); @@ -963,6 +971,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public CompletionStats completionStats(String... fields) { CompletionStats completionStats = new CompletionStats(); try (Engine.Searcher currentSearcher = acquireSearcher("completion_stats")) { + // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accesssed only which will cause + // the next scheduled refresh to go through and refresh the stats as well + markSearcherAccessed(); completionStats.add(CompletionFieldStats.completionStats(currentSearcher.reader(), fields)); } return completionStats; @@ -1132,6 +1143,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return acquireSearcher(source, Engine.SearcherScope.EXTERNAL); } + private void markSearcherAccessed() { + lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis()); + } + private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) { readAllowed(); final Engine engine = getEngine(); @@ -2433,14 +2448,74 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Returns true iff one or more changes to the engine are not visible to via the current searcher *or* there are pending - * refresh listeners. - * Otherwise false. + * Executes a scheduled refresh if necessary. * - * @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed + * @return true iff the engine got refreshed otherwise false */ - public boolean isRefreshNeeded() { - return getEngine().refreshNeeded() || (refreshListeners != null && refreshListeners.refreshNeeded()); + public boolean scheduledRefresh() { + boolean listenerNeedsRefresh = refreshListeners.refreshNeeded(); + if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) { + if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it + && isSearchIdle() && indexSettings.isExplicitRefresh() == false) { + // lets skip this refresh since we are search idle and + // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will + // cause the next schedule to refresh. + setRefreshPending(); + return false; + } else { + refresh("schedule"); + return true; + } + } + return false; + } + + /** + * Returns true if this shards is search idle + */ + final boolean isSearchIdle() { + return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis(); + } + + /** + * Returns the last timestamp the searcher was accessed. This is a relative timestamp in milliseconds. + */ + final long getLastSearcherAccess() { + return lastSearcherAccess.get(); + } + + private void setRefreshPending() { + Engine engine = getEngine(); + Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation(); + Translog.Location location; + do { + location = this.pendingRefreshLocation.get(); + if (location != null && lastWriteLocation.compareTo(location) <= 0) { + break; + } + } while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false); + } + + /** + * Registers the given listener and invokes it once the shard is active again and all + * pending refresh translog location has been refreshed. If there is no pending refresh location registered the listener will be + * invoked immediately. + * @param listener the listener to invoke once the pending refresh location is visible. The listener will be called with + * true if the listener was registered to wait for a refresh. + */ + public final void awaitShardSearchActive(Consumer listener) { + if (isSearchIdle()) { + markSearcherAccessed(); // move the shard into non-search idle + } + final Translog.Location location = pendingRefreshLocation.get(); + if (location != null) { + addRefreshListener(location, (b) -> { + pendingRefreshLocation.compareAndSet(location, null); + listener.accept(true); + }); + } else { + listener.accept(false); + } } /** diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 49ab6652957..117a979639b 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -582,6 +582,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv throws IOException { return createSearchContext(request, timeout, true); } + private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, boolean assertAsyncActions) throws IOException { @@ -979,22 +980,31 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv * The action listener is guaranteed to be executed on the search thread-pool */ private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { + ActionListener actionListener = ActionListener.wrap(r -> + threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + listener.onResponse(request); + } + }), listener::onFailure); + IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId()); + if (shardOrNull != null) { + // now we need to check if there is a pending refresh and register + ActionListener finalListener = actionListener; + actionListener = ActionListener.wrap(r -> + shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure); + } // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead - Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), - ActionListener.wrap(r -> - threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } + Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener); + - @Override - protected void doRun() throws Exception { - listener.onResponse(request); - } - }), listener::onFailure)); } /** @@ -1003,4 +1013,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) { return indicesService.getRewriteContext(nowInMillis); } + + public IndicesService getIndicesService() { + return indicesService; + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index b29ba2d9efc..7c38b7c211f 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -18,14 +18,14 @@ */ package org.elasticsearch.index.shard; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; @@ -41,11 +41,11 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -56,11 +56,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.flush.FlushStats; -import org.elasticsearch.index.mapper.IdFieldMapper; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; @@ -82,8 +77,10 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -97,6 +94,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; import static org.hamcrest.Matchers.equalTo; public class IndexShardIT extends ESSingleNodeTestCase { @@ -106,21 +104,6 @@ public class IndexShardIT extends ESSingleNodeTestCase { return pluginList(InternalSettingsPlugin.class); } - private ParsedDocument testParsedDocument(String id, String type, String routing, long seqNo, - ParseContext.Document document, BytesReference source, XContentType xContentType, - Mapping mappingUpdate) { - Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); - Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - document.add(uidField); - document.add(versionField); - document.add(seqID.seqNo); - document.add(seqID.seqNoDocValue); - document.add(seqID.primaryTerm); - return new ParsedDocument(versionField, seqID, id, type, routing, - Collections.singletonList(document), source, xContentType, mappingUpdate); - } - public void testLockTryingToDelete() throws Exception { createIndex("test"); ensureGreen(); @@ -550,4 +533,96 @@ public class IndexShardIT extends ESSingleNodeTestCase { RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE); return shardRouting; } + + public void testAutomaticRefresh() throws InterruptedException { + TimeValue randomTimeValue = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000))); + Settings.Builder builder = Settings.builder(); + if (randomTimeValue != null) { + builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), randomTimeValue); + } + IndexService indexService = createIndex("test", builder.build()); + assertFalse(indexService.getIndexSettings().isExplicitRefresh()); + ensureGreen(); + AtomicInteger totalNumDocs = new AtomicInteger(Integer.MAX_VALUE); + CountDownLatch started = new CountDownLatch(1); + Thread t = new Thread(() -> { + SearchResponse searchResponse; + started.countDown(); + do { + searchResponse = client().prepareSearch().get(); + } while (searchResponse.getHits().totalHits != totalNumDocs.get()); + }); + t.start(); + started.await(); + assertNoSearchHits(client().prepareSearch().get()); + int numDocs = scaledRandomIntBetween(25, 100); + totalNumDocs.set(numDocs); + CountDownLatch indexingDone = new CountDownLatch(numDocs); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + indexingDone.countDown(); // one doc is indexed above blocking + IndexShard shard = indexService.getShard(0); + boolean hasRefreshed = shard.scheduledRefresh(); + if (randomTimeValue == TimeValue.ZERO) { + // with ZERO we are guaranteed to see the doc since we will wait for a refresh in the background + assertFalse(hasRefreshed); + assertTrue(shard.isSearchIdle()); + } else if (randomTimeValue == null){ + // with null we are guaranteed to see the doc since do execute the refresh. + // we can't assert on hasRefreshed since it might have been refreshed in the background on the shard concurrently + assertFalse(shard.isSearchIdle()); + } + assertHitCount(client().prepareSearch().get(), 1); + for (int i = 1; i < numDocs; i++) { + client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON) + .execute(new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + indexingDone.countDown(); + } + + @Override + public void onFailure(Exception e) { + indexingDone.countDown(); + throw new AssertionError(e); + } + }); + } + indexingDone.await(); + t.join(); + } + + public void testPendingRefreshWithIntervalChange() throws InterruptedException { + Settings.Builder builder = Settings.builder(); + builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO); + IndexService indexService = createIndex("test", builder.build()); + assertFalse(indexService.getIndexSettings().isExplicitRefresh()); + ensureGreen(); + assertNoSearchHits(client().prepareSearch().get()); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + IndexShard shard = indexService.getShard(0); + assertFalse(shard.scheduledRefresh()); + assertTrue(shard.isSearchIdle()); + CountDownLatch refreshLatch = new CountDownLatch(1); + client().admin().indices().prepareRefresh() + .execute(ActionListener.wrap(refreshLatch::countDown));// async on purpose to make sure it happens concurrently + assertHitCount(client().prepareSearch().get(), 1); + client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + assertFalse(shard.scheduledRefresh()); + + // now disable background refresh and make sure the refresh happens + CountDownLatch updateSettingsLatch = new CountDownLatch(1); + client().admin().indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build()) + .execute(ActionListener.wrap(updateSettingsLatch::countDown)); + assertHitCount(client().prepareSearch().get(), 2); + // wait for both to ensure we don't have in-flight operations + updateSettingsLatch.await(); + refreshLatch.await(); + + client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + assertTrue(shard.scheduledRefresh()); + assertTrue(shard.isSearchIdle()); + assertHitCount(client().prepareSearch().get(), 3); + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3dcf734ccf8..9f7cdf88470 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -62,7 +62,9 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -70,6 +72,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; @@ -2585,4 +2588,137 @@ public class IndexShardTests extends IndexShardTestCase { public void verify(String verificationToken, DiscoveryNode localNode) { } } + + public void testIsSearchIdle() throws Exception { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + assertFalse(primary.isSearchIdle()); + + IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); + scopedSettings.applySettings(settings); + assertTrue(primary.isSearchIdle()); + + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMinutes(1)) + .build(); + scopedSettings.applySettings(settings); + assertFalse(primary.isSearchIdle()); + + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(10)) + .build(); + scopedSettings.applySettings(settings); + assertBusy(() -> assertFalse(primary.isSearchIdle())); + do { + // now loop until we are fast enough... shouldn't take long + primary.acquireSearcher("test").close(); + } while (primary.isSearchIdle()); + closeShards(primary); + } + + public void testScheduledRefresh() throws IOException, InterruptedException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); + scopedSettings.applySettings(settings); + + assertFalse(primary.getEngine().refreshNeeded()); + indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + long lastSearchAccess = primary.getLastSearcherAccess(); + assertFalse(primary.scheduledRefresh()); + assertEquals(lastSearchAccess, primary.getLastSearcherAccess()); + // wait until the thread-pool has moved the timestamp otherwise we can't assert on this below + awaitBusy(() -> primary.getThreadPool().relativeTimeInMillis() > lastSearchAccess); + CountDownLatch latch = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + primary.awaitShardSearchActive(refreshed -> { + assertTrue(refreshed); + try (Engine.Searcher searcher = primary.acquireSearcher("test")) { + assertEquals(2, searcher.reader().numDocs()); + } finally { + latch.countDown(); + } + }); + } + assertNotEquals("awaitShardSearchActive must access a searcher to remove search idle state", lastSearchAccess, + primary.getLastSearcherAccess()); + assertTrue(lastSearchAccess < primary.getLastSearcherAccess()); + try (Engine.Searcher searcher = primary.acquireSearcher("test")) { + assertEquals(1, searcher.reader().numDocs()); + } + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + latch.await(); + CountDownLatch latch1 = new CountDownLatch(1); + primary.awaitShardSearchActive(refreshed -> { + assertFalse(refreshed); + try (Engine.Searcher searcher = primary.acquireSearcher("test")) { + assertEquals(2, searcher.reader().numDocs()); + } finally { + latch1.countDown(); + } + + }); + latch1.await(); + closeShards(primary); + } + + public void testRefreshIsNeededWithRefreshListeners() throws IOException, InterruptedException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + Engine.IndexResult doc = indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); + CountDownLatch latch = new CountDownLatch(1); + primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown()); + assertEquals(1, latch.getCount()); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + latch.await(); + + IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); + scopedSettings.applySettings(settings); + + doc = indexDoc(primary, "test", "2", "{\"foo\" : \"bar\"}"); + CountDownLatch latch1 = new CountDownLatch(1); + primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown()); + assertEquals(1, latch1.getCount()); + assertTrue(primary.getEngine().refreshNeeded()); + assertTrue(primary.scheduledRefresh()); + latch1.await(); + closeShards(primary); + } } diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index bf93f62847f..6dd0fad4365 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -107,11 +107,22 @@ specific index module: Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all` for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled). +`index.search.idle.after`:: + How long a shard can not receive a search or get request until it's considered + search idle. (default is `30s`) + `index.refresh_interval`:: How often to perform a refresh operation, which makes recent changes to the - index visible to search. Defaults to `1s`. Can be set to `-1` to disable - refresh. + index visible to search. Defaults to `1s`. Can be set to `-1` to disable + refresh. If this setting is not explicitly set, shards that haven't seen + search traffic for at least `index.search.idle.after` seconds will not receive + background refreshes until they receive a search request. Searches that hit an + idle shard where a refresh is pending will wait for the next background + refresh (within `1s`). This behavior aims to automatically optimize bulk + indexing in the default case when no searches are performed. In order to opt + out of this behavior an explicit value of `1s` should set as the refresh + interval. `index.max_result_window`:: diff --git a/docs/reference/migration/migrate_7_0/indices.asciidoc b/docs/reference/migration/migrate_7_0/indices.asciidoc index 92f56a2ddbb..16e437b4156 100644 --- a/docs/reference/migration/migrate_7_0/indices.asciidoc +++ b/docs/reference/migration/migrate_7_0/indices.asciidoc @@ -44,4 +44,14 @@ Indices created with version `7.0.0` onwards will have an automatic `index.numbe value set. This might change how documents are distributed across shards depending on how many shards the index has. In order to maintain the exact same distribution as a pre `7.0.0` index, the `index.number_of_routing_shards` must be set to the `index.number_of_shards` at index creation time. -Note: if the number of routing shards equals the number of shards `_split` operations are not supported. \ No newline at end of file +Note: if the number of routing shards equals the number of shards `_split` operations are not supported. + +==== Skipped background refresh on search idle shards. + +Shards belonging to an index that does not have an explicit +`index.refresh_interval` configured will no longer refresh in the background +once the shard becomes "search idle", ie the shard hasn't seen any search +traffic for `index.search.idle.after` seconds (defaults to `30s`). Searches +that access a search idle shard will be "parked" until the next refresh +happens. Indexing requests with `wait_for_refresh` will also trigger +a background refresh.