Reindex: Use request flavored methods (#30317)

Use the new request flavored methods for the low level rest client
introduced in #29623 in reindex.
This commit is contained in:
Nik Everett 2018-05-07 17:14:38 -04:00 committed by Colin Goodheart-Smithe
parent 455853f7af
commit 619898a218
No known key found for this signature in database
GPG Key ID: F975E7BDD739B3C7
3 changed files with 93 additions and 94 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.reindex.remote; package org.elasticsearch.index.reindex.remote;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ByteArrayEntity; import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType; import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity; import org.apache.http.entity.StringEntity;
@ -27,6 +26,7 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -40,33 +40,27 @@ import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/** /**
* Builds requests for remote version of Elasticsearch. Note that unlike most of the * Builds requests for remote version of Elasticsearch. Note that unlike most of the
* rest of Elasticsearch this file needs to be compatible with very old versions of * rest of Elasticsearch this file needs to be compatible with very old versions of
* Elasticsearch. Thus is often uses identifiers for versions like {@code 2000099} * Elasticsearch. Thus it often uses identifiers for versions like {@code 2000099}
* for {@code 2.0.0-alpha1}. Do not drop support for features from this file just * for {@code 2.0.0-alpha1}. Do not drop support for features from this file just
* because the version constants have been removed. * because the version constants have been removed.
*/ */
final class RemoteRequestBuilders { final class RemoteRequestBuilders {
private RemoteRequestBuilders() {} private RemoteRequestBuilders() {}
static String initialSearchPath(SearchRequest searchRequest) { static Request initialSearch(SearchRequest searchRequest, BytesReference query, Version remoteVersion) {
// It is nasty to build paths with StringBuilder but we'll be careful.... // It is nasty to build paths with StringBuilder but we'll be careful....
StringBuilder path = new StringBuilder("/"); StringBuilder path = new StringBuilder("/");
addIndexesOrTypes(path, "Index", searchRequest.indices()); addIndexesOrTypes(path, "Index", searchRequest.indices());
addIndexesOrTypes(path, "Type", searchRequest.types()); addIndexesOrTypes(path, "Type", searchRequest.types());
path.append("_search"); path.append("_search");
return path.toString(); Request request = new Request("POST", path.toString());
}
static Map<String, String> initialSearchParams(SearchRequest searchRequest, Version remoteVersion) {
Map<String, String> params = new HashMap<>();
if (searchRequest.scroll() != null) { if (searchRequest.scroll() != null) {
TimeValue keepAlive = searchRequest.scroll().keepAlive(); TimeValue keepAlive = searchRequest.scroll().keepAlive();
if (remoteVersion.before(Version.V_5_0_0)) { if (remoteVersion.before(Version.V_5_0_0)) {
@ -75,16 +69,16 @@ final class RemoteRequestBuilders {
* timeout seems safer than less. */ * timeout seems safer than less. */
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac())); keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac()));
} }
params.put("scroll", keepAlive.getStringRep()); request.addParameter("scroll", keepAlive.getStringRep());
} }
params.put("size", Integer.toString(searchRequest.source().size())); request.addParameter("size", Integer.toString(searchRequest.source().size()));
if (searchRequest.source().version() == null || searchRequest.source().version() == true) { if (searchRequest.source().version() == null || searchRequest.source().version() == true) {
/* /*
* Passing `null` here just add the `version` request parameter * Passing `null` here just add the `version` request parameter
* without any value. This way of requesting the version works * without any value. This way of requesting the version works
* for all supported versions of Elasticsearch. * for all supported versions of Elasticsearch.
*/ */
params.put("version", null); request.addParameter("version", null);
} }
if (searchRequest.source().sorts() != null) { if (searchRequest.source().sorts() != null) {
boolean useScan = false; boolean useScan = false;
@ -101,13 +95,13 @@ final class RemoteRequestBuilders {
} }
} }
if (useScan) { if (useScan) {
params.put("search_type", "scan"); request.addParameter("search_type", "scan");
} else { } else {
StringBuilder sorts = new StringBuilder(sortToUri(searchRequest.source().sorts().get(0))); StringBuilder sorts = new StringBuilder(sortToUri(searchRequest.source().sorts().get(0)));
for (int i = 1; i < searchRequest.source().sorts().size(); i++) { for (int i = 1; i < searchRequest.source().sorts().size(); i++) {
sorts.append(',').append(sortToUri(searchRequest.source().sorts().get(i))); sorts.append(',').append(sortToUri(searchRequest.source().sorts().get(i)));
} }
params.put("sort", sorts.toString()); request.addParameter("sort", sorts.toString());
} }
} }
if (remoteVersion.before(Version.fromId(2000099))) { if (remoteVersion.before(Version.fromId(2000099))) {
@ -126,12 +120,9 @@ final class RemoteRequestBuilders {
fields.append(',').append(searchRequest.source().storedFields().fieldNames().get(i)); fields.append(',').append(searchRequest.source().storedFields().fieldNames().get(i));
} }
String storedFieldsParamName = remoteVersion.before(Version.V_5_0_0_alpha4) ? "fields" : "stored_fields"; String storedFieldsParamName = remoteVersion.before(Version.V_5_0_0_alpha4) ? "fields" : "stored_fields";
params.put(storedFieldsParamName, fields.toString()); request.addParameter(storedFieldsParamName, fields.toString());
} }
return params;
}
static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReference query, Version remoteVersion) {
// EMPTY is safe here because we're not calling namedObject // EMPTY is safe here because we're not calling namedObject
try (XContentBuilder entity = JsonXContent.contentBuilder(); try (XContentBuilder entity = JsonXContent.contentBuilder();
XContentParser queryParser = XContentHelper XContentParser queryParser = XContentHelper
@ -139,7 +130,8 @@ final class RemoteRequestBuilders {
entity.startObject(); entity.startObject();
entity.field("query"); { entity.field("query"); {
/* We're intentionally a bit paranoid here - copying the query as xcontent rather than writing a raw field. We don't want /* We're intentionally a bit paranoid here - copying the query
* as xcontent rather than writing a raw field. We don't want
* poorly written queries to escape. Ever. */ * poorly written queries to escape. Ever. */
entity.copyCurrentStructure(queryParser); entity.copyCurrentStructure(queryParser);
XContentParser.Token shouldBeEof = queryParser.nextToken(); XContentParser.Token shouldBeEof = queryParser.nextToken();
@ -160,10 +152,11 @@ final class RemoteRequestBuilders {
entity.endObject(); entity.endObject();
BytesRef bytes = BytesReference.bytes(entity).toBytesRef(); BytesRef bytes = BytesReference.bytes(entity).toBytesRef();
return new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON); request.setEntity(new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON));
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException("unexpected error building entity", e); throw new ElasticsearchException("unexpected error building entity", e);
} }
return request;
} }
private static void addIndexesOrTypes(StringBuilder path, String name, String[] indicesOrTypes) { private static void addIndexesOrTypes(StringBuilder path, String name, String[] indicesOrTypes) {
@ -193,45 +186,50 @@ final class RemoteRequestBuilders {
throw new IllegalArgumentException("Unsupported sort [" + sort + "]"); throw new IllegalArgumentException("Unsupported sort [" + sort + "]");
} }
static String scrollPath() { static Request scroll(String scroll, TimeValue keepAlive, Version remoteVersion) {
return "/_search/scroll"; Request request = new Request("POST", "/_search/scroll");
}
static Map<String, String> scrollParams(TimeValue keepAlive, Version remoteVersion) {
if (remoteVersion.before(Version.V_5_0_0)) { if (remoteVersion.before(Version.V_5_0_0)) {
/* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros /* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros
* so we toss out that resolution, rounding up so we shouldn't end up * so we toss out that resolution, rounding up so we shouldn't end up
* with 0s. */ * with 0s. */
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac())); keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac()));
} }
return singletonMap("scroll", keepAlive.getStringRep()); request.addParameter("scroll", keepAlive.getStringRep());
}
static HttpEntity scrollEntity(String scroll, Version remoteVersion) {
if (remoteVersion.before(Version.fromId(2000099))) { if (remoteVersion.before(Version.fromId(2000099))) {
// Versions before 2.0.0 extract the plain scroll_id from the body // Versions before 2.0.0 extract the plain scroll_id from the body
return new StringEntity(scroll, ContentType.TEXT_PLAIN); request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN));
return request;
} }
try (XContentBuilder entity = JsonXContent.contentBuilder()) { try (XContentBuilder entity = JsonXContent.contentBuilder()) {
return new StringEntity(Strings.toString(entity.startObject() entity.startObject()
.field("scroll_id", scroll) .field("scroll_id", scroll)
.endObject()), ContentType.APPLICATION_JSON); .endObject();
request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON));
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException("failed to build scroll entity", e); throw new ElasticsearchException("failed to build scroll entity", e);
} }
return request;
} }
static HttpEntity clearScrollEntity(String scroll, Version remoteVersion) { static Request clearScroll(String scroll, Version remoteVersion) {
Request request = new Request("DELETE", "/_search/scroll");
if (remoteVersion.before(Version.fromId(2000099))) { if (remoteVersion.before(Version.fromId(2000099))) {
// Versions before 2.0.0 extract the plain scroll_id from the body // Versions before 2.0.0 extract the plain scroll_id from the body
return new StringEntity(scroll, ContentType.TEXT_PLAIN); request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN));
return request;
} }
try (XContentBuilder entity = JsonXContent.contentBuilder()) { try (XContentBuilder entity = JsonXContent.contentBuilder()) {
return new StringEntity(Strings.toString(entity.startObject() entity.startObject()
.array("scroll_id", scroll) .array("scroll_id", scroll)
.endObject()), ContentType.APPLICATION_JSON); .endObject();
request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON));
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException("failed to build clear scroll entity", e); throw new ElasticsearchException("failed to build clear scroll entity", e);
} }
return request;
} }
} }

View File

@ -30,22 +30,22 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener; import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -53,20 +53,11 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer; import java.util.function.Consumer;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchPath;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollPath;
import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.MAIN_ACTION_PARSER; import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.MAIN_ACTION_PARSER;
import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.RESPONSE_PARSER; import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.RESPONSE_PARSER;
@ -88,13 +79,13 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
protected void doStart(Consumer<? super Response> onResponse) { protected void doStart(Consumer<? super Response> onResponse) {
lookupRemoteVersion(version -> { lookupRemoteVersion(version -> {
remoteVersion = version; remoteVersion = version;
execute("POST", initialSearchPath(searchRequest), initialSearchParams(searchRequest, version), execute(RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
initialSearchEntity(searchRequest, query, remoteVersion), RESPONSE_PARSER, r -> onStartResponse(onResponse, r)); RESPONSE_PARSER, r -> onStartResponse(onResponse, r));
}); });
} }
void lookupRemoteVersion(Consumer<Version> onVersion) { void lookupRemoteVersion(Consumer<Version> onVersion) {
execute("GET", "", emptyMap(), null, MAIN_ACTION_PARSER, onVersion); execute(new Request("GET", ""), MAIN_ACTION_PARSER, onVersion);
} }
private void onStartResponse(Consumer<? super Response> onResponse, Response response) { private void onStartResponse(Consumer<? super Response> onResponse, Response response) {
@ -108,15 +99,13 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
@Override @Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse) { protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse) {
Map<String, String> scrollParams = scrollParams( TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()), execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, onResponse);
remoteVersion);
execute("POST", scrollPath(), scrollParams, scrollEntity(scrollId, remoteVersion), RESPONSE_PARSER, onResponse);
} }
@Override @Override
protected void clearScroll(String scrollId, Runnable onCompletion) { protected void clearScroll(String scrollId, Runnable onCompletion) {
client.performRequestAsync("DELETE", scrollPath(), emptyMap(), clearScrollEntity(scrollId, remoteVersion), new ResponseListener() { client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
@Override @Override
public void onSuccess(org.elasticsearch.client.Response response) { public void onSuccess(org.elasticsearch.client.Response response) {
logger.debug("Successfully cleared [{}]", scrollId); logger.debug("Successfully cleared [{}]", scrollId);
@ -162,7 +151,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
}); });
} }
private <T> void execute(String method, String uri, Map<String, String> params, HttpEntity entity, private <T> void execute(Request request,
BiFunction<XContentParser, XContentType, T> parser, Consumer<? super T> listener) { BiFunction<XContentParser, XContentType, T> parser, Consumer<? super T> listener) {
// Preserve the thread context so headers survive after the call // Preserve the thread context so headers survive after the call
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true); java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
@ -171,7 +160,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
client.performRequestAsync(method, uri, params, entity, new ResponseListener() { client.performRequestAsync(request, new ResponseListener() {
@Override @Override
public void onSuccess(org.elasticsearch.client.Response response) { public void onSuccess(org.elasticsearch.client.Response response) {
// Restore the thread context to get the precious headers // Restore the thread context to get the precious headers

View File

@ -23,7 +23,9 @@ import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType; import org.apache.http.entity.ContentType;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -35,14 +37,12 @@ import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity; import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScroll;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchEntity; import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearch;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams; import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scroll;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchPath;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasKey;
@ -57,15 +57,17 @@ import static org.hamcrest.Matchers.not;
*/ */
public class RemoteRequestBuildersTests extends ESTestCase { public class RemoteRequestBuildersTests extends ESTestCase {
public void testIntialSearchPath() { public void testIntialSearchPath() {
SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder()); Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id));
BytesReference query = new BytesArray("{}");
assertEquals("/_search", initialSearchPath(searchRequest)); SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder());
assertEquals("/_search", initialSearch(searchRequest, query, remoteVersion).getEndpoint());
searchRequest.indices("a"); searchRequest.indices("a");
searchRequest.types("b"); searchRequest.types("b");
assertEquals("/a/b/_search", initialSearchPath(searchRequest)); assertEquals("/a/b/_search", initialSearch(searchRequest, query, remoteVersion).getEndpoint());
searchRequest.indices("a", "b"); searchRequest.indices("a", "b");
searchRequest.types("c", "d"); searchRequest.types("c", "d");
assertEquals("/a,b/c,d/_search", initialSearchPath(searchRequest)); assertEquals("/a,b/c,d/_search", initialSearch(searchRequest, query, remoteVersion).getEndpoint());
searchRequest.indices("cat,"); searchRequest.indices("cat,");
expectBadStartRequest(searchRequest, "Index", ",", "cat,"); expectBadStartRequest(searchRequest, "Index", ",", "cat,");
@ -96,63 +98,70 @@ public class RemoteRequestBuildersTests extends ESTestCase {
} }
private void expectBadStartRequest(SearchRequest searchRequest, String type, String bad, String failed) { private void expectBadStartRequest(SearchRequest searchRequest, String type, String bad, String failed) {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> initialSearchPath(searchRequest)); Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id));
BytesReference query = new BytesArray("{}");
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> initialSearch(searchRequest, query, remoteVersion));
assertEquals(type + " containing [" + bad + "] not supported but got [" + failed + "]", e.getMessage()); assertEquals(type + " containing [" + bad + "] not supported but got [" + failed + "]", e.getMessage());
} }
public void testInitialSearchParamsSort() { public void testInitialSearchParamsSort() {
BytesReference query = new BytesArray("{}");
SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder()); SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder());
// Test sort:_doc for versions that support it. // Test sort:_doc for versions that support it.
Version remoteVersion = Version.fromId(between(2010099, Version.CURRENT.id)); Version remoteVersion = Version.fromId(between(2010099, Version.CURRENT.id));
searchRequest.source().sort("_doc"); searchRequest.source().sort("_doc");
assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("sort", "_doc:asc")); assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("sort", "_doc:asc"));
// Test search_type scan for versions that don't support sort:_doc. // Test search_type scan for versions that don't support sort:_doc.
remoteVersion = Version.fromId(between(0, 2010099 - 1)); remoteVersion = Version.fromId(between(0, 2010099 - 1));
assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("search_type", "scan")); assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("search_type", "scan"));
// Test sorting by some field. Version doesn't matter. // Test sorting by some field. Version doesn't matter.
remoteVersion = Version.fromId(between(0, Version.CURRENT.id)); remoteVersion = Version.fromId(between(0, Version.CURRENT.id));
searchRequest.source().sorts().clear(); searchRequest.source().sorts().clear();
searchRequest.source().sort("foo"); searchRequest.source().sort("foo");
assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("sort", "foo:asc")); assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("sort", "foo:asc"));
} }
public void testInitialSearchParamsFields() { public void testInitialSearchParamsFields() {
BytesReference query = new BytesArray("{}");
SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder()); SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder());
// Test request without any fields // Test request without any fields
Version remoteVersion = Version.fromId(between(2000099, Version.CURRENT.id)); Version remoteVersion = Version.fromId(between(2000099, Version.CURRENT.id));
assertThat(initialSearchParams(searchRequest, remoteVersion), assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
not(either(hasKey("stored_fields")).or(hasKey("fields")))); not(either(hasKey("stored_fields")).or(hasKey("fields"))));
// Test stored_fields for versions that support it // Test stored_fields for versions that support it
searchRequest = new SearchRequest().source(new SearchSourceBuilder()); searchRequest = new SearchRequest().source(new SearchSourceBuilder());
searchRequest.source().storedField("_source").storedField("_id"); searchRequest.source().storedField("_source").storedField("_id");
remoteVersion = Version.fromId(between(Version.V_5_0_0_alpha4_ID, Version.CURRENT.id)); remoteVersion = Version.fromId(between(Version.V_5_0_0_alpha4_ID, Version.CURRENT.id));
assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("stored_fields", "_source,_id")); assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("stored_fields", "_source,_id"));
// Test fields for versions that support it // Test fields for versions that support it
searchRequest = new SearchRequest().source(new SearchSourceBuilder()); searchRequest = new SearchRequest().source(new SearchSourceBuilder());
searchRequest.source().storedField("_source").storedField("_id"); searchRequest.source().storedField("_source").storedField("_id");
remoteVersion = Version.fromId(between(2000099, Version.V_5_0_0_alpha4_ID - 1)); remoteVersion = Version.fromId(between(2000099, Version.V_5_0_0_alpha4_ID - 1));
assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("fields", "_source,_id")); assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("fields", "_source,_id"));
// Test extra fields for versions that need it // Test extra fields for versions that need it
searchRequest = new SearchRequest().source(new SearchSourceBuilder()); searchRequest = new SearchRequest().source(new SearchSourceBuilder());
searchRequest.source().storedField("_source").storedField("_id"); searchRequest.source().storedField("_source").storedField("_id");
remoteVersion = Version.fromId(between(0, 2000099 - 1)); remoteVersion = Version.fromId(between(0, 2000099 - 1));
assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("fields", "_source,_id,_parent,_routing,_ttl")); assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
hasEntry("fields", "_source,_id,_parent,_routing,_ttl"));
// But only versions before 1.0 force _source to be in the list // But only versions before 1.0 force _source to be in the list
searchRequest = new SearchRequest().source(new SearchSourceBuilder()); searchRequest = new SearchRequest().source(new SearchSourceBuilder());
searchRequest.source().storedField("_id"); searchRequest.source().storedField("_id");
remoteVersion = Version.fromId(between(1000099, 2000099 - 1)); remoteVersion = Version.fromId(between(1000099, 2000099 - 1));
assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("fields", "_id,_parent,_routing,_ttl")); assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
hasEntry("fields", "_id,_parent,_routing,_ttl"));
} }
public void testInitialSearchParamsMisc() { public void testInitialSearchParamsMisc() {
BytesReference query = new BytesArray("{}");
SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder()); SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder());
Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id)); Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id));
@ -169,7 +178,7 @@ public class RemoteRequestBuildersTests extends ESTestCase {
searchRequest.source().version(fetchVersion); searchRequest.source().version(fetchVersion);
} }
Map<String, String> params = initialSearchParams(searchRequest, remoteVersion); Map<String, String> params = initialSearch(searchRequest, query, remoteVersion).getParameters();
if (scroll == null) { if (scroll == null) {
assertThat(params, not(hasKey("scroll"))); assertThat(params, not(hasKey("scroll")));
@ -199,7 +208,7 @@ public class RemoteRequestBuildersTests extends ESTestCase {
SearchRequest searchRequest = new SearchRequest(); SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder()); searchRequest.source(new SearchSourceBuilder());
String query = "{\"match_all\":{}}"; String query = "{\"match_all\":{}}";
HttpEntity entity = initialSearchEntity(searchRequest, new BytesArray(query), remoteVersion); HttpEntity entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity();
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue()); assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
if (remoteVersion.onOrAfter(Version.fromId(1000099))) { if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
assertEquals("{\"query\":" + query + ",\"_source\":true}", assertEquals("{\"query\":" + query + ",\"_source\":true}",
@ -211,48 +220,51 @@ public class RemoteRequestBuildersTests extends ESTestCase {
// Source filtering is included if set up // Source filtering is included if set up
searchRequest.source().fetchSource(new String[] {"in1", "in2"}, new String[] {"out"}); searchRequest.source().fetchSource(new String[] {"in1", "in2"}, new String[] {"out"});
entity = initialSearchEntity(searchRequest, new BytesArray(query), remoteVersion); entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity();
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue()); assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
assertEquals("{\"query\":" + query + ",\"_source\":{\"includes\":[\"in1\",\"in2\"],\"excludes\":[\"out\"]}}", assertEquals("{\"query\":" + query + ",\"_source\":{\"includes\":[\"in1\",\"in2\"],\"excludes\":[\"out\"]}}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))); Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
// Invalid XContent fails // Invalid XContent fails
RuntimeException e = expectThrows(RuntimeException.class, RuntimeException e = expectThrows(RuntimeException.class,
() -> initialSearchEntity(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion)); () -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
assertThat(e.getCause().getMessage(), containsString("Unexpected character (',' (code 44))")); assertThat(e.getCause().getMessage(), containsString("Unexpected character (',' (code 44))"));
e = expectThrows(RuntimeException.class, () -> initialSearchEntity(searchRequest, new BytesArray("{"), remoteVersion)); e = expectThrows(RuntimeException.class, () -> initialSearch(searchRequest, new BytesArray("{"), remoteVersion));
assertThat(e.getCause().getMessage(), containsString("Unexpected end-of-input")); assertThat(e.getCause().getMessage(), containsString("Unexpected end-of-input"));
} }
public void testScrollParams() { public void testScrollParams() {
String scroll = randomAlphaOfLength(30);
Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id)); Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id));
TimeValue scroll = TimeValue.parseTimeValue(randomPositiveTimeValue(), "test"); TimeValue keepAlive = TimeValue.parseTimeValue(randomPositiveTimeValue(), "test");
assertScroll(remoteVersion, scrollParams(scroll, remoteVersion), scroll); assertScroll(remoteVersion, scroll(scroll, keepAlive, remoteVersion).getParameters(), keepAlive);
} }
public void testScrollEntity() throws IOException { public void testScrollEntity() throws IOException {
String scroll = randomAlphaOfLength(30); String scroll = randomAlphaOfLength(30);
HttpEntity entity = scrollEntity(scroll, Version.V_5_0_0); HttpEntity entity = scroll(scroll, timeValueMillis(between(1, 1000)), Version.V_5_0_0).getEntity();
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue()); assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
assertThat(Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)), assertThat(Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)),
containsString("\"" + scroll + "\"")); containsString("\"" + scroll + "\""));
// Test with version < 2.0.0 // Test with version < 2.0.0
entity = scrollEntity(scroll, Version.fromId(1070499)); entity = scroll(scroll, timeValueMillis(between(1, 1000)), Version.fromId(1070499)).getEntity();
assertEquals(ContentType.TEXT_PLAIN.toString(), entity.getContentType().getValue()); assertEquals(ContentType.TEXT_PLAIN.toString(), entity.getContentType().getValue());
assertEquals(scroll, Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))); assertEquals(scroll, Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
} }
public void testClearScrollEntity() throws IOException { public void testClearScroll() throws IOException {
String scroll = randomAlphaOfLength(30); String scroll = randomAlphaOfLength(30);
HttpEntity entity = clearScrollEntity(scroll, Version.V_5_0_0); Request request = clearScroll(scroll, Version.V_5_0_0);
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue()); assertEquals(ContentType.APPLICATION_JSON.toString(), request.getEntity().getContentType().getValue());
assertThat(Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)), assertThat(Streams.copyToString(new InputStreamReader(request.getEntity().getContent(), StandardCharsets.UTF_8)),
containsString("\"" + scroll + "\"")); containsString("\"" + scroll + "\""));
assertThat(request.getParameters().keySet(), empty());
// Test with version < 2.0.0 // Test with version < 2.0.0
entity = clearScrollEntity(scroll, Version.fromId(1070499)); request = clearScroll(scroll, Version.fromId(1070499));
assertEquals(ContentType.TEXT_PLAIN.toString(), entity.getContentType().getValue()); assertEquals(ContentType.TEXT_PLAIN.toString(), request.getEntity().getContentType().getValue());
assertEquals(scroll, Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))); assertEquals(scroll, Streams.copyToString(new InputStreamReader(request.getEntity().getContent(), StandardCharsets.UTF_8)));
assertThat(request.getParameters().keySet(), empty());
} }
} }