diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 70eeb2a811e..337861d0dc5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -160,7 +160,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction pendingRefreshLocation = new AtomicReference<>(); + private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; public IndexShard( @@ -369,6 +371,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl lastSearcherAccess.set(threadPool.relativeTimeInMillis()); persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); + this.refreshPendingLocationListener = new RefreshPendingLocationListener(); } public ThreadPool getThreadPool() { @@ -2751,7 +2754,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl similarityService.similarity(mapperService), codecService, shardEventListener, indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), - Collections.singletonList(refreshListeners), + Arrays.asList(refreshListeners, refreshPendingLocationListener), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases, () -> getOperationPrimaryTerm(), tombstoneDocSupplier()); @@ -3251,7 +3254,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl /** * Returns true if this shards is search idle */ - final boolean isSearchIdle() { + public final boolean isSearchIdle() { return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis(); } @@ -3262,15 +3265,44 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return lastSearcherAccess.get(); } + /** + * Returns true if this shard has some scheduled refresh that is pending because of search-idle. + */ + public final boolean hasRefreshPending() { + return pendingRefreshLocation.get() != null; + } + private void setRefreshPending(Engine engine) { - Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation(); - Translog.Location location; - do { - location = this.pendingRefreshLocation.get(); - if (location != null && lastWriteLocation.compareTo(location) <= 0) { - break; + final Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation(); + pendingRefreshLocation.updateAndGet(curr -> { + if (curr == null || curr.compareTo(lastWriteLocation) <= 0) { + return lastWriteLocation; + } else { + return curr; } - } while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false); + }); + } + + private class RefreshPendingLocationListener implements ReferenceManager.RefreshListener { + Translog.Location lastWriteLocation; + + @Override + public void beforeRefresh() { + lastWriteLocation = getEngine().getTranslogLastWriteLocation(); + } + + @Override + public void afterRefresh(boolean didRefresh) { + if (didRefresh) { + pendingRefreshLocation.updateAndGet(pendingLocation -> { + if (pendingLocation == null || pendingLocation.compareTo(lastWriteLocation) <= 0) { + return null; + } else { + return pendingLocation; + } + }); + } + } } /** diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 3c437ac2132..03c4cd7e277 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1115,6 +1115,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv IndexShard indexShard = indexService.getShard(request.shardId().getId()); // we don't want to use the reader wrapper since it could run costly operations // and we can afford false positives. + final boolean hasRefreshPending = indexShard.hasRefreshPending(); try (Engine.Searcher searcher = indexShard.acquireCanMatchSearcher()) { final boolean aliasFilterCanMatch = request.getAliasFilter() .getQueryBuilder() instanceof MatchNoneQueryBuilder == false; @@ -1123,14 +1124,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Rewriteable.rewrite(request.getRewriteable(), context, false); FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source()); MinAndMax minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null; + final boolean canMatch; if (canRewriteToMatchNone(request.source())) { QueryBuilder queryBuilder = request.source().query(); - return new CanMatchResponse( - aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false, minMax - ); + canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false; + } else { + // null query means match_all + canMatch = aliasFilterCanMatch; } - // null query means match_all - return new CanMatchResponse(aliasFilterCanMatch, minMax); + return new CanMatchResponse(canMatch || hasRefreshPending, minMax); } } @@ -1207,28 +1209,28 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public static final class CanMatchResponse extends SearchPhaseResult { private final boolean canMatch; - private final MinAndMax minAndMax; + private final MinAndMax estimatedMinAndMax; public CanMatchResponse(StreamInput in) throws IOException { super(in); this.canMatch = in.readBoolean(); if (in.getVersion().onOrAfter(Version.V_7_6_0)) { - minAndMax = in.readOptionalWriteable(MinAndMax::new); + estimatedMinAndMax = in.readOptionalWriteable(MinAndMax::new); } else { - minAndMax = null; + estimatedMinAndMax = null; } } - public CanMatchResponse(boolean canMatch, MinAndMax minAndMax) { + public CanMatchResponse(boolean canMatch, MinAndMax estimatedMinAndMax) { this.canMatch = canMatch; - this.minAndMax = minAndMax; + this.estimatedMinAndMax = estimatedMinAndMax; } @Override public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(canMatch); if (out.getVersion().onOrAfter(Version.V_7_6_0)) { - out.writeOptionalWriteable(minAndMax); + out.writeOptionalWriteable(estimatedMinAndMax); } } @@ -1236,8 +1238,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv return canMatch; } - public MinAndMax minAndMax() { - return minAndMax; + public MinAndMax estimatedMinAndMax() { + return estimatedMinAndMax; } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java index 64c9a1a4395..8b707092141 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java @@ -21,12 +21,18 @@ package org.elasticsearch.action.search; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; import java.util.Collections; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; public class TransportSearchIT extends ESIntegTestCase { @@ -69,4 +75,32 @@ public class TransportSearchIT extends ESIntegTestCase { TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey(), null))); } } + + public void testSearchIdle() throws Exception { + int numOfReplicas = randomIntBetween(0, 1); + internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1); + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(randomIntBetween(50, 500))); + assertAcked(prepareCreate("test").setSettings(settings).addMapping("_doc", "created_date", "type=date,format=yyyy-MM-dd")); + ensureGreen("test"); + assertBusy(() -> { + for (String node : internalCluster().nodesInclude("test")) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) { + assertTrue(indexShard.isSearchIdle()); + } + } + }); + client().prepareIndex("test", "_doc").setId("1").setSource("created_date", "2020-01-01").get(); + client().prepareIndex("test", "_doc").setId("2").setSource("created_date", "2020-01-02").get(); + client().prepareIndex("test", "_doc").setId("3").setSource("created_date", "2020-01-03").get(); + assertBusy(() -> { + SearchResponse resp = client().prepareSearch("test") + .setQuery(new RangeQueryBuilder("created_date").gte("2020-01-02").lte("2020-01-03")) + .setPreFilterShardSize(randomIntBetween(1, 3)).get(); + assertThat(resp.getHits().getTotalHits().value, equalTo(2L)); + }); + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java b/server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java index 96f10df0084..db7225519c5 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java @@ -157,6 +157,7 @@ public class SearchIdleIT extends ESSingleNodeTestCase { assertHitCount(client().prepareSearch().get(), 1); client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); assertFalse(shard.scheduledRefresh()); + assertTrue(shard.hasRefreshPending()); // now disable background refresh and make sure the refresh happens CountDownLatch updateSettingsLatch = new CountDownLatch(1); @@ -168,11 +169,13 @@ public class SearchIdleIT extends ESSingleNodeTestCase { // wait for both to ensure we don't have in-flight operations updateSettingsLatch.await(); refreshLatch.await(); + assertFalse(shard.hasRefreshPending()); // We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc; // otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify. ensureNoPendingScheduledRefresh(indexService.getThreadPool()); client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); assertTrue(shard.scheduledRefresh()); + assertFalse(shard.hasRefreshPending()); assertTrue(shard.isSearchIdle()); assertHitCount(client().prepareSearch().get(), 3); }