Support point in time in async_search (#61560)

This commit integrates point in time into async search and
ensures that it works correctly with security enabled.

Relates #61062
This commit is contained in:
Nhat Nguyen 2020-08-26 15:40:00 -04:00
parent 063a6d047c
commit 035f0638f4
19 changed files with 344 additions and 65 deletions

View File

@ -69,7 +69,8 @@ public class RestSearchTemplateAction extends BaseRestHandler {
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
// Creates the search request with all required params // Creates the search request with all required params
SearchRequest searchRequest = new SearchRequest(); SearchRequest searchRequest = new SearchRequest();
RestSearchAction.parseSearchRequest(searchRequest, request, null, size -> searchRequest.source().size(size)); RestSearchAction.parseSearchRequest(
searchRequest, request, null, client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size));
// Creates the search template request // Creates the search template request
SearchTemplateRequest searchTemplateRequest; SearchTemplateRequest searchTemplateRequest;

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.BytesRestResponse;
@ -49,7 +50,7 @@ public abstract class AbstractBaseReindexRestHandler<
protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient client, protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient client,
boolean includeCreated, boolean includeUpdated) throws IOException { boolean includeCreated, boolean includeUpdated) throws IOException {
// Build the internal request // Build the internal request
Request internal = setCommonOptions(request, buildRequest(request)); Request internal = setCommonOptions(request, buildRequest(request, client.getNamedWriteableRegistry()));
// Executes the request and waits for completion // Executes the request and waits for completion
if (request.paramAsBoolean("wait_for_completion", true)) { if (request.paramAsBoolean("wait_for_completion", true)) {
@ -77,7 +78,7 @@ public abstract class AbstractBaseReindexRestHandler<
/** /**
* Build the Request based on the RestRequest. * Build the Request based on the RestRequest.
*/ */
protected abstract Request buildRequest(RestRequest request) throws IOException; protected abstract Request buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException;
/** /**
* Sets common options of {@link AbstractBulkByScrollRequest} requests. * Sets common options of {@link AbstractBulkByScrollRequest} requests.

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
@ -44,7 +45,7 @@ public abstract class AbstractBulkByQueryRestHandler<
super(action); super(action);
} }
protected void parseInternalRequest(Request internal, RestRequest restRequest, protected void parseInternalRequest(Request internal, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry,
Map<String, Consumer<Object>> bodyConsumers) throws IOException { Map<String, Consumer<Object>> bodyConsumers) throws IOException {
assert internal != null : "Request should not be null"; assert internal != null : "Request should not be null";
assert restRequest != null : "RestRequest should not be null"; assert restRequest != null : "RestRequest should not be null";
@ -52,7 +53,8 @@ public abstract class AbstractBulkByQueryRestHandler<
SearchRequest searchRequest = internal.getSearchRequest(); SearchRequest searchRequest = internal.getSearchRequest();
try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) { try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) {
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> setMaxDocsFromSearchSize(internal, size)); RestSearchAction.parseSearchRequest(
searchRequest, restRequest, parser, namedWriteableRegistry, size -> setMaxDocsFromSearchSize(internal, size));
} }
searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size())); searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size()));

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex; package org.elasticsearch.index.reindex;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import java.io.IOException; import java.io.IOException;
@ -56,7 +57,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<Dele
} }
@Override @Override
protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOException { protected DeleteByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
/* /*
* Passing the search request through DeleteByQueryRequest first allows * Passing the search request through DeleteByQueryRequest first allows
* it to set its own defaults which differ from SearchRequest's * it to set its own defaults which differ from SearchRequest's
@ -68,7 +69,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<Dele
consumers.put("conflicts", o -> internal.setConflicts((String) o)); consumers.put("conflicts", o -> internal.setConflicts((String) o));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue())); consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));
parseInternalRequest(internal, request, consumers); parseInternalRequest(internal, request, namedWriteableRegistry, consumers);
return internal; return internal;
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
@ -56,7 +57,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
} }
@Override @Override
protected ReindexRequest buildRequest(RestRequest request) throws IOException { protected ReindexRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
if (request.hasParam("pipeline")) { if (request.hasParam("pipeline")) {
throw new IllegalArgumentException("_reindex doesn't support [pipeline] as a query parameter. " throw new IllegalArgumentException("_reindex doesn't support [pipeline] as a query parameter. "
+ "Specify it in the [dest] object instead."); + "Specify it in the [dest] object instead.");

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex; package org.elasticsearch.index.reindex;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
@ -57,7 +58,7 @@ public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<Upda
} }
@Override @Override
protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOException { protected UpdateByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
/* /*
* Passing the search request through UpdateByQueryRequest first allows * Passing the search request through UpdateByQueryRequest first allows
* it to set its own defaults which differ from SearchRequest's * it to set its own defaults which differ from SearchRequest's
@ -70,7 +71,7 @@ public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<Upda
consumers.put("script", o -> internal.setScript(Script.parse(o))); consumers.put("script", o -> internal.setScript(Script.parse(o)));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue())); consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));
parseInternalRequest(internal, request, consumers); parseInternalRequest(internal, request, namedWriteableRegistry, consumers);
internal.setPipeline(request.param("pipeline")); internal.setPipeline(request.param("pipeline"));
return internal; return internal;

View File

@ -48,7 +48,7 @@ public class RestDeleteByQueryActionTests extends RestActionTestCase {
// checks the type in the URL is propagated correctly to the request object // checks the type in the URL is propagated correctly to the request object
// only works after the request is dispatched, so its params are filled from url. // only works after the request is dispatched, so its params are filled from url.
DeleteByQueryRequest dbqRequest = action.buildRequest(request); DeleteByQueryRequest dbqRequest = action.buildRequest(request, DEFAULT_NAMED_WRITABLE_REGISTRY);
assertArrayEquals(new String[]{"some_type"}, dbqRequest.getDocTypes()); assertArrayEquals(new String[]{"some_type"}, dbqRequest.getDocTypes());
// RestDeleteByQueryAction itself doesn't check for a deprecated type usage // RestDeleteByQueryAction itself doesn't check for a deprecated type usage
@ -57,7 +57,8 @@ public class RestDeleteByQueryActionTests extends RestActionTestCase {
} }
public void testParseEmpty() throws IOException { public void testParseEmpty() throws IOException {
DeleteByQueryRequest request = action.buildRequest(new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build()); final FakeRestRequest restRequest = new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build();
DeleteByQueryRequest request = action.buildRequest(restRequest, DEFAULT_NAMED_WRITABLE_REGISTRY);
assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize()); assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize());
assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size()); assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size());
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -30,6 +31,7 @@ import org.elasticsearch.test.rest.RestActionTestCase;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Arrays; import java.util.Arrays;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
@ -61,7 +63,8 @@ public class RestReindexActionTests extends RestActionTestCase {
request.withContent(BytesReference.bytes(body), body.contentType()); request.withContent(BytesReference.bytes(body), body.contentType());
} }
request.withParams(singletonMap("pipeline", "doesn't matter")); request.withParams(singletonMap("pipeline", "doesn't matter"));
Exception e = expectThrows(IllegalArgumentException.class, () -> action.buildRequest(request.build())); Exception e = expectThrows(IllegalArgumentException.class, () ->
action.buildRequest(request.build(), new NamedWriteableRegistry(Collections.emptyList())));
assertEquals("_reindex doesn't support [pipeline] as a query parameter. Specify it in the [dest] object instead.", e.getMessage()); assertEquals("_reindex doesn't support [pipeline] as a query parameter. Specify it in the [dest] object instead.", e.getMessage());
} }
@ -70,14 +73,14 @@ public class RestReindexActionTests extends RestActionTestCase {
{ {
FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry()); FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON); requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
ReindexRequest request = action.buildRequest(requestBuilder.build()); ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList()));
assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT, request.getScrollTime()); assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT, request.getScrollTime());
} }
{ {
FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry()); FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
requestBuilder.withParams(singletonMap("scroll", "10m")); requestBuilder.withParams(singletonMap("scroll", "10m"));
requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON); requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
ReindexRequest request = action.buildRequest(requestBuilder.build()); ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList()));
assertEquals("10m", request.getScrollTime().toString()); assertEquals("10m", request.getScrollTime().toString());
} }
} }

View File

@ -49,7 +49,7 @@ public class RestUpdateByQueryActionTests extends RestActionTestCase {
// checks the type in the URL is propagated correctly to the request object // checks the type in the URL is propagated correctly to the request object
// only works after the request is dispatched, so its params are filled from url. // only works after the request is dispatched, so its params are filled from url.
UpdateByQueryRequest ubqRequest = action.buildRequest(request); UpdateByQueryRequest ubqRequest = action.buildRequest(request, DEFAULT_NAMED_WRITABLE_REGISTRY);
assertArrayEquals(new String[]{"some_type"}, ubqRequest.getDocTypes()); assertArrayEquals(new String[]{"some_type"}, ubqRequest.getDocTypes());
// RestUpdateByQueryAction itself doesn't check for a deprecated type usage // RestUpdateByQueryAction itself doesn't check for a deprecated type usage
@ -58,7 +58,8 @@ public class RestUpdateByQueryActionTests extends RestActionTestCase {
} }
public void testParseEmpty() throws IOException { public void testParseEmpty() throws IOException {
UpdateByQueryRequest request = action.buildRequest(new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build()); final FakeRestRequest restRequest = new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build();
UpdateByQueryRequest request = action.buildRequest(restRequest, DEFAULT_NAMED_WRITABLE_REGISTRY);
assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize()); assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize());
assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size()); assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size());
} }

View File

@ -295,12 +295,6 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
if (scroll) { if (scroll) {
validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException); validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException);
} }
if (routing() != null) {
validationException = addValidationError("[routing] cannot be used with point in time", validationException);
}
if (preference() != null) {
validationException = addValidationError("[preference] cannot be used with point in time", validationException);
}
} }
return validationException; return validationException;
} }

View File

@ -19,6 +19,8 @@
package org.elasticsearch.rest.action.search; package org.elasticsearch.rest.action.search;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchContextId; import org.elasticsearch.action.search.SearchContextId;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
@ -54,6 +56,7 @@ import java.util.function.IntConsumer;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.POST;
@ -110,12 +113,8 @@ public class RestSearchAction extends BaseRestHandler {
* company. * company.
*/ */
IntConsumer setSize = size -> searchRequest.source().size(size); IntConsumer setSize = size -> searchRequest.source().size(size);
request.withContentOrSourceParamParserOrNull(parser -> { request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser, setSize); parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize));
if (searchRequest.pointInTimeBuilder() != null) {
preparePointInTime(searchRequest, client.getNamedWriteableRegistry());
}
});
return channel -> { return channel -> {
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
@ -132,6 +131,7 @@ public class RestSearchAction extends BaseRestHandler {
*/ */
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request, public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
XContentParser requestContentParser, XContentParser requestContentParser,
NamedWriteableRegistry namedWriteableRegistry,
IntConsumer setSize) throws IOException { IntConsumer setSize) throws IOException {
if (searchRequest.source() == null) { if (searchRequest.source() == null) {
@ -189,6 +189,10 @@ public class RestSearchAction extends BaseRestHandler {
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())); searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
checkRestTotalHits(request, searchRequest); checkRestTotalHits(request, searchRequest);
if (searchRequest.pointInTimeBuilder() != null) {
preparePointInTime(searchRequest, namedWriteableRegistry);
}
} }
/** /**
@ -306,6 +310,21 @@ public class RestSearchAction extends BaseRestHandler {
static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) { static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) {
assert request.pointInTimeBuilder() != null; assert request.pointInTimeBuilder() != null;
ActionRequestValidationException validationException = null;
if (request.indices().length > 0) {
validationException = addValidationError("[indices] cannot be used with point in time", validationException);
}
if (request.indicesOptions() != SearchRequest.DEFAULT_INDICES_OPTIONS) {
validationException = addValidationError("[indicesOptions] cannot be used with point in time", validationException);
}
if (request.routing() != null) {
validationException = addValidationError("[routing] cannot be used with point in time", validationException);
}
if (request.preference() != null) {
validationException = addValidationError("[preference] cannot be used with point in time", validationException);
}
ExceptionsHelper.reThrowIfNotNull(validationException);
final IndicesOptions indicesOptions = request.indicesOptions(); final IndicesOptions indicesOptions = request.indicesOptions();
final IndicesOptions stricterIndicesOptions = IndicesOptions.fromOptions( final IndicesOptions stricterIndicesOptions = IndicesOptions.fromOptions(
indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false, indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false,

View File

@ -216,28 +216,6 @@ public class SearchRequestTests extends AbstractSearchTestCase {
assertEquals(1, validationErrors.validationErrors().size()); assertEquals(1, validationErrors.validationErrors().size());
assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0)); assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0));
} }
{
// Reader context with preference
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().
pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10)))))
.preference("test");
ActionRequestValidationException validationErrors = searchRequest.validate();
assertNotNull(validationErrors);
assertEquals(1, validationErrors.validationErrors().size());
assertEquals("[preference] cannot be used with point in time", validationErrors.validationErrors().get(0));
}
{
// Reader context with routing
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder()
.pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10)))))
.routing("test");
ActionRequestValidationException validationErrors = searchRequest.validate();
assertNotNull(validationErrors);
assertEquals(1, validationErrors.validationErrors().size());
assertEquals("[routing] cannot be used with point in time", validationErrors.validationErrors().get(0));
}
} }
public void testCopyConstructor() throws IOException { public void testCopyConstructor() throws IOException {

View File

@ -1348,6 +1348,9 @@ public abstract class ESTestCase extends LuceneTestCase {
private static final NamedXContentRegistry DEFAULT_NAMED_X_CONTENT_REGISTRY = private static final NamedXContentRegistry DEFAULT_NAMED_X_CONTENT_REGISTRY =
new NamedXContentRegistry(ClusterModule.getNamedXWriteables()); new NamedXContentRegistry(ClusterModule.getNamedXWriteables());
protected static final NamedWriteableRegistry DEFAULT_NAMED_WRITABLE_REGISTRY =
new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
/** /**
* The {@link NamedXContentRegistry} to use for this test. Subclasses should override and use liberally. * The {@link NamedXContentRegistry} to use for this test. Subclasses should override and use liberally.
*/ */

View File

@ -28,6 +28,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.hamcrest.CustomMatcher; import org.hamcrest.CustomMatcher;
import org.hamcrest.Matcher;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
@ -39,6 +40,7 @@ import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER; import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -75,14 +77,7 @@ public class AsyncSearchSecurityIT extends ESRestTestCase {
public void testWithDlsAndFls() throws Exception { public void testWithDlsAndFls() throws Exception {
Response submitResp = submitAsyncSearch("*", "*", TimeValue.timeValueSeconds(10), "user_dls"); Response submitResp = submitAsyncSearch("*", "*", TimeValue.timeValueSeconds(10), "user_dls");
assertOK(submitResp); assertOK(submitResp);
String id = extractResponseId(submitResp); SearchHit[] hits = getSearchHits(extractResponseId(submitResp), "user_dls");
Response getResp = getAsyncSearch(id, "user_dls");
AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(XContentHelper.createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
new BytesArray(EntityUtils.toByteArray(getResp.getEntity())),
XContentType.JSON));
SearchHit[] hits = searchResponse.getSearchResponse().getHits().getHits();
assertThat(hits, arrayContainingInAnyOrder( assertThat(hits, arrayContainingInAnyOrder(
new CustomMatcher<SearchHit>("\"index\" doc 1 matcher") { new CustomMatcher<SearchHit>("\"index\" doc 1 matcher") {
@Override @Override
@ -151,6 +146,139 @@ public class AsyncSearchSecurityIT extends ESRestTestCase {
assertThat(exc.getMessage(), containsString("unauthorized")); assertThat(exc.getMessage(), containsString("unauthorized"));
} }
private SearchHit[] getSearchHits(String asyncId, String user) throws IOException {
final Response resp = getAsyncSearch(asyncId, user);
assertOK(resp);
AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(XContentHelper.createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
new BytesArray(EntityUtils.toByteArray(resp.getEntity())),
XContentType.JSON));
return searchResponse.getSearchResponse().getHits().getHits();
}
public void testAuthorizationOfPointInTime() throws Exception {
String authorizedUser = randomFrom("user1", "user2");
final Matcher<SearchHit> hitMatcher = new CustomMatcher<SearchHit>("hit") {
@Override
public boolean matches(Object actual) {
SearchHit hit = (SearchHit) actual;
return hit.getIndex().equals("index-" + authorizedUser) && hit.getId().equals("0");
}
};
final String pitId = openPointInTime(new String[]{"index-" + authorizedUser}, authorizedUser);
try {
Response submit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), authorizedUser);
assertOK(submit);
final Response resp = getAsyncSearch(extractResponseId(submit), authorizedUser);
assertOK(resp);
assertThat(getSearchHits(extractResponseId(resp), authorizedUser), arrayContainingInAnyOrder(hitMatcher));
String unauthorizedUser = randomValueOtherThan(authorizedUser, () -> randomFrom("user1", "user2"));
ResponseException exc = expectThrows(ResponseException.class,
() -> submitAsyncSearchWithPIT(pitId, "*:*", TimeValue.timeValueSeconds(10), unauthorizedUser));
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(exc.getMessage(), containsString("unauthorized"));
} finally {
closePointInTime(pitId, authorizedUser);
}
}
public void testRejectPointInTimeWithIndices() throws Exception {
String authorizedUser = randomFrom("user1", "user2");
final String pitId = openPointInTime(new String[]{"index-" + authorizedUser}, authorizedUser);
try {
final Request request = new Request("POST", "/_async_search");
setRunAsHeader(request, authorizedUser);
request.addParameter("wait_for_completion_timeout", "true");
request.addParameter("keep_on_completion", "true");
if (randomBoolean()) {
request.addParameter("index", "index-" + authorizedUser);
} else {
request.addParameter("index", "*");
}
final XContentBuilder requestBody = JsonXContent.contentBuilder()
.startObject()
.startObject("pit")
.field("id", pitId)
.field("keep_alive", "1m")
.endObject()
.endObject();
request.setJsonEntity(Strings.toString(requestBody));
final ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(exc.getMessage(), containsString("[indices] cannot be used with point in time"));
} finally {
closePointInTime(pitId, authorizedUser);
}
}
public void testSharingPointInTime() throws Exception {
final Matcher<SearchHit> hitMatcher = new CustomMatcher<SearchHit>("index") {
@Override
public boolean matches(Object actual) {
SearchHit hit = (SearchHit) actual;
return hit.getIndex().equals("index") && hit.getId().equals("0");
}
};
String firstUser = randomFrom("user1", "user2");
final String pitId = openPointInTime(new String[]{"index"}, firstUser);
try {
{
Response firstSubmit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), firstUser);
assertOK(firstSubmit);
final Response firstResp = getAsyncSearch(extractResponseId(firstSubmit), firstUser);
assertOK(firstResp);
final SearchHit[] firstHits = getSearchHits(extractResponseId(firstResp), firstUser);
assertThat(firstHits, arrayContainingInAnyOrder(hitMatcher));
}
{
String secondUser = randomValueOtherThan(firstUser, () -> randomFrom("user1", "user2"));
Response secondSubmit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), secondUser);
assertOK(secondSubmit);
final Response secondResp = getAsyncSearch(extractResponseId(secondSubmit), secondUser);
assertOK(secondResp);
final SearchHit[] secondHits = getSearchHits(extractResponseId(secondResp), secondUser);
assertThat(secondHits, arrayContainingInAnyOrder(hitMatcher));
}
} finally {
closePointInTime(pitId, firstUser);
}
}
public void testWithDLSPointInTime() throws Exception {
final String pitId = openPointInTime(new String[]{"index"}, "user1");
try {
Response userResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user1");
assertOK(userResp);
assertThat(getSearchHits(extractResponseId(userResp), "user1"), arrayWithSize(3));
Response dlsResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user_dls");
assertOK(dlsResp);
assertThat(getSearchHits(extractResponseId(dlsResp), "user_dls"), arrayContainingInAnyOrder(
new CustomMatcher<SearchHit>("\"index\" doc 1 matcher") {
@Override
public boolean matches(Object actual) {
SearchHit hit = (SearchHit) actual;
return "index".equals(hit.getIndex()) &&
"1".equals(hit.getId()) &&
hit.getSourceAsMap().isEmpty();
}
},
new CustomMatcher<SearchHit>("\"index\" doc 2 matcher") {
@Override
public boolean matches(Object actual) {
SearchHit hit = (SearchHit) actual;
return "index".equals(hit.getIndex()) &&
"2".equals(hit.getId()) &&
"boo".equals(hit.getSourceAsMap().get("baz"));
}
}));
} finally {
closePointInTime(pitId, "user1");
}
}
static String extractResponseId(Response response) throws IOException { static String extractResponseId(Response response) throws IOException {
Map<String, Object> map = toMap(response); Map<String, Object> map = toMap(response);
return (String) map.get("id"); return (String) map.get("id");
@ -219,4 +347,42 @@ public class AsyncSearchSecurityIT extends ESRestTestCase {
builder.addHeader(RUN_AS_USER_HEADER, user); builder.addHeader(RUN_AS_USER_HEADER, user);
request.setOptions(builder); request.setOptions(builder);
} }
private String openPointInTime(String[] indexNames, String user) throws IOException {
final Request request = new Request("POST", "/_pit");
request.addParameter("index", String.join(",", indexNames));
setRunAsHeader(request, user);
request.addParameter("keep_alive", between(1, 5) + "m");
final Response response = client().performRequest(request);
assertOK(response);
return (String) toMap(response).get("id");
}
static Response submitAsyncSearchWithPIT(String pit, String query, TimeValue waitForCompletion, String user) throws IOException {
final Request request = new Request("POST", "/_async_search");
setRunAsHeader(request, user);
request.addParameter("wait_for_completion_timeout", waitForCompletion.toString());
request.addParameter("q", query);
request.addParameter("keep_on_completion", "true");
final XContentBuilder requestBody = JsonXContent.contentBuilder()
.startObject()
.startObject("pit")
.field("id", pit)
.field("keep_alive", "1m")
.endObject()
.endObject();
request.setJsonEntity(Strings.toString(requestBody));
return client().performRequest(request);
}
private void closePointInTime(String pitId, String user) throws IOException {
final Request request = new Request("DELETE", "/_pit");
setRunAsHeader(request, user);
final XContentBuilder requestBody = JsonXContent.contentBuilder()
.startObject()
.field("id", pitId)
.endObject();
request.setJsonEntity(Strings.toString(requestBody));
assertOK(client().performRequest(request));
}
} }

View File

@ -36,7 +36,11 @@ import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
import org.elasticsearch.xpack.core.XPackClientPlugin; import org.elasticsearch.xpack.core.XPackClientPlugin;
@ -52,6 +56,7 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX; import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
@ -220,7 +225,22 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
SearchSourceBuilder source, SearchSourceBuilder source,
int numFailures, int numFailures,
int progressStep) throws Exception { int progressStep) throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, indexName); final String pitId;
final SubmitAsyncSearchRequest request;
if (randomBoolean()) {
OpenPointInTimeRequest openPIT = new OpenPointInTimeRequest(
new String[]{indexName},
OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS,
TimeValue.timeValueMinutes(between(1, 5)),
null,
null);
pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPIT).actionGet().getSearchContextId();
source.pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder(pitId, TimeValue.timeValueMinutes(1)));
request = new SubmitAsyncSearchRequest(source);
} else {
pitId = null;
request = new SubmitAsyncSearchRequest(source, indexName);
}
request.setBatchedReduceSize(progressStep); request.setBatchedReduceSize(progressStep);
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
BlockingQueryBuilder.QueryLatch queryLatch = BlockingQueryBuilder.acquireQueryLatch(numFailures); BlockingQueryBuilder.QueryLatch queryLatch = BlockingQueryBuilder.acquireQueryLatch(numFailures);
@ -236,6 +256,7 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
return new SearchResponseIterator() { return new SearchResponseIterator() {
private AsyncSearchResponse response = initial; private AsyncSearchResponse response = initial;
private boolean isFirst = true; private boolean isFirst = true;
private final AtomicBoolean closed = new AtomicBoolean();
@Override @Override
public boolean hasNext() { public boolean hasNext() {
@ -296,8 +317,13 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
@Override @Override
public void close() { public void close() {
if (closed.compareAndSet(false, true)) {
if (pitId != null) {
client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
}
queryLatch.close(); queryLatch.close();
} }
}
}; };
} }
} }

View File

@ -49,7 +49,7 @@ public final class RestSubmitAsyncSearchAction extends BaseRestHandler {
// pre_filter_shard_size and ccs_minimize_roundtrips get set to the search request although the REST spec don't list // pre_filter_shard_size and ccs_minimize_roundtrips get set to the search request although the REST spec don't list
//them as supported. We rely on SubmitAsyncSearchRequest#validate to fail in case they are set. //them as supported. We rely on SubmitAsyncSearchRequest#validate to fail in case they are set.
request.withContentOrSourceParamParserOrNull(parser -> request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(submit.getSearchRequest(), request, parser, setSize)); parseSearchRequest(submit.getSearchRequest(), request, parser, client.getNamedWriteableRegistry(), setSize));
if (request.hasParam("wait_for_completion_timeout")) { if (request.hasParam("wait_for_completion_timeout")) {
submit.setWaitForCompletionTimeout(request.paramAsTime("wait_for_completion_timeout", submit.getWaitForCompletionTimeout())); submit.setWaitForCompletionTimeout(request.paramAsTime("wait_for_completion_timeout", submit.getWaitForCompletionTimeout()));

View File

@ -15,7 +15,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Arrays;
import java.util.List; import java.util.List;
import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.POST;
@ -29,7 +29,9 @@ public class RestOpenPointInTimeAction extends BaseRestHandler {
@Override @Override
public List<Route> routes() { public List<Route> routes() {
return Collections.singletonList(new Route(POST, "/{index}/_pit")); return Arrays.asList(
new Route(POST, "/{index}/_pit"),
new Route(POST, "/_pit"));
} }
@Override @Override

View File

@ -44,7 +44,8 @@ public class RestRollupSearchAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
SearchRequest searchRequest = new SearchRequest(); SearchRequest searchRequest = new SearchRequest();
restRequest.withContentOrSourceParamParserOrNull(parser -> restRequest.withContentOrSourceParamParserOrNull(parser ->
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> searchRequest.source().size(size))); RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser,
client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size)));
RestSearchAction.checkRestTotalHits(restRequest, searchRequest); RestSearchAction.checkRestTotalHits(restRequest, searchRequest);
return channel -> client.execute(RollupSearchAction.INSTANCE, searchRequest, new RestToXContentListener<>(channel)); return channel -> client.execute(RollupSearchAction.INSTANCE, searchRequest, new RestToXContentListener<>(channel));
} }

View File

@ -0,0 +1,78 @@
---
"Async search with point in time":
- skip:
version: " - 7.99.99"
reason: "point in time is introduced in 8.0"
- do:
indices.create:
index: test-1
body:
settings:
number_of_shards: "2"
- do:
indices.create:
index: test-2
body:
settings:
number_of_shards: "1"
- do:
indices.create:
index: test-3
body:
settings:
number_of_shards: "3"
- do:
index:
index: test-2
body: { max: 2 }
- do:
index:
index: test-1
body: { max: 1 }
- do:
index:
index: test-3
body: { max: 3 }
- do:
indices.refresh: {}
- do:
open_point_in_time:
index: test-*
keep_alive: 5m
- set: {id: point_in_time_id}
- do:
async_search.submit:
batched_reduce_size: 2
wait_for_completion_timeout: 10s
body:
query:
match_all: {}
aggs:
max:
max:
field: max
sort: max
pit:
id: "$point_in_time_id"
keep_alive: 1m
- is_false: id
- match: { is_partial: false }
- length: { response.hits.hits: 3 }
- match: { response.hits.hits.0._source.max: 1 }
- match: { response.aggregations.max.value: 3.0 }
- do:
close_point_in_time:
body:
id: "$point_in_time_id"