Add upper limit for scroll expiry (#26448)

This change adds a dynamic cluster setting named `search.max_keep_alive`.
It is used as an upper limit for scroll expiry time in scroll queries and defaults to 24 hours (`timeValueHours(24)` in the diff below).
This change also ensures that the existing setting `search.default_keep_alive` never exceeds `search.max_keep_alive`.

Relates #11511

* check style

* add skip for bwc

* iter

* Add a maximum throttle wait time of 1h for reindex

* review

* remove empty line
Jim Ferenczi 2017-09-06 10:06:48 +02:00 committed by GitHub
parent ad355f3e0b
commit 0c799eedc5
8 changed files with 258 additions and 15 deletions
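
For orientation, a minimal sketch of the new behavior from the caller's side, in the style of the `SearchScrollIT` tests further down (the test-scaffolding names `client()` and `expectThrows` are assumed from the ES test framework):

```java
// Raising search.default_keep_alive above search.max_keep_alive is rejected
// by the coupled validator before either value is applied.
IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () ->
    client().admin().cluster().prepareUpdateSettings()
        .setTransientSettings(Settings.builder()
            .put("search.max_keep_alive", "1m")
            .put("search.default_keep_alive", "2m"))
        .get());

// A scroll request that asks for more than search.max_keep_alive fails at
// request time with "Keep alive for scroll (...) is too large".
```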


@@ -212,21 +212,31 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
     }

     /**
-     * Adds a settings consumer that accepts the values for two settings. The consumer if only notified if one or both settings change.
+     * Adds a settings consumer that accepts the values for two settings.
+     * See {@link #addSettingsUpdateConsumer(Setting, Setting, BiConsumer, BiConsumer)} for details.
+     */
+    public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer) {
+        addSettingsUpdateConsumer(a, b, consumer, (i, j) -> {} );
+    }
+
+    /**
+     * Adds a settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change
+     * and if the provided validator succeeded.
      * <p>
      * Note: Only settings registered in {@link SettingsModule} can be changed dynamically.
      * </p>
-     * This method registers a compound updater that is useful if two settings are depending on each other. The consumer is always provided
-     * with both values even if only one of the two changes.
+     * This method registers a compound updater that is useful if two settings are depending on each other.
+     * The consumer is always provided with both values even if only one of the two changes.
      */
-    public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer) {
+    public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b,
+                                                              BiConsumer<A, B> consumer, BiConsumer<A, B> validator) {
         if (a != get(a.getKey())) {
             throw new IllegalArgumentException("Setting is not registered for key [" + a.getKey() + "]");
         }
         if (b != get(b.getKey())) {
             throw new IllegalArgumentException("Setting is not registered for key [" + b.getKey() + "]");
         }
-        addSettingsUpdater(Setting.compoundUpdater(consumer, a, b, logger));
+        addSettingsUpdater(Setting.compoundUpdater(consumer, validator, a, b, logger));
     }

     /**

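From the consumer side, the new four-argument overload reads like the sketch below; `clusterSettings`, the `my.low`/`my.high` settings, and `applyValues` are invented for illustration:

```java
// Hypothetical pair of interdependent time settings.
Setting<TimeValue> lowSetting = Setting.positiveTimeSetting("my.low", timeValueMinutes(1),
    Property.NodeScope, Property.Dynamic);
Setting<TimeValue> highSetting = Setting.positiveTimeSetting("my.high", timeValueMinutes(5),
    Property.NodeScope, Property.Dynamic);

// The validator runs before the consumer; if it throws, the settings update is
// rejected as a whole and the consumer never sees the invalid pair.
clusterSettings.addSettingsUpdateConsumer(lowSetting, highSetting,
    (low, high) -> applyValues(low, high),   // applyValues: stand-in for your own state update
    (low, high) -> {
        if (low.millis() > high.millis()) {
            throw new IllegalArgumentException("[my.low] must not exceed [my.high]");
        }
    });
```

This is exactly the shape `SearchService` uses below for `search.default_keep_alive` and `search.max_keep_alive`.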

@@ -356,6 +356,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
             SearchService.DEFAULT_KEEPALIVE_SETTING,
             SearchService.KEEPALIVE_INTERVAL_SETTING,
+            SearchService.MAX_KEEPALIVE_SETTING,
             SearchService.LOW_LEVEL_CANCELLATION_SETTING,
             Node.WRITE_PORTS_FILE_SETTING,
             Node.NODE_NAME_SETTING,


@@ -479,7 +479,7 @@ public class Setting<T> implements ToXContentObject {
      * See {@link AbstractScopedSettings#addSettingsUpdateConsumer(Setting, Setting, BiConsumer)} and its usage for details.
      */
     static <A, B> AbstractScopedSettings.SettingUpdater<Tuple<A, B>> compoundUpdater(final BiConsumer<A, B> consumer,
-            final Setting<A> aSetting, final Setting<B> bSetting, Logger logger) {
+            final BiConsumer<A, B> validator, final Setting<A> aSetting, final Setting<B> bSetting, Logger logger) {
         final AbstractScopedSettings.SettingUpdater<A> aSettingUpdater = aSetting.newUpdater(null, logger);
         final AbstractScopedSettings.SettingUpdater<B> bSettingUpdater = bSetting.newUpdater(null, logger);
         return new AbstractScopedSettings.SettingUpdater<Tuple<A, B>>() {
@@ -490,7 +490,10 @@ public class Setting<T> implements ToXContentObject {
             @Override
             public Tuple<A, B> getValue(Settings current, Settings previous) {
-                return new Tuple<>(aSettingUpdater.getValue(current, previous), bSettingUpdater.getValue(current, previous));
+                A valueA = aSettingUpdater.getValue(current, previous);
+                B valueB = bSettingUpdater.getValue(current, previous);
+                validator.accept(valueA, valueB);
+                return new Tuple<>(valueA, valueB);
             }

             @Override

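The ordering inside `getValue` is the point: validation happens before the tuple is returned, so a bad pair aborts the whole settings update and no consumer runs. A self-contained sketch of that pattern in plain Java (no Elasticsearch types):

```java
import java.util.function.BiConsumer;

final class ValidateThenApply {
    // Mirrors compoundUpdater#getValue: validate first, then hand both values on,
    // so a failed validation leaves the previously applied state untouched.
    static <A, B> void update(A a, B b, BiConsumer<A, B> validator, BiConsumer<A, B> consumer) {
        validator.accept(a, b); // may throw IllegalArgumentException
        consumer.accept(a, b);
    }

    public static void main(String[] args) {
        BiConsumer<Integer, Integer> sameSign = (x, y) -> {
            if (Integer.signum(x) != Integer.signum(y)) {
                throw new IllegalArgumentException("boom"); // same check as SettingTests below
            }
        };
        update(2, 5, sameSign, (x, y) -> System.out.println("applied " + x + ", " + y));
        update(-2, 5, sameSign, (x, y) -> System.out.println("never reached")); // throws
    }
}
```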

@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;

 import static java.lang.Math.max;
+import static java.lang.Math.min;
 import static java.lang.Math.round;
 import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
@@ -44,6 +45,11 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
     private static final Logger logger = Loggers.getLogger(WorkerBulkByScrollTaskState.class);

+    /**
+     * Maximum wait time allowed for throttling.
+     */
+    private static final TimeValue MAX_THROTTLE_WAIT_TIME = TimeValue.timeValueHours(1);
+
     private final BulkByScrollTask task;

     /**
@@ -189,7 +195,8 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
     public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
         long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
-        return timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime()));
+        long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime()));
+        return timeValueNanos(waitTime);
     }

     /**

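The reindex change is a plain clamp: the computed wait is floored at zero and capped at one hour. A small standalone sketch of the same arithmetic (values illustrative):

```java
import java.util.concurrent.TimeUnit;

final class ThrottleClamp {
    private static final long MAX_THROTTLE_WAIT_NANOS = TimeUnit.HOURS.toNanos(1);

    // Same min/max shape as the new throttleWaitTime: never negative, never above 1h.
    static long clampedWaitNanos(long earliestNextBatchStartNanos, long nowNanos) {
        return Math.min(MAX_THROTTLE_WAIT_NANOS,
                        Math.max(0L, earliestNextBatchStartNanos - nowNanos));
    }

    public static void main(String[] args) {
        long now = System.nanoTime();
        // A pathologically low requests_per_second would have produced a multi-hour
        // wait; it is now capped at one hour (printed in nanoseconds).
        System.out.println(clampedWaitNanos(now + TimeUnit.HOURS.toNanos(5), now));
    }
}
```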

@@ -43,6 +43,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MergeSchedulerConfig;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.query.InnerHitContextBuilder;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
@@ -82,6 +83,7 @@ import org.elasticsearch.search.internal.SearchContext.Lifetime;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.profile.Profilers;
 import org.elasticsearch.search.query.QueryPhase;
+import org.elasticsearch.search.query.QueryPhaseExecutionException;
 import org.elasticsearch.search.query.QuerySearchRequest;
 import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.search.query.ScrollQuerySearchResult;
@@ -106,6 +108,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;

+import static org.elasticsearch.common.unit.TimeValue.timeValueHours;
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
 import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
@@ -113,7 +116,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     // we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
     public static final Setting<TimeValue> DEFAULT_KEEPALIVE_SETTING =
-        Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), Property.NodeScope);
+        Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), Property.NodeScope, Property.Dynamic);
+    public static final Setting<TimeValue> MAX_KEEPALIVE_SETTING =
+        Setting.positiveTimeSetting("search.max_keep_alive", timeValueHours(24), Property.NodeScope, Property.Dynamic);
     public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING =
         Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), Property.NodeScope);

     /**
@@ -147,7 +152,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     private final FetchPhase fetchPhase;

-    private final long defaultKeepAlive;
+    private volatile long defaultKeepAlive;
+
+    private volatile long maxKeepAlive;

     private volatile TimeValue defaultSearchTimeout;
@@ -173,7 +180,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         this.fetchPhase = fetchPhase;

         TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
-        this.defaultKeepAlive = DEFAULT_KEEPALIVE_SETTING.get(settings).millis();
+        setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
+
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_KEEPALIVE_SETTING, MAX_KEEPALIVE_SETTING,
+            this::setKeepAlives, this::validateKeepAlives);

         this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval, Names.SAME);
@@ -184,6 +194,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
     }

+    private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
+        if (defaultKeepAlive.millis() > maxKeepAlive.millis()) {
+            throw new IllegalArgumentException("Default keep alive setting for scroll [" + DEFAULT_KEEPALIVE_SETTING.getKey() + "]" +
+                " should be smaller than max keep alive [" + MAX_KEEPALIVE_SETTING.getKey() + "], " +
+                "was (" + defaultKeepAlive.format() + " > " + maxKeepAlive.format() + ")");
+        }
+    }
+
+    private void setKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
+        validateKeepAlives(defaultKeepAlive, maxKeepAlive);
+        this.defaultKeepAlive = defaultKeepAlive.millis();
+        this.maxKeepAlive = maxKeepAlive.millis();
+    }
+
     private void setDefaultSearchTimeout(TimeValue defaultSearchTimeout) {
         this.defaultSearchTimeout = defaultSearchTimeout;
     }
@@ -547,7 +571,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
             if (request.scroll() != null && request.scroll().keepAlive() != null) {
                 keepAlive = request.scroll().keepAlive().millis();
             }
-            context.keepAlive(keepAlive);
+            contextScrollKeepAlive(context, keepAlive);
             context.lowLevelCancellation(lowLevelCancellation);
         } catch (Exception e) {
             context.close();
@@ -625,6 +649,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         }
     }

+    private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException {
+        if (keepAlive > maxKeepAlive) {
+            throw new QueryPhaseExecutionException(context,
+                "Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive).format() + ") is too large. " +
+                "It must be less than (" + TimeValue.timeValueMillis(maxKeepAlive).format() + "). " +
+                "This limit can be set by changing the [" + MAX_KEEPALIVE_SETTING.getKey() + "] cluster level setting.");
+        }
+        context.keepAlive(keepAlive);
+    }
+
     private void contextProcessing(SearchContext context) {
         // disable timeout while executing a search
         context.accessed(-1);
@@ -847,13 +881,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
     }

-    private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
+    private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException {
         // process scroll
         context.from(context.from() + context.size());
         context.scrollContext().scroll = request.scroll();
         // update the context keep alive based on the new scroll value
         if (request.scroll() != null && request.scroll().keepAlive() != null) {
-            context.keepAlive(request.scroll().keepAlive().millis());
+            contextScrollKeepAlive(context, request.scroll().keepAlive().millis());
         }
     }

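At request time the whole feature reduces to one comparison against the configured ceiling. A hedged standalone sketch of that decision (plain Java; Elasticsearch's `QueryPhaseExecutionException` is swapped for `IllegalArgumentException` and the `SearchContext` plumbing is dropped):

```java
import java.util.concurrent.TimeUnit;

final class KeepAliveLimit {
    private final long maxKeepAliveMillis;

    KeepAliveLimit(long maxKeepAliveMillis) {
        this.maxKeepAliveMillis = maxKeepAliveMillis;
    }

    // Same shape as SearchService#contextScrollKeepAlive above.
    void validate(long requestedKeepAliveMillis) {
        if (requestedKeepAliveMillis > maxKeepAliveMillis) {
            throw new IllegalArgumentException("Keep alive for scroll ("
                + requestedKeepAliveMillis + "ms) is too large; raise [search.max_keep_alive]");
        }
    }

    public static void main(String[] args) {
        KeepAliveLimit limit = new KeepAliveLimit(TimeUnit.HOURS.toMillis(24)); // default ceiling
        limit.validate(TimeUnit.MINUTES.toMillis(5));  // fine
        limit.validate(TimeUnit.HOURS.toMillis(48));   // throws
    }
}
```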

@@ -359,6 +359,12 @@ public class SettingTests extends ESTestCase {
             this.a = a;
             this.b = b;
         }

+        public void validate(Integer a, Integer b) {
+            if (Integer.signum(a) != Integer.signum(b)) {
+                throw new IllegalArgumentException("boom");
+            }
+        }
     }
@@ -366,7 +372,7 @@ public class SettingTests extends ESTestCase {
         Composite c = new Composite();
         Setting<Integer> a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope);
         Setting<Integer> b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope);
-        ClusterSettings.SettingUpdater<Tuple<Integer, Integer>> settingUpdater = Setting.compoundUpdater(c::set, a, b, logger);
+        ClusterSettings.SettingUpdater<Tuple<Integer, Integer>> settingUpdater = Setting.compoundUpdater(c::set, c::validate, a, b, logger);
         assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY));
         assertNull(c.a);
         assertNull(c.b);
@@ -392,6 +398,40 @@ public class SettingTests extends ESTestCase {
         }
     }

+    public void testCompositeValidator() {
+        Composite c = new Composite();
+        Setting<Integer> a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope);
+        Setting<Integer> b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope);
+        ClusterSettings.SettingUpdater<Tuple<Integer, Integer>> settingUpdater = Setting.compoundUpdater(c::set, c::validate, a, b, logger);
+        assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY));
+        assertNull(c.a);
+        assertNull(c.b);
+
+        Settings build = Settings.builder().put("foo.int.bar.a", 2).build();
+        assertTrue(settingUpdater.apply(build, Settings.EMPTY));
+        assertEquals(2, c.a.intValue());
+        assertEquals(1, c.b.intValue());
+
+        Integer aValue = c.a;
+        assertFalse(settingUpdater.apply(build, build));
+        assertSame(aValue, c.a);
+        Settings previous = build;
+        build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build();
+        assertTrue(settingUpdater.apply(build, previous));
+        assertEquals(2, c.a.intValue());
+        assertEquals(5, c.b.intValue());
+
+        Settings invalid = Settings.builder().put("foo.int.bar.a", -2).put("foo.int.bar.b", 5).build();
+        IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(invalid, previous));
+        assertThat(exc.getMessage(), equalTo("boom"));
+
+        // reset to default
+        assertTrue(settingUpdater.apply(Settings.EMPTY, build));
+        assertEquals(1, c.a.intValue());
+        assertEquals(1, c.b.intValue());
+    }
+
     public void testListSettings() {
         Setting<List<String>> listSetting = Setting.listSetting("foo.bar", Arrays.asList("foo,bar"), (s) -> s.toString(),
             Property.Dynamic, Property.NodeScope);


@@ -19,6 +19,8 @@
 package org.elasticsearch.search.scroll;

+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.search.ClearScrollResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
@@ -35,10 +37,12 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.query.QueryPhaseExecutionException;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
+import org.junit.After;

 import java.io.IOException;
 import java.util.Map;
@@ -54,6 +58,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSe
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
@@ -63,6 +68,13 @@ import static org.hamcrest.Matchers.notNullValue;
  * Tests for scrolling.
  */
 public class SearchScrollIT extends ESIntegTestCase {
+    @After
+    public void cleanup() throws Exception {
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().putNull("*"))
+            .setTransientSettings(Settings.builder().putNull("*")));
+    }
+
     public void testSimpleScrollQueryThenFetch() throws Exception {
         client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 3)).execute().actionGet();
         client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
@@ -518,6 +530,73 @@ public class SearchScrollIT extends ESIntegTestCase {
         }
     }

+    public void testScrollInvalidDefaultKeepAlive() throws IOException {
+        IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () ->
+            client().admin().cluster().prepareUpdateSettings()
+                .setPersistentSettings(Settings.builder().put("search.max_keep_alive", "1m", "search.default_keep_alive", "2m")).get());
+        assertThat(exc.getMessage(), containsString("was (2 minutes > 1 minute)"));
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put("search.default_keep_alive", "5m", "search.max_keep_alive", "5m")).get());
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put("search.default_keep_alive", "2m")).get());
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put("search.max_keep_alive", "2m")).get());
+        exc = expectThrows(IllegalArgumentException.class, () -> client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put("search.default_keep_alive", "3m")).get());
+        assertThat(exc.getMessage(), containsString("was (3 minutes > 2 minutes)"));
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put("search.default_keep_alive", "1m")).get());
+        exc = expectThrows(IllegalArgumentException.class, () -> client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put("search.max_keep_alive", "30s")).get());
+        assertThat(exc.getMessage(), containsString("was (1 minute > 30 seconds)"));
+    }
+
+    public void testInvalidScrollKeepAlive() throws IOException {
+        createIndex("test");
+        for (int i = 0; i < 2; i++) {
+            client().prepareIndex("test", "type1",
+                Integer.toString(i)).setSource(jsonBuilder().startObject().field("field", i).endObject()).execute().actionGet();
+        }
+        refresh();
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put("search.default_keep_alive", "5m", "search.max_keep_alive", "5m")).get());
+
+        Exception exc = expectThrows(Exception.class,
+            () -> client().prepareSearch()
+                .setQuery(matchAllQuery())
+                .setSize(1)
+                .setScroll(TimeValue.timeValueHours(2))
+                .execute().actionGet());
+        QueryPhaseExecutionException queryPhaseExecutionException =
+            (QueryPhaseExecutionException) ExceptionsHelper.unwrap(exc, QueryPhaseExecutionException.class);
+        assertNotNull(queryPhaseExecutionException);
+        assertThat(queryPhaseExecutionException.getMessage(), containsString("Keep alive for scroll (2 hours) is too large"));
+
+        SearchResponse searchResponse = client().prepareSearch()
+            .setQuery(matchAllQuery())
+            .setSize(1)
+            .setScroll(TimeValue.timeValueMinutes(5))
+            .execute().actionGet();
+        assertNotNull(searchResponse.getScrollId());
+        assertThat(searchResponse.getHits().getTotalHits(), equalTo(2L));
+        assertThat(searchResponse.getHits().getHits().length, equalTo(1));
+
+        exc = expectThrows(Exception.class,
+            () -> client().prepareSearchScroll(searchResponse.getScrollId())
+                .setScroll(TimeValue.timeValueHours(3)).get());
+        queryPhaseExecutionException =
+            (QueryPhaseExecutionException) ExceptionsHelper.unwrap(exc, QueryPhaseExecutionException.class);
+        assertNotNull(queryPhaseExecutionException);
+        assertThat(queryPhaseExecutionException.getMessage(), containsString("Keep alive for scroll (3 hours) is too large"));
+    }
+
     private void assertToXContentResponse(ClearScrollResponse response, boolean succeed, int numFreed) throws IOException {
         XContentBuilder builder = XContentFactory.jsonBuilder();
         response.toXContent(builder, ToXContent.EMPTY_PARAMS);


@@ -0,0 +1,69 @@
+---
+teardown:
+
+  - do:
+      cluster.put_settings:
+        body:
+          transient:
+            search.max_keep_alive: null
+
+---
+"Max keep alive":
+  - skip:
+      version: " - 7.0.0"
+      reason: search.max_keep_alive was added in 7.0.0
+
+  - do:
+      index:
+        index: test_scroll
+        type: test
+        id: 1
+        body: { foo: 1 }
+
+  - do:
+      index:
+        index: test_scroll
+        type: test
+        id: 2
+        body: { foo: 1 }
+
+  - do:
+      indices.refresh: {}
+
+  - do:
+      cluster.put_settings:
+        body:
+          transient:
+            search.default_keep_alive: "1m"
+            search.max_keep_alive: "1m"
+
+  - do:
+      catch: /.*Keep alive for scroll.*is too large.*/
+      search:
+        index: test_scroll
+        size: 1
+        scroll: 2m
+        sort: foo
+        body:
+          query:
+            match_all: {}
+
+  - do:
+      search:
+        index: test_scroll
+        size: 1
+        scroll: 1m
+        sort: foo
+        body:
+          query:
+            match_all: {}
+
+  - set: {_scroll_id: scroll_id}
+  - match: {hits.total: 2 }
+  - length: {hits.hits: 1 }
+
+  - do:
+      catch: /.*Keep alive for scroll.*is too large.*/
+      scroll:
+        scroll_id: $scroll_id
+        scroll: 3m