From 827f8fcbd5addb0030e76f2f3bad8ce542f2451f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Jun 2019 17:49:11 -0400 Subject: [PATCH] Move reindex request parsing into request (#43450) Currently the fromXContent logic for reindex requests is implemented in the rest action. This is inconsistent with other requests where the logic is implemented in the request. Additionally, it requires access to the rest action in order to parse the request. This commit moves the logic and tests into the ReindexRequest. --- .../index/reindex/RestReindexAction.java | 184 +---------------- .../index/reindex/RestReindexActionTests.java | 138 +------------ .../index/reindex/ReindexRequest.java | 192 ++++++++++++++++++ .../index/reindex/ReindexRequestTests.java | 139 +++++++++++++ 4 files changed, 337 insertions(+), 316 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index 410a92ab198..e534717e775 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -19,89 +19,21 @@ package org.elasticsearch.index.reindex; -import org.apache.logging.log4j.LogManager; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser.ValueType; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.script.Script; import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Map; -import static java.util.Collections.emptyMap; -import static java.util.Objects.requireNonNull; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.rest.RestRequest.Method.POST; /** * Expose reindex over rest. */ public class RestReindexAction extends AbstractBaseReindexRestHandler { - static final ObjectParser PARSER = new ObjectParser<>("reindex"); - static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in reindex requests is deprecated."; - private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(RestReindexAction.class)); - - static { - ObjectParser.Parser sourceParser = (parser, request, context) -> { - // Funky hack to work around Search not having a proper ObjectParser and us wanting to extract query if using remote. - Map source = parser.map(); - String[] indices = extractStringArray(source, "index"); - if (indices != null) { - request.getSearchRequest().indices(indices); - } - String[] types = extractStringArray(source, "type"); - if (types != null) { - deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE); - request.getSearchRequest().types(types); - } - request.setRemoteInfo(buildRemoteInfo(source)); - XContentBuilder builder = XContentFactory.contentBuilder(parser.contentType()); - builder.map(source); - try (InputStream stream = BytesReference.bytes(builder).streamInput(); - XContentParser innerParser = parser.contentType().xContent() - .createParser(parser.getXContentRegistry(), parser.getDeprecationHandler(), stream)) { - request.getSearchRequest().source().parseXContent(innerParser, false); - } - }; - - ObjectParser destParser = new ObjectParser<>("dest"); - destParser.declareString(IndexRequest::index, new ParseField("index")); - destParser.declareString((request, type) -> { - deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE); - request.type(type); - }, new ParseField("type")); - destParser.declareString(IndexRequest::routing, new ParseField("routing")); - destParser.declareString(IndexRequest::opType, new ParseField("op_type")); - destParser.declareString(IndexRequest::setPipeline, new ParseField("pipeline")); - destParser.declareString((s, i) -> s.versionType(VersionType.fromString(i)), new ParseField("version_type")); - - PARSER.declareField(sourceParser::parse, new ParseField("source"), ValueType.OBJECT); - PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), c), new ParseField("dest"), ValueType.OBJECT); - PARSER.declareInt(RestReindexAction::setMaxDocsValidateIdentical, new ParseField("max_docs", "size")); - PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p)), new ParseField("script"), - ValueType.OBJECT); - PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts")); - } public RestReindexAction(Settings settings, RestController controller) { super(settings, ReindexAction.INSTANCE); @@ -124,123 +56,15 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler source) throws IOException { - @SuppressWarnings("unchecked") - Map remote = (Map) source.remove("remote"); - if (remote == null) { - return null; - } - String username = extractString(remote, "username"); - String password = extractString(remote, "password"); - String hostInRequest = requireNonNull(extractString(remote, "host"), "[host] must be specified to reindex from a remote cluster"); - URI uri; - try { - uri = new URI(hostInRequest); - // URI has less stringent URL parsing than our code. We want to fail if all values are not provided. - if (uri.getPort() == -1) { - throw new URISyntaxException(hostInRequest, "The port was not defined in the [host]"); - } - } catch (URISyntaxException ex) { - throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [" - + hostInRequest + "]", ex); - } - - String scheme = uri.getScheme(); - String host = uri.getHost(); - int port = uri.getPort(); - - String pathPrefix = null; - if (uri.getPath().isEmpty() == false) { - pathPrefix = uri.getPath(); - } - - Map headers = extractStringStringMap(remote, "headers"); - TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT); - TimeValue connectTimeout = extractTimeValue(remote, "connect_timeout", RemoteInfo.DEFAULT_CONNECT_TIMEOUT); - if (false == remote.isEmpty()) { - throw new IllegalArgumentException( - "Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]"); - } - return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source), - username, password, headers, socketTimeout, connectTimeout); - } - - /** - * Yank a string array from a map. Emulates XContent's permissive String to - * String array conversions. - */ - private static String[] extractStringArray(Map source, String name) { - Object value = source.remove(name); - if (value == null) { - return null; - } - if (value instanceof List) { - @SuppressWarnings("unchecked") - List list = (List) value; - return list.toArray(new String[list.size()]); - } else if (value instanceof String) { - return new String[] {(String) value}; - } else { - throw new IllegalArgumentException("Expected [" + name + "] to be a list of a string but was [" + value + ']'); - } - } - - private static String extractString(Map source, String name) { - Object value = source.remove(name); - if (value == null) { - return null; - } - if (value instanceof String) { - return (String) value; - } - throw new IllegalArgumentException("Expected [" + name + "] to be a string but was [" + value + "]"); - } - - private static Map extractStringStringMap(Map source, String name) { - Object value = source.remove(name); - if (value == null) { - return emptyMap(); - } - if (false == value instanceof Map) { - throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but was [" + value + "]"); - } - Map map = (Map) value; - for (Map.Entry entry : map.entrySet()) { - if (false == entry.getKey() instanceof String || false == entry.getValue() instanceof String) { - throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but has [" + entry + "]"); - } - } - @SuppressWarnings("unchecked") // We just checked.... - Map safe = (Map) map; - return safe; - } - - private static TimeValue extractTimeValue(Map source, String name, TimeValue defaultValue) { - String string = extractString(source, name); - return string == null ? defaultValue : parseTimeValue(string, name); - } - - private static BytesReference queryForRemote(Map source) throws IOException { - XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); - Object query = source.remove("query"); - if (query == null) { - return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)); - } - if (!(query instanceof Map)) { - throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]"); - } - @SuppressWarnings("unchecked") - Map map = (Map) query; - return BytesReference.bytes(builder.map(map)); - } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java index f0aca38545b..e9c46bbb8e4 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.rest.RestRequest.Method; @@ -33,11 +32,8 @@ import org.junit.Before; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import static java.util.Collections.singletonMap; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; public class RestReindexActionTests extends RestActionTestCase { @@ -48,126 +44,6 @@ public class RestReindexActionTests extends RestActionTestCase { action = new RestReindexAction(Settings.EMPTY, controller()); } - public void testBuildRemoteInfoNoRemote() throws IOException { - assertNull(RestReindexAction.buildRemoteInfo(new HashMap<>())); - } - - public void testBuildRemoteInfoFullyLoaded() throws IOException { - Map headers = new HashMap<>(); - headers.put("first", "a"); - headers.put("second", "b"); - headers.put("third", ""); - - Map remote = new HashMap<>(); - remote.put("host", "https://example.com:9200"); - remote.put("username", "testuser"); - remote.put("password", "testpass"); - remote.put("headers", headers); - remote.put("socket_timeout", "90s"); - remote.put("connect_timeout", "10s"); - - Map query = new HashMap<>(); - query.put("a", "b"); - - Map source = new HashMap<>(); - source.put("remote", remote); - source.put("query", query); - - RemoteInfo remoteInfo = RestReindexAction.buildRemoteInfo(source); - assertEquals("https", remoteInfo.getScheme()); - assertEquals("example.com", remoteInfo.getHost()); - assertEquals(9200, remoteInfo.getPort()); - assertEquals("{\n \"a\" : \"b\"\n}", remoteInfo.getQuery().utf8ToString()); - assertEquals("testuser", remoteInfo.getUsername()); - assertEquals("testpass", remoteInfo.getPassword()); - assertEquals(headers, remoteInfo.getHeaders()); - assertEquals(timeValueSeconds(90), remoteInfo.getSocketTimeout()); - assertEquals(timeValueSeconds(10), remoteInfo.getConnectTimeout()); - } - - public void testBuildRemoteInfoWithoutAllParts() throws IOException { - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com")); - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase(":9200")); - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://:9200")); - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com:9200")); - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com")); - } - - public void testBuildRemoteInfoWithAllHostParts() throws IOException { - RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200"); - assertEquals("http", info.getScheme()); - assertEquals("example.com", info.getHost()); - assertEquals(9200, info.getPort()); - assertNull(info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default - - info = buildRemoteInfoHostTestCase("https://other.example.com:9201"); - assertEquals("https", info.getScheme()); - assertEquals("other.example.com", info.getHost()); - assertEquals(9201, info.getPort()); - assertNull(info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); - - info = buildRemoteInfoHostTestCase("https://[::1]:9201"); - assertEquals("https", info.getScheme()); - assertEquals("[::1]", info.getHost()); - assertEquals(9201, info.getPort()); - assertNull(info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); - - info = buildRemoteInfoHostTestCase("https://other.example.com:9201/"); - assertEquals("https", info.getScheme()); - assertEquals("other.example.com", info.getHost()); - assertEquals(9201, info.getPort()); - assertEquals("/", info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); - - info = buildRemoteInfoHostTestCase("https://other.example.com:9201/proxy-path/"); - assertEquals("https", info.getScheme()); - assertEquals("other.example.com", info.getHost()); - assertEquals(9201, info.getPort()); - assertEquals("/proxy-path/", info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); - - final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, - () -> buildRemoteInfoHostTestCase("https")); - assertEquals("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [https]", - exception.getMessage()); - } - - public void testReindexFromRemoteRequestParsing() throws IOException { - BytesReference request; - try (XContentBuilder b = JsonXContent.contentBuilder()) { - b.startObject(); { - b.startObject("source"); { - b.startObject("remote"); { - b.field("host", "http://localhost:9200"); - } - b.endObject(); - b.field("index", "source"); - } - b.endObject(); - b.startObject("dest"); { - b.field("index", "dest"); - } - b.endObject(); - } - b.endObject(); - request = BytesReference.bytes(b); - } - try (XContentParser p = createParser(JsonXContent.jsonXContent, request)) { - ReindexRequest r = new ReindexRequest(); - RestReindexAction.PARSER.parse(p, r, null); - assertEquals("localhost", r.getRemoteInfo().getHost()); - assertArrayEquals(new String[] {"source"}, r.getSearchRequest().indices()); - } - } - public void testPipelineQueryParameterIsError() throws IOException { FakeRestRequest.Builder request = new FakeRestRequest.Builder(xContentRegistry()); try (XContentBuilder body = JsonXContent.contentBuilder().prettyPrint()) { @@ -206,16 +82,6 @@ public class RestReindexActionTests extends RestActionTestCase { } } - private RemoteInfo buildRemoteInfoHostTestCase(String hostInRest) throws IOException { - Map remote = new HashMap<>(); - remote.put("host", hostInRest); - - Map source = new HashMap<>(); - source.put("remote", remote); - - return RestReindexAction.buildRemoteInfo(source); - } - /** * test deprecation is logged if one or more types are used in source search request inside reindex */ @@ -234,7 +100,7 @@ public class RestReindexActionTests extends RestActionTestCase { b.endObject(); requestBuilder.withContent(new BytesArray(BytesReference.bytes(b).toBytesRef()), XContentType.JSON); dispatchRequest(requestBuilder.build()); - assertWarnings(RestReindexAction.TYPES_DEPRECATION_MESSAGE); + assertWarnings(ReindexRequest.TYPES_DEPRECATION_MESSAGE); } /** @@ -255,6 +121,6 @@ public class RestReindexActionTests extends RestActionTestCase { b.endObject(); requestBuilder.withContent(new BytesArray(BytesReference.bytes(b).toBytesRef()), XContentType.JSON); dispatchRequest(requestBuilder.build()); - assertWarnings(RestReindexAction.TYPES_DEPRECATION_MESSAGE); + assertWarnings(ReindexRequest.TYPES_DEPRECATION_MESSAGE); } } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index 3aa501819b6..60fe95fedf5 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -19,25 +19,46 @@ package org.elasticsearch.index.reindex; +import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.script.Script; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; +import static java.util.Collections.emptyMap; +import static java.util.Objects.requireNonNull; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.index.VersionType.INTERNAL; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; /** * Request to reindex some documents from one index to another. This implements CompositeIndicesRequest but in a misleading way. Rather than @@ -335,4 +356,175 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest PARSER = new ObjectParser<>("reindex"); + static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in reindex requests is deprecated."; + private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(ReindexRequest.class)); + + static { + ObjectParser.Parser sourceParser = (parser, request, context) -> { + // Funky hack to work around Search not having a proper ObjectParser and us wanting to extract query if using remote. + Map source = parser.map(); + String[] indices = extractStringArray(source, "index"); + if (indices != null) { + request.getSearchRequest().indices(indices); + } + String[] types = extractStringArray(source, "type"); + if (types != null) { + deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE); + request.getSearchRequest().types(types); + } + request.setRemoteInfo(buildRemoteInfo(source)); + XContentBuilder builder = XContentFactory.contentBuilder(parser.contentType()); + builder.map(source); + try (InputStream stream = BytesReference.bytes(builder).streamInput(); + XContentParser innerParser = parser.contentType().xContent() + .createParser(parser.getXContentRegistry(), parser.getDeprecationHandler(), stream)) { + request.getSearchRequest().source().parseXContent(innerParser, false); + } + }; + + ObjectParser destParser = new ObjectParser<>("dest"); + destParser.declareString(IndexRequest::index, new ParseField("index")); + destParser.declareString((request, type) -> { + deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE); + request.type(type); + }, new ParseField("type")); + destParser.declareString(IndexRequest::routing, new ParseField("routing")); + destParser.declareString(IndexRequest::opType, new ParseField("op_type")); + destParser.declareString(IndexRequest::setPipeline, new ParseField("pipeline")); + destParser.declareString((s, i) -> s.versionType(VersionType.fromString(i)), new ParseField("version_type")); + + PARSER.declareField(sourceParser::parse, new ParseField("source"), ObjectParser.ValueType.OBJECT); + PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), c), new ParseField("dest"), ObjectParser.ValueType.OBJECT); + PARSER.declareInt(ReindexRequest::setMaxDocsValidateIdentical, new ParseField("max_docs", "size")); + PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p)), new ParseField("script"), + ObjectParser.ValueType.OBJECT); + PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts")); + } + + public static ReindexRequest fromXContent(XContentParser parser) throws IOException { + ReindexRequest reindexRequest = new ReindexRequest(); + PARSER.parse(parser, reindexRequest, null); + return reindexRequest; + } + + /** + * Yank a string array from a map. Emulates XContent's permissive String to + * String array conversions. + */ + private static String[] extractStringArray(Map source, String name) { + Object value = source.remove(name); + if (value == null) { + return null; + } + if (value instanceof List) { + @SuppressWarnings("unchecked") + List list = (List) value; + return list.toArray(new String[list.size()]); + } else if (value instanceof String) { + return new String[] {(String) value}; + } else { + throw new IllegalArgumentException("Expected [" + name + "] to be a list of a string but was [" + value + ']'); + } + } + + static RemoteInfo buildRemoteInfo(Map source) throws IOException { + @SuppressWarnings("unchecked") + Map remote = (Map) source.remove("remote"); + if (remote == null) { + return null; + } + String username = extractString(remote, "username"); + String password = extractString(remote, "password"); + String hostInRequest = requireNonNull(extractString(remote, "host"), "[host] must be specified to reindex from a remote cluster"); + URI uri; + try { + uri = new URI(hostInRequest); + // URI has less stringent URL parsing than our code. We want to fail if all values are not provided. + if (uri.getPort() == -1) { + throw new URISyntaxException(hostInRequest, "The port was not defined in the [host]"); + } + } catch (URISyntaxException ex) { + throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [" + + hostInRequest + "]", ex); + } + + String scheme = uri.getScheme(); + String host = uri.getHost(); + int port = uri.getPort(); + + String pathPrefix = null; + if (uri.getPath().isEmpty() == false) { + pathPrefix = uri.getPath(); + } + + Map headers = extractStringStringMap(remote, "headers"); + TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT); + TimeValue connectTimeout = extractTimeValue(remote, "connect_timeout", RemoteInfo.DEFAULT_CONNECT_TIMEOUT); + if (false == remote.isEmpty()) { + throw new IllegalArgumentException( + "Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]"); + } + return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source), + username, password, headers, socketTimeout, connectTimeout); + } + + private static String extractString(Map source, String name) { + Object value = source.remove(name); + if (value == null) { + return null; + } + if (value instanceof String) { + return (String) value; + } + throw new IllegalArgumentException("Expected [" + name + "] to be a string but was [" + value + "]"); + } + + private static Map extractStringStringMap(Map source, String name) { + Object value = source.remove(name); + if (value == null) { + return emptyMap(); + } + if (false == value instanceof Map) { + throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but was [" + value + "]"); + } + Map map = (Map) value; + for (Map.Entry entry : map.entrySet()) { + if (false == entry.getKey() instanceof String || false == entry.getValue() instanceof String) { + throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but has [" + entry + "]"); + } + } + @SuppressWarnings("unchecked") // We just checked.... + Map safe = (Map) map; + return safe; + } + + private static TimeValue extractTimeValue(Map source, String name, TimeValue defaultValue) { + String string = extractString(source, name); + return string == null ? defaultValue : parseTimeValue(string, name); + } + + private static BytesReference queryForRemote(Map source) throws IOException { + XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); + Object query = source.remove("query"); + if (query == null) { + return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)); + } + if (!(query instanceof Map)) { + throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]"); + } + @SuppressWarnings("unchecked") + Map map = (Map) query; + return BytesReference.bytes(builder.map(map)); + } + + static void setMaxDocsValidateIdentical(AbstractBulkByScrollRequest request, int maxDocs) { + if (request.getMaxDocs() != AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES && request.getMaxDocs() != maxDocs) { + throw new IllegalArgumentException("[max_docs] set to two different values [" + request.getMaxDocs() + "]" + + " and [" + maxDocs + "]"); + } else { + request.setMaxDocs(maxDocs); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java b/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java index 1c3d539263e..c0333cab98c 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java @@ -21,10 +21,19 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.slice.SliceBuilder; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + import static java.util.Collections.emptyMap; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; /** @@ -92,4 +101,134 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase())); + } + + public void testBuildRemoteInfoFullyLoaded() throws IOException { + Map headers = new HashMap<>(); + headers.put("first", "a"); + headers.put("second", "b"); + headers.put("third", ""); + + Map remote = new HashMap<>(); + remote.put("host", "https://example.com:9200"); + remote.put("username", "testuser"); + remote.put("password", "testpass"); + remote.put("headers", headers); + remote.put("socket_timeout", "90s"); + remote.put("connect_timeout", "10s"); + + Map query = new HashMap<>(); + query.put("a", "b"); + + Map source = new HashMap<>(); + source.put("remote", remote); + source.put("query", query); + + RemoteInfo remoteInfo = ReindexRequest.buildRemoteInfo(source); + assertEquals("https", remoteInfo.getScheme()); + assertEquals("example.com", remoteInfo.getHost()); + assertEquals(9200, remoteInfo.getPort()); + assertEquals("{\n \"a\" : \"b\"\n}", remoteInfo.getQuery().utf8ToString()); + assertEquals("testuser", remoteInfo.getUsername()); + assertEquals("testpass", remoteInfo.getPassword()); + assertEquals(headers, remoteInfo.getHeaders()); + assertEquals(timeValueSeconds(90), remoteInfo.getSocketTimeout()); + assertEquals(timeValueSeconds(10), remoteInfo.getConnectTimeout()); + } + + public void testBuildRemoteInfoWithoutAllParts() throws IOException { + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com")); + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase(":9200")); + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://:9200")); + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com:9200")); + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com")); + } + + public void testBuildRemoteInfoWithAllHostParts() throws IOException { + RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200"); + assertEquals("http", info.getScheme()); + assertEquals("example.com", info.getHost()); + assertEquals(9200, info.getPort()); + assertNull(info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default + + info = buildRemoteInfoHostTestCase("https://other.example.com:9201"); + assertEquals("https", info.getScheme()); + assertEquals("other.example.com", info.getHost()); + assertEquals(9201, info.getPort()); + assertNull(info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); + + info = buildRemoteInfoHostTestCase("https://[::1]:9201"); + assertEquals("https", info.getScheme()); + assertEquals("[::1]", info.getHost()); + assertEquals(9201, info.getPort()); + assertNull(info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); + + info = buildRemoteInfoHostTestCase("https://other.example.com:9201/"); + assertEquals("https", info.getScheme()); + assertEquals("other.example.com", info.getHost()); + assertEquals(9201, info.getPort()); + assertEquals("/", info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); + + info = buildRemoteInfoHostTestCase("https://other.example.com:9201/proxy-path/"); + assertEquals("https", info.getScheme()); + assertEquals("other.example.com", info.getHost()); + assertEquals(9201, info.getPort()); + assertEquals("/proxy-path/", info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); + + final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, + () -> buildRemoteInfoHostTestCase("https")); + assertEquals("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [https]", + exception.getMessage()); + } + + public void testReindexFromRemoteRequestParsing() throws IOException { + BytesReference request; + try (XContentBuilder b = JsonXContent.contentBuilder()) { + b.startObject(); { + b.startObject("source"); { + b.startObject("remote"); { + b.field("host", "http://localhost:9200"); + } + b.endObject(); + b.field("index", "source"); + } + b.endObject(); + b.startObject("dest"); { + b.field("index", "dest"); + } + b.endObject(); + } + b.endObject(); + request = BytesReference.bytes(b); + } + try (XContentParser p = createParser(JsonXContent.jsonXContent, request)) { + ReindexRequest r = ReindexRequest.fromXContent(p); + assertEquals("localhost", r.getRemoteInfo().getHost()); + assertArrayEquals(new String[] {"source"}, r.getSearchRequest().indices()); + } + } + + private RemoteInfo buildRemoteInfoHostTestCase(String hostInRest) throws IOException { + Map remote = new HashMap<>(); + remote.put("host", hostInRest); + + Map source = new HashMap<>(); + source.put("remote", remote); + + return ReindexRequest.buildRemoteInfo(source); + } + }