mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Security: use default scroll keepalive (#33639)
Security previously hardcoded a default scroll keepalive of 10 seconds, but in some cases this is not enough time as there can be network issues or overloading of host machines. After this change, security will now use the default keepalive timeout, which is controllable using a setting and the default value is 5 minutes.
This commit is contained in:
parent
fcb60acc34
commit
a48b86e7c6
@ -5,6 +5,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.core.security;
|
package org.elasticsearch.xpack.core.security;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.search.ClearScrollRequest;
|
import org.elasticsearch.action.search.ClearScrollRequest;
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
@ -12,7 +15,6 @@ import org.elasticsearch.action.search.SearchResponse;
|
|||||||
import org.elasticsearch.action.search.SearchScrollRequest;
|
import org.elasticsearch.action.search.SearchScrollRequest;
|
||||||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
|
|
||||||
@ -25,6 +27,7 @@ import java.util.function.Function;
|
|||||||
|
|
||||||
public final class ScrollHelper {
|
public final class ScrollHelper {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LogManager.getLogger(ScrollHelper.class);
|
||||||
private ScrollHelper() {}
|
private ScrollHelper() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -35,13 +38,15 @@ public final class ScrollHelper {
|
|||||||
Function<SearchHit, T> hitParser) {
|
Function<SearchHit, T> hitParser) {
|
||||||
final List<T> results = new ArrayList<>();
|
final List<T> results = new ArrayList<>();
|
||||||
if (request.scroll() == null) { // we do scroll by default lets see if we can get rid of this at some point.
|
if (request.scroll() == null) { // we do scroll by default lets see if we can get rid of this at some point.
|
||||||
request.scroll(TimeValue.timeValueSeconds(10L));
|
throw new IllegalArgumentException("request must have scroll set");
|
||||||
}
|
}
|
||||||
final Consumer<SearchResponse> clearScroll = (response) -> {
|
final Consumer<SearchResponse> clearScroll = (response) -> {
|
||||||
if (response != null && response.getScrollId() != null) {
|
if (response != null && response.getScrollId() != null) {
|
||||||
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
|
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
|
||||||
clearScrollRequest.addScrollId(response.getScrollId());
|
clearScrollRequest.addScrollId(response.getScrollId());
|
||||||
client.clearScroll(clearScrollRequest, ActionListener.wrap((r) -> {}, (e) -> {}));
|
client.clearScroll(clearScrollRequest, ActionListener.wrap((r) -> {}, e ->
|
||||||
|
LOGGER.warn(new ParameterizedMessage("clear scroll failed for scroll id [{}]", response.getScrollId()), e)
|
||||||
|
));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// This function is MADNESS! But it works, don't think about it too hard...
|
// This function is MADNESS! But it works, don't think about it too hard...
|
||||||
|
@ -118,6 +118,7 @@ import java.util.function.Supplier;
|
|||||||
|
|
||||||
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
|
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
|
||||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||||
|
import static org.elasticsearch.search.SearchService.DEFAULT_KEEPALIVE_SETTING;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||||
|
|
||||||
@ -846,7 +847,7 @@ public final class TokenService extends AbstractComponent {
|
|||||||
);
|
);
|
||||||
|
|
||||||
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
||||||
.setScroll(TimeValue.timeValueSeconds(10L))
|
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings))
|
||||||
.setQuery(boolQuery)
|
.setQuery(boolQuery)
|
||||||
.setVersion(false)
|
.setVersion(false)
|
||||||
.setSize(1000)
|
.setSize(1000)
|
||||||
|
@ -28,7 +28,6 @@ import org.elasticsearch.common.ValidationException;
|
|||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.settings.SecureString;
|
import org.elasticsearch.common.settings.SecureString;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
@ -62,6 +61,7 @@ import java.util.Map;
|
|||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import static org.elasticsearch.search.SearchService.DEFAULT_KEEPALIVE_SETTING;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
||||||
@ -139,7 +139,7 @@ public class NativeUsersStore extends AbstractComponent {
|
|||||||
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
||||||
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN)) {
|
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN)) {
|
||||||
SearchRequest request = client.prepareSearch(SECURITY_INDEX_NAME)
|
SearchRequest request = client.prepareSearch(SECURITY_INDEX_NAME)
|
||||||
.setScroll(TimeValue.timeValueSeconds(10L))
|
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings))
|
||||||
.setQuery(query)
|
.setQuery(query)
|
||||||
.setSize(1000)
|
.setSize(1000)
|
||||||
.setFetchSource(true)
|
.setFetchSource(true)
|
||||||
|
@ -16,7 +16,6 @@ import org.elasticsearch.common.CheckedBiConsumer;
|
|||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
@ -56,6 +55,7 @@ import java.util.stream.Stream;
|
|||||||
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
|
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
|
||||||
import static org.elasticsearch.action.DocWriteResponse.Result.DELETED;
|
import static org.elasticsearch.action.DocWriteResponse.Result.DELETED;
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
|
import static org.elasticsearch.search.SearchService.DEFAULT_KEEPALIVE_SETTING;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
||||||
@ -129,7 +129,7 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol
|
|||||||
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
||||||
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN)) {
|
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN)) {
|
||||||
SearchRequest request = client.prepareSearch(SECURITY_INDEX_NAME)
|
SearchRequest request = client.prepareSearch(SECURITY_INDEX_NAME)
|
||||||
.setScroll(TimeValue.timeValueSeconds(10L))
|
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings))
|
||||||
.setTypes(SECURITY_GENERIC_TYPE)
|
.setTypes(SECURITY_GENERIC_TYPE)
|
||||||
.setQuery(query)
|
.setQuery(query)
|
||||||
.setSize(1000)
|
.setSize(1000)
|
||||||
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.collect.Tuple;
|
|||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.common.util.CollectionUtils;
|
import org.elasticsearch.common.util.CollectionUtils;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.common.util.iterable.Iterables;
|
import org.elasticsearch.common.util.iterable.Iterables;
|
||||||
@ -56,6 +55,7 @@ import java.util.stream.Collector;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
|
import static org.elasticsearch.search.SearchService.DEFAULT_KEEPALIVE_SETTING;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
||||||
@ -115,7 +115,7 @@ public class NativePrivilegeStore extends AbstractComponent {
|
|||||||
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
||||||
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN)) {
|
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN)) {
|
||||||
SearchRequest request = client.prepareSearch(SECURITY_INDEX_NAME)
|
SearchRequest request = client.prepareSearch(SECURITY_INDEX_NAME)
|
||||||
.setScroll(TimeValue.timeValueSeconds(10L))
|
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings))
|
||||||
.setQuery(query)
|
.setQuery(query)
|
||||||
.setSize(1000)
|
.setSize(1000)
|
||||||
.setFetchSource(true)
|
.setFetchSource(true)
|
||||||
|
@ -59,6 +59,7 @@ import java.util.function.Supplier;
|
|||||||
|
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
|
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
|
||||||
|
import static org.elasticsearch.search.SearchService.DEFAULT_KEEPALIVE_SETTING;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||||
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
||||||
@ -120,7 +121,7 @@ public class NativeRolesStore extends AbstractComponent {
|
|||||||
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
||||||
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN)) {
|
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN)) {
|
||||||
SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
||||||
.setScroll(TimeValue.timeValueSeconds(10L))
|
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings))
|
||||||
.setQuery(query)
|
.setQuery(query)
|
||||||
.setSize(1000)
|
.setSize(1000)
|
||||||
.setFetchSource(true)
|
.setFetchSource(true)
|
||||||
|
@ -79,6 +79,7 @@ public class ScrollHelperIntegTests extends ESSingleNodeTestCase {
|
|||||||
when(client.threadPool()).thenReturn(threadPool);
|
when(client.threadPool()).thenReturn(threadPool);
|
||||||
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||||
SearchRequest request = new SearchRequest();
|
SearchRequest request = new SearchRequest();
|
||||||
|
request.scroll(TimeValue.timeValueHours(10L));
|
||||||
|
|
||||||
String scrollId = randomAlphaOfLength(5);
|
String scrollId = randomAlphaOfLength(5);
|
||||||
SearchHit[] hits = new SearchHit[] {new SearchHit(1), new SearchHit(2)};
|
SearchHit[] hits = new SearchHit[] {new SearchHit(1), new SearchHit(2)};
|
||||||
|
@ -15,6 +15,7 @@ import org.elasticsearch.client.RequestOptions;
|
|||||||
import org.elasticsearch.client.Requests;
|
import org.elasticsearch.client.Requests;
|
||||||
import org.elasticsearch.client.ResponseException;
|
import org.elasticsearch.client.ResponseException;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.test.SecurityIntegTestCase;
|
import org.elasticsearch.test.SecurityIntegTestCase;
|
||||||
@ -161,6 +162,7 @@ public class AuditTrailTests extends SecurityIntegTestCase {
|
|||||||
client.admin().indices().refresh(Requests.refreshRequest(indexName)).get();
|
client.admin().indices().refresh(Requests.refreshRequest(indexName)).get();
|
||||||
|
|
||||||
final SearchRequest request = client.prepareSearch(indexName)
|
final SearchRequest request = client.prepareSearch(indexName)
|
||||||
|
.setScroll(TimeValue.timeValueMinutes(10L))
|
||||||
.setTypes(IndexAuditTrail.DOC_TYPE)
|
.setTypes(IndexAuditTrail.DOC_TYPE)
|
||||||
.setQuery(QueryBuilders.matchAllQuery())
|
.setQuery(QueryBuilders.matchAllQuery())
|
||||||
.setSize(1000)
|
.setSize(1000)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user