diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java index 0ef2a858e8b..410ff065d72 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java @@ -69,7 +69,8 @@ public class RestSearchTemplateAction extends BaseRestHandler { public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { // Creates the search request with all required params SearchRequest searchRequest = new SearchRequest(); - RestSearchAction.parseSearchRequest(searchRequest, request, null, size -> searchRequest.source().size(size)); + RestSearchAction.parseSearchRequest( + searchRequest, request, null, client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size)); // Creates the search template request SearchTemplateRequest searchTemplateRequest; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java index 3423bdccee3..ed70d13f017 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; @@ -49,7 +50,7 @@ public abstract class AbstractBaseReindexRestHandler< protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient client, boolean includeCreated, boolean includeUpdated) throws IOException { // Build the internal request - Request internal = setCommonOptions(request, buildRequest(request)); + Request internal = setCommonOptions(request, buildRequest(request, client.getNamedWriteableRegistry())); // Executes the request and waits for completion if (request.paramAsBoolean("wait_for_completion", true)) { @@ -77,7 +78,7 @@ public abstract class AbstractBaseReindexRestHandler< /** * Build the Request based on the RestRequest. */ - protected abstract Request buildRequest(RestRequest request) throws IOException; + protected abstract Request buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException; /** * Sets common options of {@link AbstractBulkByScrollRequest} requests. diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java index 45487544cb2..507628cd1d9 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -44,7 +45,7 @@ public abstract class AbstractBulkByQueryRestHandler< super(action); } - protected void parseInternalRequest(Request internal, RestRequest restRequest, + protected void parseInternalRequest(Request internal, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry, Map> bodyConsumers) throws IOException { assert internal != null : "Request should not be null"; assert restRequest != null : "RestRequest should not be null"; @@ -52,7 +53,8 @@ public abstract class AbstractBulkByQueryRestHandler< SearchRequest searchRequest = internal.getSearchRequest(); try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) { - RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> setMaxDocsFromSearchSize(internal, size)); + RestSearchAction.parseSearchRequest( + searchRequest, restRequest, parser, namedWriteableRegistry, size -> setMaxDocsFromSearchSize(internal, size)); } searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size())); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java index bc820604158..b4d4dbd116c 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.rest.RestRequest; import java.io.IOException; @@ -56,7 +57,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler internal.setConflicts((String) o)); consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue())); - parseInternalRequest(internal, request, consumers); + parseInternalRequest(internal, request, namedWriteableRegistry, consumers); return internal; } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index 49e32cfeaaa..299448ee6dd 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestRequest; @@ -56,7 +57,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler internal.setScript(Script.parse(o))); consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue())); - parseInternalRequest(internal, request, consumers); + parseInternalRequest(internal, request, namedWriteableRegistry, consumers); internal.setPipeline(request.param("pipeline")); return internal; diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestDeleteByQueryActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestDeleteByQueryActionTests.java index 13e00503d08..a92fa1f4282 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestDeleteByQueryActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestDeleteByQueryActionTests.java @@ -48,7 +48,7 @@ public class RestDeleteByQueryActionTests extends RestActionTestCase { // checks the type in the URL is propagated correctly to the request object // only works after the request is dispatched, so its params are filled from url. - DeleteByQueryRequest dbqRequest = action.buildRequest(request); + DeleteByQueryRequest dbqRequest = action.buildRequest(request, DEFAULT_NAMED_WRITABLE_REGISTRY); assertArrayEquals(new String[]{"some_type"}, dbqRequest.getDocTypes()); // RestDeleteByQueryAction itself doesn't check for a deprecated type usage @@ -57,7 +57,8 @@ public class RestDeleteByQueryActionTests extends RestActionTestCase { } public void testParseEmpty() throws IOException { - DeleteByQueryRequest request = action.buildRequest(new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build()); + final FakeRestRequest restRequest = new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build(); + DeleteByQueryRequest request = action.buildRequest(restRequest, DEFAULT_NAMED_WRITABLE_REGISTRY); assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize()); assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size()); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java index 312a07ee13f..6bd1ea1c760 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -30,6 +31,7 @@ import org.elasticsearch.test.rest.RestActionTestCase; import org.junit.Before; import java.io.IOException; +import java.util.Collections; import java.util.Arrays; import static java.util.Collections.singletonMap; @@ -61,7 +63,8 @@ public class RestReindexActionTests extends RestActionTestCase { request.withContent(BytesReference.bytes(body), body.contentType()); } request.withParams(singletonMap("pipeline", "doesn't matter")); - Exception e = expectThrows(IllegalArgumentException.class, () -> action.buildRequest(request.build())); + Exception e = expectThrows(IllegalArgumentException.class, () -> + action.buildRequest(request.build(), new NamedWriteableRegistry(Collections.emptyList()))); assertEquals("_reindex doesn't support [pipeline] as a query parameter. Specify it in the [dest] object instead.", e.getMessage()); } @@ -70,14 +73,14 @@ public class RestReindexActionTests extends RestActionTestCase { { FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry()); requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON); - ReindexRequest request = action.buildRequest(requestBuilder.build()); + ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList())); assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT, request.getScrollTime()); } { FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry()); requestBuilder.withParams(singletonMap("scroll", "10m")); requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON); - ReindexRequest request = action.buildRequest(requestBuilder.build()); + ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList())); assertEquals("10m", request.getScrollTime().toString()); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestUpdateByQueryActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestUpdateByQueryActionTests.java index d22082d09cb..19da391a17b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestUpdateByQueryActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestUpdateByQueryActionTests.java @@ -49,7 +49,7 @@ public class RestUpdateByQueryActionTests extends RestActionTestCase { // checks the type in the URL is propagated correctly to the request object // only works after the request is dispatched, so its params are filled from url. - UpdateByQueryRequest ubqRequest = action.buildRequest(request); + UpdateByQueryRequest ubqRequest = action.buildRequest(request, DEFAULT_NAMED_WRITABLE_REGISTRY); assertArrayEquals(new String[]{"some_type"}, ubqRequest.getDocTypes()); // RestUpdateByQueryAction itself doesn't check for a deprecated type usage @@ -58,7 +58,8 @@ public class RestUpdateByQueryActionTests extends RestActionTestCase { } public void testParseEmpty() throws IOException { - UpdateByQueryRequest request = action.buildRequest(new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build()); + final FakeRestRequest restRequest = new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build(); + UpdateByQueryRequest request = action.buildRequest(restRequest, DEFAULT_NAMED_WRITABLE_REGISTRY); assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize()); assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size()); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 90c47c92164..50182283543 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -295,12 +295,6 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla if (scroll) { validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException); } - if (routing() != null) { - validationException = addValidationError("[routing] cannot be used with point in time", validationException); - } - if (preference() != null) { - validationException = addValidationError("[preference] cannot be used with point in time", validationException); - } } return validationException; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 98be2a92fea..6f3ee79f42b 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -19,6 +19,8 @@ package org.elasticsearch.rest.action.search; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchContextId; import org.elasticsearch.action.search.SearchRequest; @@ -54,6 +56,7 @@ import java.util.function.IntConsumer; import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; +import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -110,12 +113,8 @@ public class RestSearchAction extends BaseRestHandler { * company. */ IntConsumer setSize = size -> searchRequest.source().size(size); - request.withContentOrSourceParamParserOrNull(parser -> { - parseSearchRequest(searchRequest, request, parser, setSize); - if (searchRequest.pointInTimeBuilder() != null) { - preparePointInTime(searchRequest, client.getNamedWriteableRegistry()); - } - }); + request.withContentOrSourceParamParserOrNull(parser -> + parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize)); return channel -> { RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); @@ -132,6 +131,7 @@ public class RestSearchAction extends BaseRestHandler { */ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request, XContentParser requestContentParser, + NamedWriteableRegistry namedWriteableRegistry, IntConsumer setSize) throws IOException { if (searchRequest.source() == null) { @@ -189,6 +189,10 @@ public class RestSearchAction extends BaseRestHandler { searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())); checkRestTotalHits(request, searchRequest); + + if (searchRequest.pointInTimeBuilder() != null) { + preparePointInTime(searchRequest, namedWriteableRegistry); + } } /** @@ -306,6 +310,21 @@ public class RestSearchAction extends BaseRestHandler { static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) { assert request.pointInTimeBuilder() != null; + ActionRequestValidationException validationException = null; + if (request.indices().length > 0) { + validationException = addValidationError("[indices] cannot be used with point in time", validationException); + } + if (request.indicesOptions() != SearchRequest.DEFAULT_INDICES_OPTIONS) { + validationException = addValidationError("[indicesOptions] cannot be used with point in time", validationException); + } + if (request.routing() != null) { + validationException = addValidationError("[routing] cannot be used with point in time", validationException); + } + if (request.preference() != null) { + validationException = addValidationError("[preference] cannot be used with point in time", validationException); + } + ExceptionsHelper.reThrowIfNotNull(validationException); + final IndicesOptions indicesOptions = request.indicesOptions(); final IndicesOptions stricterIndicesOptions = IndicesOptions.fromOptions( indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false, diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 54b558b7d24..d1a1f861a35 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -216,28 +216,6 @@ public class SearchRequestTests extends AbstractSearchTestCase { assertEquals(1, validationErrors.validationErrors().size()); assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0)); } - { - // Reader context with preference - SearchRequest searchRequest = new SearchRequest() - .source(new SearchSourceBuilder(). - pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10))))) - .preference("test"); - ActionRequestValidationException validationErrors = searchRequest.validate(); - assertNotNull(validationErrors); - assertEquals(1, validationErrors.validationErrors().size()); - assertEquals("[preference] cannot be used with point in time", validationErrors.validationErrors().get(0)); - } - { - // Reader context with routing - SearchRequest searchRequest = new SearchRequest() - .source(new SearchSourceBuilder() - .pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10))))) - .routing("test"); - ActionRequestValidationException validationErrors = searchRequest.validate(); - assertNotNull(validationErrors); - assertEquals(1, validationErrors.validationErrors().size()); - assertEquals("[routing] cannot be used with point in time", validationErrors.validationErrors().get(0)); - } } public void testCopyConstructor() throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index a14489c9ad2..43326ea9394 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -1348,6 +1348,9 @@ public abstract class ESTestCase extends LuceneTestCase { private static final NamedXContentRegistry DEFAULT_NAMED_X_CONTENT_REGISTRY = new NamedXContentRegistry(ClusterModule.getNamedXWriteables()); + protected static final NamedWriteableRegistry DEFAULT_NAMED_WRITABLE_REGISTRY = + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); + /** * The {@link NamedXContentRegistry} to use for this test. Subclasses should override and use liberally. */ diff --git a/x-pack/plugin/async-search/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java b/x-pack/plugin/async-search/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java index d4f75b1fd39..5e5ca8c7efb 100644 --- a/x-pack/plugin/async-search/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java +++ b/x-pack/plugin/async-search/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.hamcrest.CustomMatcher; +import org.hamcrest.Matcher; import org.junit.Before; import java.io.IOException; @@ -39,6 +40,7 @@ import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX; import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; +import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -75,14 +77,7 @@ public class AsyncSearchSecurityIT extends ESRestTestCase { public void testWithDlsAndFls() throws Exception { Response submitResp = submitAsyncSearch("*", "*", TimeValue.timeValueSeconds(10), "user_dls"); assertOK(submitResp); - String id = extractResponseId(submitResp); - Response getResp = getAsyncSearch(id, "user_dls"); - AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(XContentHelper.createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, - new BytesArray(EntityUtils.toByteArray(getResp.getEntity())), - XContentType.JSON)); - SearchHit[] hits = searchResponse.getSearchResponse().getHits().getHits(); - + SearchHit[] hits = getSearchHits(extractResponseId(submitResp), "user_dls"); assertThat(hits, arrayContainingInAnyOrder( new CustomMatcher("\"index\" doc 1 matcher") { @Override @@ -151,6 +146,139 @@ public class AsyncSearchSecurityIT extends ESRestTestCase { assertThat(exc.getMessage(), containsString("unauthorized")); } + private SearchHit[] getSearchHits(String asyncId, String user) throws IOException { + final Response resp = getAsyncSearch(asyncId, user); + assertOK(resp); + AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(XContentHelper.createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + new BytesArray(EntityUtils.toByteArray(resp.getEntity())), + XContentType.JSON)); + return searchResponse.getSearchResponse().getHits().getHits(); + } + + public void testAuthorizationOfPointInTime() throws Exception { + String authorizedUser = randomFrom("user1", "user2"); + final Matcher hitMatcher = new CustomMatcher("hit") { + @Override + public boolean matches(Object actual) { + SearchHit hit = (SearchHit) actual; + return hit.getIndex().equals("index-" + authorizedUser) && hit.getId().equals("0"); + } + }; + final String pitId = openPointInTime(new String[]{"index-" + authorizedUser}, authorizedUser); + try { + Response submit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), authorizedUser); + assertOK(submit); + final Response resp = getAsyncSearch(extractResponseId(submit), authorizedUser); + assertOK(resp); + assertThat(getSearchHits(extractResponseId(resp), authorizedUser), arrayContainingInAnyOrder(hitMatcher)); + + String unauthorizedUser = randomValueOtherThan(authorizedUser, () -> randomFrom("user1", "user2")); + ResponseException exc = expectThrows(ResponseException.class, + () -> submitAsyncSearchWithPIT(pitId, "*:*", TimeValue.timeValueSeconds(10), unauthorizedUser)); + assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403)); + assertThat(exc.getMessage(), containsString("unauthorized")); + + } finally { + closePointInTime(pitId, authorizedUser); + } + } + + public void testRejectPointInTimeWithIndices() throws Exception { + String authorizedUser = randomFrom("user1", "user2"); + final String pitId = openPointInTime(new String[]{"index-" + authorizedUser}, authorizedUser); + try { + final Request request = new Request("POST", "/_async_search"); + setRunAsHeader(request, authorizedUser); + request.addParameter("wait_for_completion_timeout", "true"); + request.addParameter("keep_on_completion", "true"); + if (randomBoolean()) { + request.addParameter("index", "index-" + authorizedUser); + } else { + request.addParameter("index", "*"); + } + final XContentBuilder requestBody = JsonXContent.contentBuilder() + .startObject() + .startObject("pit") + .field("id", pitId) + .field("keep_alive", "1m") + .endObject() + .endObject(); + request.setJsonEntity(Strings.toString(requestBody)); + final ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(request)); + assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + assertThat(exc.getMessage(), containsString("[indices] cannot be used with point in time")); + } finally { + closePointInTime(pitId, authorizedUser); + } + } + + public void testSharingPointInTime() throws Exception { + final Matcher hitMatcher = new CustomMatcher("index") { + @Override + public boolean matches(Object actual) { + SearchHit hit = (SearchHit) actual; + return hit.getIndex().equals("index") && hit.getId().equals("0"); + } + }; + String firstUser = randomFrom("user1", "user2"); + final String pitId = openPointInTime(new String[]{"index"}, firstUser); + try { + { + Response firstSubmit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), firstUser); + assertOK(firstSubmit); + final Response firstResp = getAsyncSearch(extractResponseId(firstSubmit), firstUser); + assertOK(firstResp); + final SearchHit[] firstHits = getSearchHits(extractResponseId(firstResp), firstUser); + assertThat(firstHits, arrayContainingInAnyOrder(hitMatcher)); + } + { + String secondUser = randomValueOtherThan(firstUser, () -> randomFrom("user1", "user2")); + Response secondSubmit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), secondUser); + assertOK(secondSubmit); + final Response secondResp = getAsyncSearch(extractResponseId(secondSubmit), secondUser); + assertOK(secondResp); + final SearchHit[] secondHits = getSearchHits(extractResponseId(secondResp), secondUser); + assertThat(secondHits, arrayContainingInAnyOrder(hitMatcher)); + } + } finally { + closePointInTime(pitId, firstUser); + } + } + + public void testWithDLSPointInTime() throws Exception { + final String pitId = openPointInTime(new String[]{"index"}, "user1"); + try { + Response userResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user1"); + assertOK(userResp); + assertThat(getSearchHits(extractResponseId(userResp), "user1"), arrayWithSize(3)); + + Response dlsResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user_dls"); + assertOK(dlsResp); + assertThat(getSearchHits(extractResponseId(dlsResp), "user_dls"), arrayContainingInAnyOrder( + new CustomMatcher("\"index\" doc 1 matcher") { + @Override + public boolean matches(Object actual) { + SearchHit hit = (SearchHit) actual; + return "index".equals(hit.getIndex()) && + "1".equals(hit.getId()) && + hit.getSourceAsMap().isEmpty(); + } + }, + new CustomMatcher("\"index\" doc 2 matcher") { + @Override + public boolean matches(Object actual) { + SearchHit hit = (SearchHit) actual; + return "index".equals(hit.getIndex()) && + "2".equals(hit.getId()) && + "boo".equals(hit.getSourceAsMap().get("baz")); + } + })); + } finally { + closePointInTime(pitId, "user1"); + } + } + static String extractResponseId(Response response) throws IOException { Map map = toMap(response); return (String) map.get("id"); @@ -219,4 +347,42 @@ public class AsyncSearchSecurityIT extends ESRestTestCase { builder.addHeader(RUN_AS_USER_HEADER, user); request.setOptions(builder); } + + private String openPointInTime(String[] indexNames, String user) throws IOException { + final Request request = new Request("POST", "/_pit"); + request.addParameter("index", String.join(",", indexNames)); + setRunAsHeader(request, user); + request.addParameter("keep_alive", between(1, 5) + "m"); + final Response response = client().performRequest(request); + assertOK(response); + return (String) toMap(response).get("id"); + } + + static Response submitAsyncSearchWithPIT(String pit, String query, TimeValue waitForCompletion, String user) throws IOException { + final Request request = new Request("POST", "/_async_search"); + setRunAsHeader(request, user); + request.addParameter("wait_for_completion_timeout", waitForCompletion.toString()); + request.addParameter("q", query); + request.addParameter("keep_on_completion", "true"); + final XContentBuilder requestBody = JsonXContent.contentBuilder() + .startObject() + .startObject("pit") + .field("id", pit) + .field("keep_alive", "1m") + .endObject() + .endObject(); + request.setJsonEntity(Strings.toString(requestBody)); + return client().performRequest(request); + } + + private void closePointInTime(String pitId, String user) throws IOException { + final Request request = new Request("DELETE", "/_pit"); + setRunAsHeader(request, user); + final XContentBuilder requestBody = JsonXContent.contentBuilder() + .startObject() + .field("id", pitId) + .endObject(); + request.setJsonEntity(Strings.toString(requestBody)); + assertOK(client().performRequest(request)); + } } diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 3ac4dfb7624..116c6bf78c4 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -36,7 +36,11 @@ import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; import org.elasticsearch.xpack.core.XPackClientPlugin; @@ -52,6 +56,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX; import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; @@ -220,7 +225,22 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase { SearchSourceBuilder source, int numFailures, int progressStep) throws Exception { - SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, indexName); + final String pitId; + final SubmitAsyncSearchRequest request; + if (randomBoolean()) { + OpenPointInTimeRequest openPIT = new OpenPointInTimeRequest( + new String[]{indexName}, + OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS, + TimeValue.timeValueMinutes(between(1, 5)), + null, + null); + pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPIT).actionGet().getSearchContextId(); + source.pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder(pitId, TimeValue.timeValueMinutes(1))); + request = new SubmitAsyncSearchRequest(source); + } else { + pitId = null; + request = new SubmitAsyncSearchRequest(source, indexName); + } request.setBatchedReduceSize(progressStep); request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); BlockingQueryBuilder.QueryLatch queryLatch = BlockingQueryBuilder.acquireQueryLatch(numFailures); @@ -236,6 +256,7 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase { return new SearchResponseIterator() { private AsyncSearchResponse response = initial; private boolean isFirst = true; + private final AtomicBoolean closed = new AtomicBoolean(); @Override public boolean hasNext() { @@ -296,7 +317,12 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase { @Override public void close() { - queryLatch.close(); + if (closed.compareAndSet(false, true)) { + if (pitId != null) { + client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet(); + } + queryLatch.close(); + } } }; } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java index 2b4eedc1c86..36dfd12134c 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java @@ -49,7 +49,7 @@ public final class RestSubmitAsyncSearchAction extends BaseRestHandler { // pre_filter_shard_size and ccs_minimize_roundtrips get set to the search request although the REST spec don't list //them as supported. We rely on SubmitAsyncSearchRequest#validate to fail in case they are set. request.withContentOrSourceParamParserOrNull(parser -> - parseSearchRequest(submit.getSearchRequest(), request, parser, setSize)); + parseSearchRequest(submit.getSearchRequest(), request, parser, client.getNamedWriteableRegistry(), setSize)); if (request.hasParam("wait_for_completion_timeout")) { submit.setWaitForCompletionTimeout(request.paramAsTime("wait_for_completion_timeout", submit.getWaitForCompletionTimeout())); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java index 113a478ad4f..5d69a7c2e8c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java @@ -15,7 +15,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; import java.io.IOException; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -29,7 +29,9 @@ public class RestOpenPointInTimeAction extends BaseRestHandler { @Override public List routes() { - return Collections.singletonList(new Route(POST, "/{index}/_pit")); + return Arrays.asList( + new Route(POST, "/{index}/_pit"), + new Route(POST, "/_pit")); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java index 2e9bee9f47b..281e6be9eed 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java @@ -44,7 +44,8 @@ public class RestRollupSearchAction extends BaseRestHandler { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { SearchRequest searchRequest = new SearchRequest(); restRequest.withContentOrSourceParamParserOrNull(parser -> - RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> searchRequest.source().size(size))); + RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, + client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size))); RestSearchAction.checkRestTotalHits(restRequest, searchRequest); return channel -> client.execute(RollupSearchAction.INSTANCE, searchRequest, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/20-with-poin-in-time.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/20-with-poin-in-time.yml new file mode 100644 index 00000000000..e02ae478fc5 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/20-with-poin-in-time.yml @@ -0,0 +1,78 @@ +--- +"Async search with point in time": + - skip: + version: " - 7.99.99" + reason: "point in time is introduced in 8.0" + - do: + indices.create: + index: test-1 + body: + settings: + number_of_shards: "2" + + - do: + indices.create: + index: test-2 + body: + settings: + number_of_shards: "1" + + - do: + indices.create: + index: test-3 + body: + settings: + number_of_shards: "3" + + - do: + index: + index: test-2 + body: { max: 2 } + + - do: + index: + index: test-1 + body: { max: 1 } + + - do: + index: + index: test-3 + body: { max: 3 } + + - do: + indices.refresh: {} + + - do: + open_point_in_time: + index: test-* + keep_alive: 5m + - set: {id: point_in_time_id} + + - do: + async_search.submit: + batched_reduce_size: 2 + wait_for_completion_timeout: 10s + body: + query: + match_all: {} + aggs: + max: + max: + field: max + sort: max + pit: + id: "$point_in_time_id" + keep_alive: 1m + + - is_false: id + - match: { is_partial: false } + - length: { response.hits.hits: 3 } + - match: { response.hits.hits.0._source.max: 1 } + - match: { response.aggregations.max.value: 3.0 } + + - do: + close_point_in_time: + body: + id: "$point_in_time_id" + +