Split search in two when executed against throttled and non-throttled indices (#42510)
When a search on some indices takes a long time, it can cause problems for the other indices that are targeted by the same search request and are concurrently being written to, because their search contexts need to stay open for a long time. This is especially a problem when searching against throttled and non-throttled indices as part of the same request, but it can be generalized: it may happen whenever read-only indices are searched together with indices that are being written to. In practice, search contexts staying open for a long time is only an issue for indices that are being written to.

This commit splits such a search into two sub-searches: one against the read-only indices, and one against the ordinary indices, so that the two do not interfere with each other. The split is performed only when size is greater than 0, no scroll is provided, and query_then_fetch is used as the search type. Otherwise, the search executes as before.

Note that the returned num_reduce_phases reflects the number of reduction phases that were run. If the search is split in two, there are three reductions: one non-final reduction for each sub-search, plus a final one that merges the results of the previous two.

Closes #40900
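The gating condition described above can be restated as a small standalone predicate. The sketch below is illustrative only: SearchSpec is a hypothetical stand-in for the real SearchRequest, which carries the same three pieces of information (scroll, search type, requested size).

// A minimal sketch of the split condition described in the commit message,
// assuming a hypothetical SearchSpec record in place of
// org.elasticsearch.action.search.SearchRequest.
record SearchSpec(boolean hasScroll, boolean isDfsQueryThenFetch, Integer size) {

    // Split only when there is no scroll, the search type is query_then_fetch
    // (not DFS), and size is not explicitly 0; a null size means the default of 10.
    boolean shouldSplit() {
        return !hasScroll && !isDfsQueryThenFetch && (size == null || size != 0);
    }

    public static void main(String[] args) {
        System.out.println(new SearchSpec(false, false, null).shouldSplit()); // true: plain query_then_fetch search
        System.out.println(new SearchSpec(true, false, 10).shouldSplit());   // false: scroll contexts stay open anyway
        System.out.println(new SearchSpec(false, true, 10).shouldSplit());   // false: DFS_QUERY_THEN_FETCH is excluded from the split
        System.out.println(new SearchSpec(false, false, 0).shouldSplit());   // false: size 0 has no fetch phase to protect
    }
}

When the split does happen, each sub-search performs a non-final reduction and the merger performs the final one, which is why num_reduce_phases reports 3 in that case.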
parent e538592652
commit afeda1a7b9
TransportSearchAction.java

@@ -29,6 +29,7 @@ import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -472,10 +473,89 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
         Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
             searchRequest.indices());
         routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
-        String[] concreteIndices = new String[indices.length];
-        for (int i = 0; i < indices.length; i++) {
-            concreteIndices[i] = indices[i].getName();
-        }
+        Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
+
+        if (shouldSplitIndices(searchRequest)) {
+            //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible.
+            //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other
+            //indices (possibly slower) being searched at the same time.
+            List<String> writeIndicesList = new ArrayList<>();
+            List<String> readOnlyIndicesList = new ArrayList<>();
+            splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList);
+            String[] writeIndices = writeIndicesList.toArray(new String[0]);
+            String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]);
+
+            if (readOnlyIndices.length == 0) {
+                executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap,
+                    aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
+            } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) {
+                executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap,
+                    aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
+            } else {
+                //Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so
+                //that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices.
+                CountDown countDown = new CountDown(2);
+                AtomicReference<Exception> exceptions = new AtomicReference<>();
+                SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider,
+                    searchService::createReduceContext);
+                CountDownActionListener<SearchResponse, SearchResponse> countDownActionListener =
+                    new CountDownActionListener<SearchResponse, SearchResponse>(countDown, exceptions, listener) {
+                        @Override
+                        void innerOnResponse(SearchResponse searchResponse) {
+                            searchResponseMerger.add(searchResponse);
+                        }
+
+                        @Override
+                        SearchResponse createFinalResponse() {
+                            return searchResponseMerger.getMergedResponse(clusters);
+                        }
+                    };
+
+                //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
+                //will be provided separately to executeSearch.
+                SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices,
+                    RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
+                executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap,
+                    aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener,
+                    SearchResponse.Clusters.EMPTY);
+
+                //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
+                //will be provided separately to executeSearch.
+                SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices,
+                    RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
+                executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap,
+                    aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener,
+                    SearchResponse.Clusters.EMPTY);
+            }
+        } else {
+            String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new);
+            executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap,
+                aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
+        }
+    }
+
+    static boolean shouldSplitIndices(SearchRequest searchRequest) {
+        return searchRequest.scroll() == null && searchRequest.searchType() != DFS_QUERY_THEN_FETCH
+            && (searchRequest.source() == null || searchRequest.source().size() != 0);
+    }
+
+    static void splitIndices(Index[] indices, ClusterState clusterState, List<String> writeIndices, List<String> readOnlyIndices) {
+        for (Index index : indices) {
+            ClusterBlockException writeBlock = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, index.getName());
+            if (writeBlock == null) {
+                writeIndices.add(index.getName());
+            } else {
+                readOnlyIndices.add(index.getName());
+            }
+        }
+    }
+
+    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
+                               OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap,
+                               Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
+                               List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections,
+                               ClusterState clusterState, ActionListener<SearchResponse> listener, SearchResponse.Clusters clusters) {
+
         Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
         GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
             concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
@@ -484,8 +564,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
 
         failIfOverShardCountLimit(clusterService, shardIterators.size());
 
-        Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
-
         // optimize search type for cases where there is only one shard group to search on
         if (shardIterators.size() == 1) {
             // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
@@ -498,11 +576,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
         if (searchRequest.isSuggestOnly()) {
             // disable request cache if we have only suggest
             searchRequest.requestCache(false);
-            switch (searchRequest.searchType()) {
-                case DFS_QUERY_THEN_FETCH:
-                    // convert to Q_T_F if we have only suggest
-                    searchRequest.searchType(QUERY_THEN_FETCH);
-                    break;
-            }
+            if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
+                // convert to Q_T_F if we have only suggest
+                searchRequest.searchType(QUERY_THEN_FETCH);
+            }
         }
 
@@ -611,22 +687,16 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
         }
     }
 
-    abstract static class CCSActionListener<Response, FinalResponse> implements ActionListener<Response> {
-        private final String clusterAlias;
-        private final boolean skipUnavailable;
+    abstract static class CountDownActionListener<Response, FinalResponse> implements ActionListener<Response> {
         private final CountDown countDown;
-        private final AtomicInteger skippedClusters;
         private final AtomicReference<Exception> exceptions;
-        private final ActionListener<FinalResponse> originalListener;
+        private final ActionListener<FinalResponse> delegateListener;
 
-        CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
-                          AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
-            this.clusterAlias = clusterAlias;
-            this.skipUnavailable = skipUnavailable;
+        CountDownActionListener(CountDown countDown, AtomicReference<Exception> exceptions,
+                                ActionListener<FinalResponse> delegateListener) {
             this.countDown = countDown;
-            this.skippedClusters = skippedClusters;
             this.exceptions = exceptions;
-            this.originalListener = originalListener;
+            this.delegateListener = delegateListener;
         }
 
         @Override
@@ -637,26 +707,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
 
         abstract void innerOnResponse(Response response);
 
-        @Override
-        public final void onFailure(Exception e) {
-            if (skipUnavailable) {
-                skippedClusters.incrementAndGet();
-            } else {
-                Exception exception = e;
-                if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) {
-                    exception = wrapRemoteClusterFailure(clusterAlias, e);
-                }
-                if (exceptions.compareAndSet(null, exception) == false) {
-                    exceptions.accumulateAndGet(exception, (previous, current) -> {
-                        current.addSuppressed(previous);
-                        return current;
-                    });
-                }
-            }
-            maybeFinish();
-        }
-
-        private void maybeFinish() {
+        final void maybeFinish() {
             if (countDown.countDown()) {
                 Exception exception = exceptions.get();
                 if (exception == null) {
@@ -664,17 +715,56 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
                     try {
                         response = createFinalResponse();
                     } catch(Exception e) {
-                        originalListener.onFailure(e);
+                        delegateListener.onFailure(e);
                         return;
                     }
-                    originalListener.onResponse(response);
+                    delegateListener.onResponse(response);
                 } else {
-                    originalListener.onFailure(exceptions.get());
+                    delegateListener.onFailure(exceptions.get());
                 }
             }
         }
 
         abstract FinalResponse createFinalResponse();
+
+        @Override
+        public void onFailure(Exception e) {
+            if (exceptions.compareAndSet(null, e) == false) {
+                exceptions.accumulateAndGet(e, (previous, current) -> {
+                    current.addSuppressed(previous);
+                    return current;
+                });
+            }
+            maybeFinish();
+        }
+    }
+
+    abstract static class CCSActionListener<Response, FinalResponse> extends CountDownActionListener<Response, FinalResponse> {
+        private final String clusterAlias;
+        private final boolean skipUnavailable;
+        private final AtomicInteger skippedClusters;
+
+        CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
+                          AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
+            super(countDown, exceptions, originalListener);
+            this.clusterAlias = clusterAlias;
+            this.skipUnavailable = skipUnavailable;
+            this.skippedClusters = skippedClusters;
+        }
+
+        @Override
+        public final void onFailure(Exception e) {
+            if (skipUnavailable) {
+                skippedClusters.incrementAndGet();
+                maybeFinish();
+            } else {
+                Exception exception = e;
+                if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) {
+                    exception = wrapRemoteClusterFailure(clusterAlias, e);
+                }
+                super.onFailure(exception);
+            }
+        }
     }
 
     private static RemoteTransportException wrapRemoteClusterFailure(String clusterAlias, Exception e) {
TransportSearchActionSingleNodeTests.java

@@ -19,11 +19,14 @@
 
 package org.elasticsearch.action.search;
 
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -174,4 +177,62 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
             assertEquals(2, longTerms.getBuckets().size());
         }
     }
+
+    public void testSplitIndices() {
+        {
+            CreateIndexResponse response = client().admin().indices().prepareCreate("write").get();
+            assertTrue(response.isAcknowledged());
+        }
+        {
+            CreateIndexResponse response = client().admin().indices().prepareCreate("readonly").get();
+            assertTrue(response.isAcknowledged());
+        }
+        {
+            SearchResponse response = client().prepareSearch("readonly").get();
+            assertEquals(1, response.getTotalShards());
+            assertEquals(1, response.getSuccessfulShards());
+            assertEquals(1, response.getNumReducePhases());
+        }
+        {
+            SearchResponse response = client().prepareSearch("write").get();
+            assertEquals(1, response.getTotalShards());
+            assertEquals(1, response.getSuccessfulShards());
+            assertEquals(1, response.getNumReducePhases());
+        }
+        {
+            SearchResponse response = client().prepareSearch("readonly", "write").get();
+            assertEquals(2, response.getTotalShards());
+            assertEquals(2, response.getSuccessfulShards());
+            assertEquals(1, response.getNumReducePhases());
+        }
+        {
+            Settings settings = Settings.builder().put("index.blocks.read_only", "true").build();
+            AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get();
+            assertTrue(response.isAcknowledged());
+        }
+        try {
+            {
+                SearchResponse response = client().prepareSearch("readonly").get();
+                assertEquals(1, response.getTotalShards());
+                assertEquals(1, response.getSuccessfulShards());
+                assertEquals(1, response.getNumReducePhases());
+            }
+            {
+                SearchResponse response = client().prepareSearch("write").get();
+                assertEquals(1, response.getTotalShards());
+                assertEquals(1, response.getSuccessfulShards());
+                assertEquals(1, response.getNumReducePhases());
+            }
+            {
+                SearchResponse response = client().prepareSearch("readonly", "write").get();
+                assertEquals(2, response.getTotalShards());
+                assertEquals(2, response.getSuccessfulShards());
+                assertEquals(3, response.getNumReducePhases());
+            }
+        } finally {
+            Settings settings = Settings.builder().put("index.blocks.read_only", "false").build();
+            AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get();
+            assertTrue(response.isAcknowledged());
+        }
+    }
 }
TransportSearchActionTests.java

@@ -29,6 +29,10 @@ import org.elasticsearch.action.OriginalIndicesTests;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.GroupShardsIteratorTests;
@@ -837,4 +841,75 @@ public class TransportSearchActionTests extends ESTestCase {
         assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest));
         }
     }
+
+    public void testShouldSplitIndices() {
+        {
+            SearchRequest searchRequest = new SearchRequest();
+            assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest));
+        }
+        {
+            SearchRequest searchRequest = new SearchRequest();
+            searchRequest.source(new SearchSourceBuilder());
+            assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest));
+        }
+        {
+            SearchRequest searchRequest = new SearchRequest();
+            searchRequest.source(new SearchSourceBuilder().size(randomIntBetween(1, 100)));
+            assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest));
+        }
+        {
+            SearchRequest searchRequest = new SearchRequest();
+            searchRequest.scroll("5s");
+            assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest));
+        }
+        {
+            SearchRequest searchRequest = new SearchRequest();
+            searchRequest.source(new SearchSourceBuilder().size(0));
+            assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest));
+        }
+        {
+            SearchRequest searchRequest = new SearchRequest();
+            searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH);
+            assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest));
+        }
+    }
+
+    public void testSplitIndices() {
+        int numIndices = randomIntBetween(1, 10);
+        Index[] indices = new Index[numIndices];
+        for (int i = 0; i < numIndices; i++) {
+            String indexName = randomAlphaOfLengthBetween(5, 10);
+            indices[i] = new Index(indexName, indexName + "-uuid");
+        }
+        {
+            ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build();
+            List<String> writeIndices = new ArrayList<>();
+            List<String> readOnlyIndices = new ArrayList<>();
+            TransportSearchAction.splitIndices(indices, clusterState, writeIndices, readOnlyIndices);
+            assertEquals(0, readOnlyIndices.size());
+            assertEquals(numIndices, writeIndices.size());
+        }
+        {
+            List<String> expectedWrite = new ArrayList<>();
+            List<String> expectedReadOnly = new ArrayList<>();
+            ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder();
+            for (Index index : indices) {
+                if (randomBoolean()) {
+                    blocksBuilder.addIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK);
+                    expectedReadOnly.add(index.getName());
+                } else if (randomBoolean()) {
+                    blocksBuilder.addIndexBlock(index.getName(), IndexMetaData.INDEX_READ_ONLY_BLOCK);
+                    expectedReadOnly.add(index.getName());
+                } else {
+                    expectedWrite.add(index.getName());
+                }
+            }
+            ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).blocks(blocksBuilder).build();
+            List<String> writeIndices = new ArrayList<>();
+            List<String> readOnlyIndices = new ArrayList<>();
+            TransportSearchAction.splitIndices(indices, clusterState, writeIndices, readOnlyIndices);
+            assertEquals(writeIndices, expectedWrite);
+            assertEquals(readOnlyIndices, expectedReadOnly);
+        }
+    }
 }
@@ -82,6 +82,25 @@
 
   - match: {hits.total: 0}
 
+  - do:
+      index:
+        index: ordinary
+        id: "1"
+        body: { "foo": "Hello: 1" }
+        refresh: wait_for
+
+  - do:
+      search:
+        rest_total_hits_as_int: true
+        index: [test, ordinary]
+        ignore_throttled: false
+        body:
+          query:
+            match:
+              foo: hello
+
+  - match: {hits.total: 3}
+
 ---
 "Test index options":