From ef4ecb1f1e21a30e1d67b32f711de3a84016a73e Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 7 May 2018 17:14:38 -0400 Subject: [PATCH] Reindex: Use request flavored methods (#30317) Use the new request flavored methods for the low level rest client introduced in #29623 in reindex. --- .../reindex/remote/RemoteRequestBuilders.java | 72 ++++++++-------- .../remote/RemoteScrollableHitSource.java | 33 +++----- .../remote/RemoteRequestBuildersTests.java | 82 +++++++++++-------- 3 files changed, 93 insertions(+), 94 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuilders.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuilders.java index 063a0ad31f3..4dcc0a34758 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuilders.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuilders.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.reindex.remote; -import org.apache.http.HttpEntity; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; @@ -27,6 +26,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Request; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.TimeValue; @@ -40,33 +40,27 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilder; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import static java.util.Collections.singletonMap; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; /** * Builds requests for remote version of Elasticsearch. Note that unlike most of the * rest of Elasticsearch this file needs to be compatible with very old versions of - * Elasticsearch. Thus is often uses identifiers for versions like {@code 2000099} + * Elasticsearch. Thus it often uses identifiers for versions like {@code 2000099} * for {@code 2.0.0-alpha1}. Do not drop support for features from this file just * because the version constants have been removed. */ final class RemoteRequestBuilders { private RemoteRequestBuilders() {} - static String initialSearchPath(SearchRequest searchRequest) { + static Request initialSearch(SearchRequest searchRequest, BytesReference query, Version remoteVersion) { // It is nasty to build paths with StringBuilder but we'll be careful.... StringBuilder path = new StringBuilder("/"); addIndexesOrTypes(path, "Index", searchRequest.indices()); addIndexesOrTypes(path, "Type", searchRequest.types()); path.append("_search"); - return path.toString(); - } + Request request = new Request("POST", path.toString()); - static Map initialSearchParams(SearchRequest searchRequest, Version remoteVersion) { - Map params = new HashMap<>(); if (searchRequest.scroll() != null) { TimeValue keepAlive = searchRequest.scroll().keepAlive(); if (remoteVersion.before(Version.V_5_0_0)) { @@ -75,16 +69,16 @@ final class RemoteRequestBuilders { * timeout seems safer than less. */ keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac())); } - params.put("scroll", keepAlive.getStringRep()); + request.addParameter("scroll", keepAlive.getStringRep()); } - params.put("size", Integer.toString(searchRequest.source().size())); + request.addParameter("size", Integer.toString(searchRequest.source().size())); if (searchRequest.source().version() == null || searchRequest.source().version() == true) { /* * Passing `null` here just add the `version` request parameter * without any value. This way of requesting the version works * for all supported versions of Elasticsearch. */ - params.put("version", null); + request.addParameter("version", null); } if (searchRequest.source().sorts() != null) { boolean useScan = false; @@ -101,13 +95,13 @@ final class RemoteRequestBuilders { } } if (useScan) { - params.put("search_type", "scan"); + request.addParameter("search_type", "scan"); } else { StringBuilder sorts = new StringBuilder(sortToUri(searchRequest.source().sorts().get(0))); for (int i = 1; i < searchRequest.source().sorts().size(); i++) { sorts.append(',').append(sortToUri(searchRequest.source().sorts().get(i))); } - params.put("sort", sorts.toString()); + request.addParameter("sort", sorts.toString()); } } if (remoteVersion.before(Version.fromId(2000099))) { @@ -126,12 +120,9 @@ final class RemoteRequestBuilders { fields.append(',').append(searchRequest.source().storedFields().fieldNames().get(i)); } String storedFieldsParamName = remoteVersion.before(Version.V_5_0_0_alpha4) ? "fields" : "stored_fields"; - params.put(storedFieldsParamName, fields.toString()); + request.addParameter(storedFieldsParamName, fields.toString()); } - return params; - } - static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReference query, Version remoteVersion) { // EMPTY is safe here because we're not calling namedObject try (XContentBuilder entity = JsonXContent.contentBuilder(); XContentParser queryParser = XContentHelper @@ -139,7 +130,8 @@ final class RemoteRequestBuilders { entity.startObject(); entity.field("query"); { - /* We're intentionally a bit paranoid here - copying the query as xcontent rather than writing a raw field. We don't want + /* We're intentionally a bit paranoid here - copying the query + * as xcontent rather than writing a raw field. We don't want * poorly written queries to escape. Ever. */ entity.copyCurrentStructure(queryParser); XContentParser.Token shouldBeEof = queryParser.nextToken(); @@ -160,10 +152,11 @@ final class RemoteRequestBuilders { entity.endObject(); BytesRef bytes = BytesReference.bytes(entity).toBytesRef(); - return new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON); + request.setEntity(new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON)); } catch (IOException e) { throw new ElasticsearchException("unexpected error building entity", e); } + return request; } private static void addIndexesOrTypes(StringBuilder path, String name, String[] indicesOrTypes) { @@ -193,45 +186,50 @@ final class RemoteRequestBuilders { throw new IllegalArgumentException("Unsupported sort [" + sort + "]"); } - static String scrollPath() { - return "/_search/scroll"; - } + static Request scroll(String scroll, TimeValue keepAlive, Version remoteVersion) { + Request request = new Request("POST", "/_search/scroll"); - static Map scrollParams(TimeValue keepAlive, Version remoteVersion) { if (remoteVersion.before(Version.V_5_0_0)) { /* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros * so we toss out that resolution, rounding up so we shouldn't end up * with 0s. */ keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac())); } - return singletonMap("scroll", keepAlive.getStringRep()); - } + request.addParameter("scroll", keepAlive.getStringRep()); - static HttpEntity scrollEntity(String scroll, Version remoteVersion) { if (remoteVersion.before(Version.fromId(2000099))) { // Versions before 2.0.0 extract the plain scroll_id from the body - return new StringEntity(scroll, ContentType.TEXT_PLAIN); + request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN)); + return request; } + try (XContentBuilder entity = JsonXContent.contentBuilder()) { - return new StringEntity(Strings.toString(entity.startObject() - .field("scroll_id", scroll) - .endObject()), ContentType.APPLICATION_JSON); + entity.startObject() + .field("scroll_id", scroll) + .endObject(); + request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON)); } catch (IOException e) { throw new ElasticsearchException("failed to build scroll entity", e); } + return request; } - static HttpEntity clearScrollEntity(String scroll, Version remoteVersion) { + static Request clearScroll(String scroll, Version remoteVersion) { + Request request = new Request("DELETE", "/_search/scroll"); + if (remoteVersion.before(Version.fromId(2000099))) { // Versions before 2.0.0 extract the plain scroll_id from the body - return new StringEntity(scroll, ContentType.TEXT_PLAIN); + request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN)); + return request; } try (XContentBuilder entity = JsonXContent.contentBuilder()) { - return new StringEntity(Strings.toString(entity.startObject() - .array("scroll_id", scroll) - .endObject()), ContentType.APPLICATION_JSON); + entity.startObject() + .array("scroll_id", scroll) + .endObject(); + request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON)); } catch (IOException e) { throw new ElasticsearchException("failed to build clear scroll entity", e); } + return request; } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java index 566c97c61c4..9264cdde30c 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java @@ -30,22 +30,22 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.XContentParseException; import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.ResponseListener; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParseException; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; @@ -53,20 +53,11 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; -import java.util.Map; import java.util.function.BiFunction; import java.util.function.Consumer; -import static java.util.Collections.emptyMap; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchEntity; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchPath; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollEntity; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollPath; import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.MAIN_ACTION_PARSER; import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.RESPONSE_PARSER; @@ -88,13 +79,13 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { protected void doStart(Consumer onResponse) { lookupRemoteVersion(version -> { remoteVersion = version; - execute("POST", initialSearchPath(searchRequest), initialSearchParams(searchRequest, version), - initialSearchEntity(searchRequest, query, remoteVersion), RESPONSE_PARSER, r -> onStartResponse(onResponse, r)); + execute(RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion), + RESPONSE_PARSER, r -> onStartResponse(onResponse, r)); }); } void lookupRemoteVersion(Consumer onVersion) { - execute("GET", "", emptyMap(), null, MAIN_ACTION_PARSER, onVersion); + execute(new Request("GET", ""), MAIN_ACTION_PARSER, onVersion); } private void onStartResponse(Consumer onResponse, Response response) { @@ -108,15 +99,13 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { @Override protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer onResponse) { - Map scrollParams = scrollParams( - timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()), - remoteVersion); - execute("POST", scrollPath(), scrollParams, scrollEntity(scrollId, remoteVersion), RESPONSE_PARSER, onResponse); + TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()); + execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, onResponse); } @Override protected void clearScroll(String scrollId, Runnable onCompletion) { - client.performRequestAsync("DELETE", scrollPath(), emptyMap(), clearScrollEntity(scrollId, remoteVersion), new ResponseListener() { + client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() { @Override public void onSuccess(org.elasticsearch.client.Response response) { logger.debug("Successfully cleared [{}]", scrollId); @@ -162,7 +151,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { }); } - private void execute(String method, String uri, Map params, HttpEntity entity, + private void execute(Request request, BiFunction parser, Consumer listener) { // Preserve the thread context so headers survive after the call java.util.function.Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(true); @@ -171,7 +160,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { @Override protected void doRun() throws Exception { - client.performRequestAsync(method, uri, params, entity, new ResponseListener() { + client.performRequestAsync(request, new ResponseListener() { @Override public void onSuccess(org.elasticsearch.client.Response response) { // Restore the thread context to get the precious headers diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuildersTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuildersTests.java index 9cb644162da..b51525f20e3 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuildersTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuildersTests.java @@ -23,7 +23,9 @@ import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.elasticsearch.Version; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Request; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -35,14 +37,12 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchEntity; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchPath; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollEntity; -import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams; +import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScroll; +import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearch; +import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scroll; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; @@ -57,15 +57,17 @@ import static org.hamcrest.Matchers.not; */ public class RemoteRequestBuildersTests extends ESTestCase { public void testIntialSearchPath() { - SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder()); + Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id)); + BytesReference query = new BytesArray("{}"); - assertEquals("/_search", initialSearchPath(searchRequest)); + SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder()); + assertEquals("/_search", initialSearch(searchRequest, query, remoteVersion).getEndpoint()); searchRequest.indices("a"); searchRequest.types("b"); - assertEquals("/a/b/_search", initialSearchPath(searchRequest)); + assertEquals("/a/b/_search", initialSearch(searchRequest, query, remoteVersion).getEndpoint()); searchRequest.indices("a", "b"); searchRequest.types("c", "d"); - assertEquals("/a,b/c,d/_search", initialSearchPath(searchRequest)); + assertEquals("/a,b/c,d/_search", initialSearch(searchRequest, query, remoteVersion).getEndpoint()); searchRequest.indices("cat,"); expectBadStartRequest(searchRequest, "Index", ",", "cat,"); @@ -96,63 +98,70 @@ public class RemoteRequestBuildersTests extends ESTestCase { } private void expectBadStartRequest(SearchRequest searchRequest, String type, String bad, String failed) { - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> initialSearchPath(searchRequest)); + Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id)); + BytesReference query = new BytesArray("{}"); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> initialSearch(searchRequest, query, remoteVersion)); assertEquals(type + " containing [" + bad + "] not supported but got [" + failed + "]", e.getMessage()); } public void testInitialSearchParamsSort() { + BytesReference query = new BytesArray("{}"); SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder()); // Test sort:_doc for versions that support it. Version remoteVersion = Version.fromId(between(2010099, Version.CURRENT.id)); searchRequest.source().sort("_doc"); - assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("sort", "_doc:asc")); + assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("sort", "_doc:asc")); // Test search_type scan for versions that don't support sort:_doc. remoteVersion = Version.fromId(between(0, 2010099 - 1)); - assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("search_type", "scan")); + assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("search_type", "scan")); // Test sorting by some field. Version doesn't matter. remoteVersion = Version.fromId(between(0, Version.CURRENT.id)); searchRequest.source().sorts().clear(); searchRequest.source().sort("foo"); - assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("sort", "foo:asc")); + assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("sort", "foo:asc")); } public void testInitialSearchParamsFields() { + BytesReference query = new BytesArray("{}"); SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder()); // Test request without any fields Version remoteVersion = Version.fromId(between(2000099, Version.CURRENT.id)); - assertThat(initialSearchParams(searchRequest, remoteVersion), + assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), not(either(hasKey("stored_fields")).or(hasKey("fields")))); // Test stored_fields for versions that support it searchRequest = new SearchRequest().source(new SearchSourceBuilder()); searchRequest.source().storedField("_source").storedField("_id"); remoteVersion = Version.fromId(between(Version.V_5_0_0_alpha4_ID, Version.CURRENT.id)); - assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("stored_fields", "_source,_id")); + assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("stored_fields", "_source,_id")); // Test fields for versions that support it searchRequest = new SearchRequest().source(new SearchSourceBuilder()); searchRequest.source().storedField("_source").storedField("_id"); remoteVersion = Version.fromId(between(2000099, Version.V_5_0_0_alpha4_ID - 1)); - assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("fields", "_source,_id")); + assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), hasEntry("fields", "_source,_id")); // Test extra fields for versions that need it searchRequest = new SearchRequest().source(new SearchSourceBuilder()); searchRequest.source().storedField("_source").storedField("_id"); remoteVersion = Version.fromId(between(0, 2000099 - 1)); - assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("fields", "_source,_id,_parent,_routing,_ttl")); + assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), + hasEntry("fields", "_source,_id,_parent,_routing,_ttl")); // But only versions before 1.0 force _source to be in the list searchRequest = new SearchRequest().source(new SearchSourceBuilder()); searchRequest.source().storedField("_id"); remoteVersion = Version.fromId(between(1000099, 2000099 - 1)); - assertThat(initialSearchParams(searchRequest, remoteVersion), hasEntry("fields", "_id,_parent,_routing,_ttl")); + assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(), + hasEntry("fields", "_id,_parent,_routing,_ttl")); } public void testInitialSearchParamsMisc() { + BytesReference query = new BytesArray("{}"); SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder()); Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id)); @@ -169,7 +178,7 @@ public class RemoteRequestBuildersTests extends ESTestCase { searchRequest.source().version(fetchVersion); } - Map params = initialSearchParams(searchRequest, remoteVersion); + Map params = initialSearch(searchRequest, query, remoteVersion).getParameters(); if (scroll == null) { assertThat(params, not(hasKey("scroll"))); @@ -199,7 +208,7 @@ public class RemoteRequestBuildersTests extends ESTestCase { SearchRequest searchRequest = new SearchRequest(); searchRequest.source(new SearchSourceBuilder()); String query = "{\"match_all\":{}}"; - HttpEntity entity = initialSearchEntity(searchRequest, new BytesArray(query), remoteVersion); + HttpEntity entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity(); assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue()); if (remoteVersion.onOrAfter(Version.fromId(1000099))) { assertEquals("{\"query\":" + query + ",\"_source\":true}", @@ -211,48 +220,51 @@ public class RemoteRequestBuildersTests extends ESTestCase { // Source filtering is included if set up searchRequest.source().fetchSource(new String[] {"in1", "in2"}, new String[] {"out"}); - entity = initialSearchEntity(searchRequest, new BytesArray(query), remoteVersion); + entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity(); assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue()); assertEquals("{\"query\":" + query + ",\"_source\":{\"includes\":[\"in1\",\"in2\"],\"excludes\":[\"out\"]}}", Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))); // Invalid XContent fails RuntimeException e = expectThrows(RuntimeException.class, - () -> initialSearchEntity(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion)); + () -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion)); assertThat(e.getCause().getMessage(), containsString("Unexpected character (',' (code 44))")); - e = expectThrows(RuntimeException.class, () -> initialSearchEntity(searchRequest, new BytesArray("{"), remoteVersion)); + e = expectThrows(RuntimeException.class, () -> initialSearch(searchRequest, new BytesArray("{"), remoteVersion)); assertThat(e.getCause().getMessage(), containsString("Unexpected end-of-input")); } public void testScrollParams() { + String scroll = randomAlphaOfLength(30); Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id)); - TimeValue scroll = TimeValue.parseTimeValue(randomPositiveTimeValue(), "test"); - assertScroll(remoteVersion, scrollParams(scroll, remoteVersion), scroll); + TimeValue keepAlive = TimeValue.parseTimeValue(randomPositiveTimeValue(), "test"); + assertScroll(remoteVersion, scroll(scroll, keepAlive, remoteVersion).getParameters(), keepAlive); } public void testScrollEntity() throws IOException { String scroll = randomAlphaOfLength(30); - HttpEntity entity = scrollEntity(scroll, Version.V_5_0_0); + HttpEntity entity = scroll(scroll, timeValueMillis(between(1, 1000)), Version.V_5_0_0).getEntity(); assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue()); assertThat(Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)), containsString("\"" + scroll + "\"")); // Test with version < 2.0.0 - entity = scrollEntity(scroll, Version.fromId(1070499)); + entity = scroll(scroll, timeValueMillis(between(1, 1000)), Version.fromId(1070499)).getEntity(); assertEquals(ContentType.TEXT_PLAIN.toString(), entity.getContentType().getValue()); assertEquals(scroll, Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))); } - public void testClearScrollEntity() throws IOException { + public void testClearScroll() throws IOException { String scroll = randomAlphaOfLength(30); - HttpEntity entity = clearScrollEntity(scroll, Version.V_5_0_0); - assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue()); - assertThat(Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)), + Request request = clearScroll(scroll, Version.V_5_0_0); + assertEquals(ContentType.APPLICATION_JSON.toString(), request.getEntity().getContentType().getValue()); + assertThat(Streams.copyToString(new InputStreamReader(request.getEntity().getContent(), StandardCharsets.UTF_8)), containsString("\"" + scroll + "\"")); + assertThat(request.getParameters().keySet(), empty()); // Test with version < 2.0.0 - entity = clearScrollEntity(scroll, Version.fromId(1070499)); - assertEquals(ContentType.TEXT_PLAIN.toString(), entity.getContentType().getValue()); - assertEquals(scroll, Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))); + request = clearScroll(scroll, Version.fromId(1070499)); + assertEquals(ContentType.TEXT_PLAIN.toString(), request.getEntity().getContentType().getValue()); + assertEquals(scroll, Streams.copyToString(new InputStreamReader(request.getEntity().getContent(), StandardCharsets.UTF_8))); + assertThat(request.getParameters().keySet(), empty()); } }