diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index 410a92ab198..e534717e775 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -19,89 +19,21 @@ package org.elasticsearch.index.reindex; -import org.apache.logging.log4j.LogManager; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser.ValueType; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.script.Script; import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Map; -import static java.util.Collections.emptyMap; -import static java.util.Objects.requireNonNull; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.rest.RestRequest.Method.POST; /** * Expose reindex over rest. */ public class RestReindexAction extends AbstractBaseReindexRestHandler { - static final ObjectParser PARSER = new ObjectParser<>("reindex"); - static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in reindex requests is deprecated."; - private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(RestReindexAction.class)); - - static { - ObjectParser.Parser sourceParser = (parser, request, context) -> { - // Funky hack to work around Search not having a proper ObjectParser and us wanting to extract query if using remote. - Map source = parser.map(); - String[] indices = extractStringArray(source, "index"); - if (indices != null) { - request.getSearchRequest().indices(indices); - } - String[] types = extractStringArray(source, "type"); - if (types != null) { - deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE); - request.getSearchRequest().types(types); - } - request.setRemoteInfo(buildRemoteInfo(source)); - XContentBuilder builder = XContentFactory.contentBuilder(parser.contentType()); - builder.map(source); - try (InputStream stream = BytesReference.bytes(builder).streamInput(); - XContentParser innerParser = parser.contentType().xContent() - .createParser(parser.getXContentRegistry(), parser.getDeprecationHandler(), stream)) { - request.getSearchRequest().source().parseXContent(innerParser, false); - } - }; - - ObjectParser destParser = new ObjectParser<>("dest"); - destParser.declareString(IndexRequest::index, new ParseField("index")); - destParser.declareString((request, type) -> { - deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE); - request.type(type); - }, new ParseField("type")); - destParser.declareString(IndexRequest::routing, new ParseField("routing")); - destParser.declareString(IndexRequest::opType, new ParseField("op_type")); - destParser.declareString(IndexRequest::setPipeline, new ParseField("pipeline")); - destParser.declareString((s, i) -> s.versionType(VersionType.fromString(i)), new ParseField("version_type")); - - PARSER.declareField(sourceParser::parse, new ParseField("source"), ValueType.OBJECT); - PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), c), new ParseField("dest"), ValueType.OBJECT); - PARSER.declareInt(RestReindexAction::setMaxDocsValidateIdentical, new ParseField("max_docs", "size")); - PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p)), new ParseField("script"), - ValueType.OBJECT); - PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts")); - } public RestReindexAction(Settings settings, RestController controller) { super(settings, ReindexAction.INSTANCE); @@ -124,123 +56,15 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler source) throws IOException { - @SuppressWarnings("unchecked") - Map remote = (Map) source.remove("remote"); - if (remote == null) { - return null; - } - String username = extractString(remote, "username"); - String password = extractString(remote, "password"); - String hostInRequest = requireNonNull(extractString(remote, "host"), "[host] must be specified to reindex from a remote cluster"); - URI uri; - try { - uri = new URI(hostInRequest); - // URI has less stringent URL parsing than our code. We want to fail if all values are not provided. - if (uri.getPort() == -1) { - throw new URISyntaxException(hostInRequest, "The port was not defined in the [host]"); - } - } catch (URISyntaxException ex) { - throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [" - + hostInRequest + "]", ex); - } - - String scheme = uri.getScheme(); - String host = uri.getHost(); - int port = uri.getPort(); - - String pathPrefix = null; - if (uri.getPath().isEmpty() == false) { - pathPrefix = uri.getPath(); - } - - Map headers = extractStringStringMap(remote, "headers"); - TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT); - TimeValue connectTimeout = extractTimeValue(remote, "connect_timeout", RemoteInfo.DEFAULT_CONNECT_TIMEOUT); - if (false == remote.isEmpty()) { - throw new IllegalArgumentException( - "Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]"); - } - return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source), - username, password, headers, socketTimeout, connectTimeout); - } - - /** - * Yank a string array from a map. Emulates XContent's permissive String to - * String array conversions. - */ - private static String[] extractStringArray(Map source, String name) { - Object value = source.remove(name); - if (value == null) { - return null; - } - if (value instanceof List) { - @SuppressWarnings("unchecked") - List list = (List) value; - return list.toArray(new String[list.size()]); - } else if (value instanceof String) { - return new String[] {(String) value}; - } else { - throw new IllegalArgumentException("Expected [" + name + "] to be a list of a string but was [" + value + ']'); - } - } - - private static String extractString(Map source, String name) { - Object value = source.remove(name); - if (value == null) { - return null; - } - if (value instanceof String) { - return (String) value; - } - throw new IllegalArgumentException("Expected [" + name + "] to be a string but was [" + value + "]"); - } - - private static Map extractStringStringMap(Map source, String name) { - Object value = source.remove(name); - if (value == null) { - return emptyMap(); - } - if (false == value instanceof Map) { - throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but was [" + value + "]"); - } - Map map = (Map) value; - for (Map.Entry entry : map.entrySet()) { - if (false == entry.getKey() instanceof String || false == entry.getValue() instanceof String) { - throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but has [" + entry + "]"); - } - } - @SuppressWarnings("unchecked") // We just checked.... - Map safe = (Map) map; - return safe; - } - - private static TimeValue extractTimeValue(Map source, String name, TimeValue defaultValue) { - String string = extractString(source, name); - return string == null ? defaultValue : parseTimeValue(string, name); - } - - private static BytesReference queryForRemote(Map source) throws IOException { - XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); - Object query = source.remove("query"); - if (query == null) { - return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)); - } - if (!(query instanceof Map)) { - throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]"); - } - @SuppressWarnings("unchecked") - Map map = (Map) query; - return BytesReference.bytes(builder.map(map)); - } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java index f0aca38545b..e9c46bbb8e4 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.rest.RestRequest.Method; @@ -33,11 +32,8 @@ import org.junit.Before; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import static java.util.Collections.singletonMap; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; public class RestReindexActionTests extends RestActionTestCase { @@ -48,126 +44,6 @@ public class RestReindexActionTests extends RestActionTestCase { action = new RestReindexAction(Settings.EMPTY, controller()); } - public void testBuildRemoteInfoNoRemote() throws IOException { - assertNull(RestReindexAction.buildRemoteInfo(new HashMap<>())); - } - - public void testBuildRemoteInfoFullyLoaded() throws IOException { - Map headers = new HashMap<>(); - headers.put("first", "a"); - headers.put("second", "b"); - headers.put("third", ""); - - Map remote = new HashMap<>(); - remote.put("host", "https://example.com:9200"); - remote.put("username", "testuser"); - remote.put("password", "testpass"); - remote.put("headers", headers); - remote.put("socket_timeout", "90s"); - remote.put("connect_timeout", "10s"); - - Map query = new HashMap<>(); - query.put("a", "b"); - - Map source = new HashMap<>(); - source.put("remote", remote); - source.put("query", query); - - RemoteInfo remoteInfo = RestReindexAction.buildRemoteInfo(source); - assertEquals("https", remoteInfo.getScheme()); - assertEquals("example.com", remoteInfo.getHost()); - assertEquals(9200, remoteInfo.getPort()); - assertEquals("{\n \"a\" : \"b\"\n}", remoteInfo.getQuery().utf8ToString()); - assertEquals("testuser", remoteInfo.getUsername()); - assertEquals("testpass", remoteInfo.getPassword()); - assertEquals(headers, remoteInfo.getHeaders()); - assertEquals(timeValueSeconds(90), remoteInfo.getSocketTimeout()); - assertEquals(timeValueSeconds(10), remoteInfo.getConnectTimeout()); - } - - public void testBuildRemoteInfoWithoutAllParts() throws IOException { - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com")); - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase(":9200")); - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://:9200")); - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com:9200")); - expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com")); - } - - public void testBuildRemoteInfoWithAllHostParts() throws IOException { - RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200"); - assertEquals("http", info.getScheme()); - assertEquals("example.com", info.getHost()); - assertEquals(9200, info.getPort()); - assertNull(info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default - - info = buildRemoteInfoHostTestCase("https://other.example.com:9201"); - assertEquals("https", info.getScheme()); - assertEquals("other.example.com", info.getHost()); - assertEquals(9201, info.getPort()); - assertNull(info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); - - info = buildRemoteInfoHostTestCase("https://[::1]:9201"); - assertEquals("https", info.getScheme()); - assertEquals("[::1]", info.getHost()); - assertEquals(9201, info.getPort()); - assertNull(info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); - - info = buildRemoteInfoHostTestCase("https://other.example.com:9201/"); - assertEquals("https", info.getScheme()); - assertEquals("other.example.com", info.getHost()); - assertEquals(9201, info.getPort()); - assertEquals("/", info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); - - info = buildRemoteInfoHostTestCase("https://other.example.com:9201/proxy-path/"); - assertEquals("https", info.getScheme()); - assertEquals("other.example.com", info.getHost()); - assertEquals(9201, info.getPort()); - assertEquals("/proxy-path/", info.getPathPrefix()); - assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); - assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); - - final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, - () -> buildRemoteInfoHostTestCase("https")); - assertEquals("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [https]", - exception.getMessage()); - } - - public void testReindexFromRemoteRequestParsing() throws IOException { - BytesReference request; - try (XContentBuilder b = JsonXContent.contentBuilder()) { - b.startObject(); { - b.startObject("source"); { - b.startObject("remote"); { - b.field("host", "http://localhost:9200"); - } - b.endObject(); - b.field("index", "source"); - } - b.endObject(); - b.startObject("dest"); { - b.field("index", "dest"); - } - b.endObject(); - } - b.endObject(); - request = BytesReference.bytes(b); - } - try (XContentParser p = createParser(JsonXContent.jsonXContent, request)) { - ReindexRequest r = new ReindexRequest(); - RestReindexAction.PARSER.parse(p, r, null); - assertEquals("localhost", r.getRemoteInfo().getHost()); - assertArrayEquals(new String[] {"source"}, r.getSearchRequest().indices()); - } - } - public void testPipelineQueryParameterIsError() throws IOException { FakeRestRequest.Builder request = new FakeRestRequest.Builder(xContentRegistry()); try (XContentBuilder body = JsonXContent.contentBuilder().prettyPrint()) { @@ -206,16 +82,6 @@ public class RestReindexActionTests extends RestActionTestCase { } } - private RemoteInfo buildRemoteInfoHostTestCase(String hostInRest) throws IOException { - Map remote = new HashMap<>(); - remote.put("host", hostInRest); - - Map source = new HashMap<>(); - source.put("remote", remote); - - return RestReindexAction.buildRemoteInfo(source); - } - /** * test deprecation is logged if one or more types are used in source search request inside reindex */ @@ -234,7 +100,7 @@ public class RestReindexActionTests extends RestActionTestCase { b.endObject(); requestBuilder.withContent(new BytesArray(BytesReference.bytes(b).toBytesRef()), XContentType.JSON); dispatchRequest(requestBuilder.build()); - assertWarnings(RestReindexAction.TYPES_DEPRECATION_MESSAGE); + assertWarnings(ReindexRequest.TYPES_DEPRECATION_MESSAGE); } /** @@ -255,6 +121,6 @@ public class RestReindexActionTests extends RestActionTestCase { b.endObject(); requestBuilder.withContent(new BytesArray(BytesReference.bytes(b).toBytesRef()), XContentType.JSON); dispatchRequest(requestBuilder.build()); - assertWarnings(RestReindexAction.TYPES_DEPRECATION_MESSAGE); + assertWarnings(ReindexRequest.TYPES_DEPRECATION_MESSAGE); } } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index 3aa501819b6..60fe95fedf5 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -19,25 +19,46 @@ package org.elasticsearch.index.reindex; +import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.script.Script; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; +import static java.util.Collections.emptyMap; +import static java.util.Objects.requireNonNull; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.index.VersionType.INTERNAL; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; /** * Request to reindex some documents from one index to another. This implements CompositeIndicesRequest but in a misleading way. Rather than @@ -335,4 +356,175 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest PARSER = new ObjectParser<>("reindex"); + static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in reindex requests is deprecated."; + private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(ReindexRequest.class)); + + static { + ObjectParser.Parser sourceParser = (parser, request, context) -> { + // Funky hack to work around Search not having a proper ObjectParser and us wanting to extract query if using remote. + Map source = parser.map(); + String[] indices = extractStringArray(source, "index"); + if (indices != null) { + request.getSearchRequest().indices(indices); + } + String[] types = extractStringArray(source, "type"); + if (types != null) { + deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE); + request.getSearchRequest().types(types); + } + request.setRemoteInfo(buildRemoteInfo(source)); + XContentBuilder builder = XContentFactory.contentBuilder(parser.contentType()); + builder.map(source); + try (InputStream stream = BytesReference.bytes(builder).streamInput(); + XContentParser innerParser = parser.contentType().xContent() + .createParser(parser.getXContentRegistry(), parser.getDeprecationHandler(), stream)) { + request.getSearchRequest().source().parseXContent(innerParser, false); + } + }; + + ObjectParser destParser = new ObjectParser<>("dest"); + destParser.declareString(IndexRequest::index, new ParseField("index")); + destParser.declareString((request, type) -> { + deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE); + request.type(type); + }, new ParseField("type")); + destParser.declareString(IndexRequest::routing, new ParseField("routing")); + destParser.declareString(IndexRequest::opType, new ParseField("op_type")); + destParser.declareString(IndexRequest::setPipeline, new ParseField("pipeline")); + destParser.declareString((s, i) -> s.versionType(VersionType.fromString(i)), new ParseField("version_type")); + + PARSER.declareField(sourceParser::parse, new ParseField("source"), ObjectParser.ValueType.OBJECT); + PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), c), new ParseField("dest"), ObjectParser.ValueType.OBJECT); + PARSER.declareInt(ReindexRequest::setMaxDocsValidateIdentical, new ParseField("max_docs", "size")); + PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p)), new ParseField("script"), + ObjectParser.ValueType.OBJECT); + PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts")); + } + + public static ReindexRequest fromXContent(XContentParser parser) throws IOException { + ReindexRequest reindexRequest = new ReindexRequest(); + PARSER.parse(parser, reindexRequest, null); + return reindexRequest; + } + + /** + * Yank a string array from a map. Emulates XContent's permissive String to + * String array conversions. + */ + private static String[] extractStringArray(Map source, String name) { + Object value = source.remove(name); + if (value == null) { + return null; + } + if (value instanceof List) { + @SuppressWarnings("unchecked") + List list = (List) value; + return list.toArray(new String[list.size()]); + } else if (value instanceof String) { + return new String[] {(String) value}; + } else { + throw new IllegalArgumentException("Expected [" + name + "] to be a list of a string but was [" + value + ']'); + } + } + + static RemoteInfo buildRemoteInfo(Map source) throws IOException { + @SuppressWarnings("unchecked") + Map remote = (Map) source.remove("remote"); + if (remote == null) { + return null; + } + String username = extractString(remote, "username"); + String password = extractString(remote, "password"); + String hostInRequest = requireNonNull(extractString(remote, "host"), "[host] must be specified to reindex from a remote cluster"); + URI uri; + try { + uri = new URI(hostInRequest); + // URI has less stringent URL parsing than our code. We want to fail if all values are not provided. + if (uri.getPort() == -1) { + throw new URISyntaxException(hostInRequest, "The port was not defined in the [host]"); + } + } catch (URISyntaxException ex) { + throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [" + + hostInRequest + "]", ex); + } + + String scheme = uri.getScheme(); + String host = uri.getHost(); + int port = uri.getPort(); + + String pathPrefix = null; + if (uri.getPath().isEmpty() == false) { + pathPrefix = uri.getPath(); + } + + Map headers = extractStringStringMap(remote, "headers"); + TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT); + TimeValue connectTimeout = extractTimeValue(remote, "connect_timeout", RemoteInfo.DEFAULT_CONNECT_TIMEOUT); + if (false == remote.isEmpty()) { + throw new IllegalArgumentException( + "Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]"); + } + return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source), + username, password, headers, socketTimeout, connectTimeout); + } + + private static String extractString(Map source, String name) { + Object value = source.remove(name); + if (value == null) { + return null; + } + if (value instanceof String) { + return (String) value; + } + throw new IllegalArgumentException("Expected [" + name + "] to be a string but was [" + value + "]"); + } + + private static Map extractStringStringMap(Map source, String name) { + Object value = source.remove(name); + if (value == null) { + return emptyMap(); + } + if (false == value instanceof Map) { + throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but was [" + value + "]"); + } + Map map = (Map) value; + for (Map.Entry entry : map.entrySet()) { + if (false == entry.getKey() instanceof String || false == entry.getValue() instanceof String) { + throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but has [" + entry + "]"); + } + } + @SuppressWarnings("unchecked") // We just checked.... + Map safe = (Map) map; + return safe; + } + + private static TimeValue extractTimeValue(Map source, String name, TimeValue defaultValue) { + String string = extractString(source, name); + return string == null ? defaultValue : parseTimeValue(string, name); + } + + private static BytesReference queryForRemote(Map source) throws IOException { + XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); + Object query = source.remove("query"); + if (query == null) { + return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)); + } + if (!(query instanceof Map)) { + throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]"); + } + @SuppressWarnings("unchecked") + Map map = (Map) query; + return BytesReference.bytes(builder.map(map)); + } + + static void setMaxDocsValidateIdentical(AbstractBulkByScrollRequest request, int maxDocs) { + if (request.getMaxDocs() != AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES && request.getMaxDocs() != maxDocs) { + throw new IllegalArgumentException("[max_docs] set to two different values [" + request.getMaxDocs() + "]" + + " and [" + maxDocs + "]"); + } else { + request.setMaxDocs(maxDocs); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java b/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java index 1c3d539263e..c0333cab98c 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java @@ -21,10 +21,19 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.slice.SliceBuilder; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + import static java.util.Collections.emptyMap; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; /** @@ -92,4 +101,134 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase())); + } + + public void testBuildRemoteInfoFullyLoaded() throws IOException { + Map headers = new HashMap<>(); + headers.put("first", "a"); + headers.put("second", "b"); + headers.put("third", ""); + + Map remote = new HashMap<>(); + remote.put("host", "https://example.com:9200"); + remote.put("username", "testuser"); + remote.put("password", "testpass"); + remote.put("headers", headers); + remote.put("socket_timeout", "90s"); + remote.put("connect_timeout", "10s"); + + Map query = new HashMap<>(); + query.put("a", "b"); + + Map source = new HashMap<>(); + source.put("remote", remote); + source.put("query", query); + + RemoteInfo remoteInfo = ReindexRequest.buildRemoteInfo(source); + assertEquals("https", remoteInfo.getScheme()); + assertEquals("example.com", remoteInfo.getHost()); + assertEquals(9200, remoteInfo.getPort()); + assertEquals("{\n \"a\" : \"b\"\n}", remoteInfo.getQuery().utf8ToString()); + assertEquals("testuser", remoteInfo.getUsername()); + assertEquals("testpass", remoteInfo.getPassword()); + assertEquals(headers, remoteInfo.getHeaders()); + assertEquals(timeValueSeconds(90), remoteInfo.getSocketTimeout()); + assertEquals(timeValueSeconds(10), remoteInfo.getConnectTimeout()); + } + + public void testBuildRemoteInfoWithoutAllParts() throws IOException { + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com")); + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase(":9200")); + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://:9200")); + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com:9200")); + expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com")); + } + + public void testBuildRemoteInfoWithAllHostParts() throws IOException { + RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200"); + assertEquals("http", info.getScheme()); + assertEquals("example.com", info.getHost()); + assertEquals(9200, info.getPort()); + assertNull(info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default + + info = buildRemoteInfoHostTestCase("https://other.example.com:9201"); + assertEquals("https", info.getScheme()); + assertEquals("other.example.com", info.getHost()); + assertEquals(9201, info.getPort()); + assertNull(info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); + + info = buildRemoteInfoHostTestCase("https://[::1]:9201"); + assertEquals("https", info.getScheme()); + assertEquals("[::1]", info.getHost()); + assertEquals(9201, info.getPort()); + assertNull(info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); + + info = buildRemoteInfoHostTestCase("https://other.example.com:9201/"); + assertEquals("https", info.getScheme()); + assertEquals("other.example.com", info.getHost()); + assertEquals(9201, info.getPort()); + assertEquals("/", info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); + + info = buildRemoteInfoHostTestCase("https://other.example.com:9201/proxy-path/"); + assertEquals("https", info.getScheme()); + assertEquals("other.example.com", info.getHost()); + assertEquals(9201, info.getPort()); + assertEquals("/proxy-path/", info.getPathPrefix()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); + + final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, + () -> buildRemoteInfoHostTestCase("https")); + assertEquals("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [https]", + exception.getMessage()); + } + + public void testReindexFromRemoteRequestParsing() throws IOException { + BytesReference request; + try (XContentBuilder b = JsonXContent.contentBuilder()) { + b.startObject(); { + b.startObject("source"); { + b.startObject("remote"); { + b.field("host", "http://localhost:9200"); + } + b.endObject(); + b.field("index", "source"); + } + b.endObject(); + b.startObject("dest"); { + b.field("index", "dest"); + } + b.endObject(); + } + b.endObject(); + request = BytesReference.bytes(b); + } + try (XContentParser p = createParser(JsonXContent.jsonXContent, request)) { + ReindexRequest r = ReindexRequest.fromXContent(p); + assertEquals("localhost", r.getRemoteInfo().getHost()); + assertArrayEquals(new String[] {"source"}, r.getSearchRequest().indices()); + } + } + + private RemoteInfo buildRemoteInfoHostTestCase(String hostInRest) throws IOException { + Map remote = new HashMap<>(); + remote.put("host", hostInRest); + + Map source = new HashMap<>(); + source.put("remote", remote); + + return ReindexRequest.buildRemoteInfo(source); + } + }