Return true for can_match on idle search shards (#55428)
With this change, we will always return true for can_match requests on idle search shards; otherwise, some shards will never get refreshed if all search requests perform the can_match phase (i.e., total shards > pre_filter_shard_size). Relates #27500 Relates #50043
This commit is contained in:
parent
3ba44a5af8
commit
1a3f9e5a07
|
@ -160,7 +160,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void consumeResult(CanMatchResponse result) {
|
void consumeResult(CanMatchResponse result) {
|
||||||
consumeResult(result.getShardIndex(), result.canMatch(), result.minAndMax());
|
consumeResult(result.getShardIndex(), result.canMatch(), result.estimatedMinAndMax());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -157,6 +157,7 @@ import java.io.PrintStream;
|
||||||
import java.nio.channels.ClosedByInterruptException;
|
import java.nio.channels.ClosedByInterruptException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -271,6 +272,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
|
|
||||||
private final AtomicLong lastSearcherAccess = new AtomicLong();
|
private final AtomicLong lastSearcherAccess = new AtomicLong();
|
||||||
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
|
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
|
||||||
|
private final RefreshPendingLocationListener refreshPendingLocationListener;
|
||||||
private volatile boolean useRetentionLeasesInPeerRecovery;
|
private volatile boolean useRetentionLeasesInPeerRecovery;
|
||||||
|
|
||||||
public IndexShard(
|
public IndexShard(
|
||||||
|
@ -369,6 +371,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
|
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
|
||||||
persistMetadata(path, indexSettings, shardRouting, null, logger);
|
persistMetadata(path, indexSettings, shardRouting, null, logger);
|
||||||
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
|
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
|
||||||
|
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ThreadPool getThreadPool() {
|
public ThreadPool getThreadPool() {
|
||||||
|
@ -2751,7 +2754,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
similarityService.similarity(mapperService), codecService, shardEventListener,
|
similarityService.similarity(mapperService), codecService, shardEventListener,
|
||||||
indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig,
|
indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig,
|
||||||
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
|
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
|
||||||
Collections.singletonList(refreshListeners),
|
Arrays.asList(refreshListeners, refreshPendingLocationListener),
|
||||||
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
|
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
|
||||||
indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
|
indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
|
||||||
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
|
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
|
||||||
|
@ -3251,7 +3254,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
/**
|
/**
|
||||||
* Returns true if this shards is search idle
|
* Returns true if this shards is search idle
|
||||||
*/
|
*/
|
||||||
final boolean isSearchIdle() {
|
public final boolean isSearchIdle() {
|
||||||
return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis();
|
return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3262,15 +3265,44 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
return lastSearcherAccess.get();
|
return lastSearcherAccess.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if this shard has some scheduled refresh that is pending because of search-idle.
|
||||||
|
*/
|
||||||
|
public final boolean hasRefreshPending() {
|
||||||
|
return pendingRefreshLocation.get() != null;
|
||||||
|
}
|
||||||
|
|
||||||
private void setRefreshPending(Engine engine) {
|
private void setRefreshPending(Engine engine) {
|
||||||
Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation();
|
final Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation();
|
||||||
Translog.Location location;
|
pendingRefreshLocation.updateAndGet(curr -> {
|
||||||
do {
|
if (curr == null || curr.compareTo(lastWriteLocation) <= 0) {
|
||||||
location = this.pendingRefreshLocation.get();
|
return lastWriteLocation;
|
||||||
if (location != null && lastWriteLocation.compareTo(location) <= 0) {
|
} else {
|
||||||
break;
|
return curr;
|
||||||
}
|
}
|
||||||
} while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false);
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private class RefreshPendingLocationListener implements ReferenceManager.RefreshListener {
|
||||||
|
Translog.Location lastWriteLocation;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeRefresh() {
|
||||||
|
lastWriteLocation = getEngine().getTranslogLastWriteLocation();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterRefresh(boolean didRefresh) {
|
||||||
|
if (didRefresh) {
|
||||||
|
pendingRefreshLocation.updateAndGet(pendingLocation -> {
|
||||||
|
if (pendingLocation == null || pendingLocation.compareTo(lastWriteLocation) <= 0) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return pendingLocation;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1115,6 +1115,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
IndexShard indexShard = indexService.getShard(request.shardId().getId());
|
IndexShard indexShard = indexService.getShard(request.shardId().getId());
|
||||||
// we don't want to use the reader wrapper since it could run costly operations
|
// we don't want to use the reader wrapper since it could run costly operations
|
||||||
// and we can afford false positives.
|
// and we can afford false positives.
|
||||||
|
final boolean hasRefreshPending = indexShard.hasRefreshPending();
|
||||||
try (Engine.Searcher searcher = indexShard.acquireCanMatchSearcher()) {
|
try (Engine.Searcher searcher = indexShard.acquireCanMatchSearcher()) {
|
||||||
final boolean aliasFilterCanMatch = request.getAliasFilter()
|
final boolean aliasFilterCanMatch = request.getAliasFilter()
|
||||||
.getQueryBuilder() instanceof MatchNoneQueryBuilder == false;
|
.getQueryBuilder() instanceof MatchNoneQueryBuilder == false;
|
||||||
|
@ -1123,14 +1124,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
Rewriteable.rewrite(request.getRewriteable(), context, false);
|
Rewriteable.rewrite(request.getRewriteable(), context, false);
|
||||||
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
|
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
|
||||||
MinAndMax<?> minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
|
MinAndMax<?> minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
|
||||||
|
final boolean canMatch;
|
||||||
if (canRewriteToMatchNone(request.source())) {
|
if (canRewriteToMatchNone(request.source())) {
|
||||||
QueryBuilder queryBuilder = request.source().query();
|
QueryBuilder queryBuilder = request.source().query();
|
||||||
return new CanMatchResponse(
|
canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false;
|
||||||
aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false, minMax
|
} else {
|
||||||
);
|
// null query means match_all
|
||||||
|
canMatch = aliasFilterCanMatch;
|
||||||
}
|
}
|
||||||
// null query means match_all
|
return new CanMatchResponse(canMatch || hasRefreshPending, minMax);
|
||||||
return new CanMatchResponse(aliasFilterCanMatch, minMax);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1207,28 +1209,28 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
|
|
||||||
public static final class CanMatchResponse extends SearchPhaseResult {
|
public static final class CanMatchResponse extends SearchPhaseResult {
|
||||||
private final boolean canMatch;
|
private final boolean canMatch;
|
||||||
private final MinAndMax<?> minAndMax;
|
private final MinAndMax<?> estimatedMinAndMax;
|
||||||
|
|
||||||
public CanMatchResponse(StreamInput in) throws IOException {
|
public CanMatchResponse(StreamInput in) throws IOException {
|
||||||
super(in);
|
super(in);
|
||||||
this.canMatch = in.readBoolean();
|
this.canMatch = in.readBoolean();
|
||||||
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
|
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
|
||||||
minAndMax = in.readOptionalWriteable(MinAndMax::new);
|
estimatedMinAndMax = in.readOptionalWriteable(MinAndMax::new);
|
||||||
} else {
|
} else {
|
||||||
minAndMax = null;
|
estimatedMinAndMax = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public CanMatchResponse(boolean canMatch, MinAndMax<?> minAndMax) {
|
public CanMatchResponse(boolean canMatch, MinAndMax<?> estimatedMinAndMax) {
|
||||||
this.canMatch = canMatch;
|
this.canMatch = canMatch;
|
||||||
this.minAndMax = minAndMax;
|
this.estimatedMinAndMax = estimatedMinAndMax;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
out.writeBoolean(canMatch);
|
out.writeBoolean(canMatch);
|
||||||
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
|
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
|
||||||
out.writeOptionalWriteable(minAndMax);
|
out.writeOptionalWriteable(estimatedMinAndMax);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1236,8 +1238,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
return canMatch;
|
return canMatch;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MinAndMax<?> minAndMax() {
|
public MinAndMax<?> estimatedMinAndMax() {
|
||||||
return minAndMax;
|
return estimatedMinAndMax;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,12 +21,18 @@ package org.elasticsearch.action.search;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.index.IndexSettings;
|
||||||
|
import org.elasticsearch.index.query.RangeQueryBuilder;
|
||||||
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
public class TransportSearchIT extends ESIntegTestCase {
|
public class TransportSearchIT extends ESIntegTestCase {
|
||||||
|
|
||||||
|
@ -69,4 +75,32 @@ public class TransportSearchIT extends ESIntegTestCase {
|
||||||
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey(), null)));
|
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey(), null)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSearchIdle() throws Exception {
|
||||||
|
int numOfReplicas = randomIntBetween(0, 1);
|
||||||
|
internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1);
|
||||||
|
final Settings.Builder settings = Settings.builder()
|
||||||
|
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5))
|
||||||
|
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
|
||||||
|
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(randomIntBetween(50, 500)));
|
||||||
|
assertAcked(prepareCreate("test").setSettings(settings).addMapping("_doc", "created_date", "type=date,format=yyyy-MM-dd"));
|
||||||
|
ensureGreen("test");
|
||||||
|
assertBusy(() -> {
|
||||||
|
for (String node : internalCluster().nodesInclude("test")) {
|
||||||
|
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
|
||||||
|
for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) {
|
||||||
|
assertTrue(indexShard.isSearchIdle());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
client().prepareIndex("test", "_doc").setId("1").setSource("created_date", "2020-01-01").get();
|
||||||
|
client().prepareIndex("test", "_doc").setId("2").setSource("created_date", "2020-01-02").get();
|
||||||
|
client().prepareIndex("test", "_doc").setId("3").setSource("created_date", "2020-01-03").get();
|
||||||
|
assertBusy(() -> {
|
||||||
|
SearchResponse resp = client().prepareSearch("test")
|
||||||
|
.setQuery(new RangeQueryBuilder("created_date").gte("2020-01-02").lte("2020-01-03"))
|
||||||
|
.setPreFilterShardSize(randomIntBetween(1, 3)).get();
|
||||||
|
assertThat(resp.getHits().getTotalHits().value, equalTo(2L));
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,6 +157,7 @@ public class SearchIdleIT extends ESSingleNodeTestCase {
|
||||||
assertHitCount(client().prepareSearch().get(), 1);
|
assertHitCount(client().prepareSearch().get(), 1);
|
||||||
client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
||||||
assertFalse(shard.scheduledRefresh());
|
assertFalse(shard.scheduledRefresh());
|
||||||
|
assertTrue(shard.hasRefreshPending());
|
||||||
|
|
||||||
// now disable background refresh and make sure the refresh happens
|
// now disable background refresh and make sure the refresh happens
|
||||||
CountDownLatch updateSettingsLatch = new CountDownLatch(1);
|
CountDownLatch updateSettingsLatch = new CountDownLatch(1);
|
||||||
|
@ -168,11 +169,13 @@ public class SearchIdleIT extends ESSingleNodeTestCase {
|
||||||
// wait for both to ensure we don't have in-flight operations
|
// wait for both to ensure we don't have in-flight operations
|
||||||
updateSettingsLatch.await();
|
updateSettingsLatch.await();
|
||||||
refreshLatch.await();
|
refreshLatch.await();
|
||||||
|
assertFalse(shard.hasRefreshPending());
|
||||||
// We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc;
|
// We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc;
|
||||||
// otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify.
|
// otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify.
|
||||||
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
|
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
|
||||||
client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
||||||
assertTrue(shard.scheduledRefresh());
|
assertTrue(shard.scheduledRefresh());
|
||||||
|
assertFalse(shard.hasRefreshPending());
|
||||||
assertTrue(shard.isSearchIdle());
|
assertTrue(shard.isSearchIdle());
|
||||||
assertHitCount(client().prepareSearch().get(), 3);
|
assertHitCount(client().prepareSearch().get(), 3);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue