Move reindex request parsing into request (#43450)
Currently the fromXContent logic for reindex requests is implemented in the rest action. This is inconsistent with other requests where the logic is implemented in the request. Additionally, it requires access to the rest action in order to parse the request. This commit moves the logic and tests into the ReindexRequest.
This commit is contained in:
parent
7b0a259b2c
commit
827f8fcbd5
|
@ -19,89 +19,21 @@
|
|||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.script.Script;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.rest.RestRequest.Method.POST;
|
||||
|
||||
/**
|
||||
* Expose reindex over rest.
|
||||
*/
|
||||
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, ReindexAction> {
|
||||
static final ObjectParser<ReindexRequest, Void> PARSER = new ObjectParser<>("reindex");
|
||||
static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in reindex requests is deprecated.";
|
||||
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(RestReindexAction.class));
|
||||
|
||||
static {
|
||||
ObjectParser.Parser<ReindexRequest, Void> sourceParser = (parser, request, context) -> {
|
||||
// Funky hack to work around Search not having a proper ObjectParser and us wanting to extract query if using remote.
|
||||
Map<String, Object> source = parser.map();
|
||||
String[] indices = extractStringArray(source, "index");
|
||||
if (indices != null) {
|
||||
request.getSearchRequest().indices(indices);
|
||||
}
|
||||
String[] types = extractStringArray(source, "type");
|
||||
if (types != null) {
|
||||
deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE);
|
||||
request.getSearchRequest().types(types);
|
||||
}
|
||||
request.setRemoteInfo(buildRemoteInfo(source));
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(parser.contentType());
|
||||
builder.map(source);
|
||||
try (InputStream stream = BytesReference.bytes(builder).streamInput();
|
||||
XContentParser innerParser = parser.contentType().xContent()
|
||||
.createParser(parser.getXContentRegistry(), parser.getDeprecationHandler(), stream)) {
|
||||
request.getSearchRequest().source().parseXContent(innerParser, false);
|
||||
}
|
||||
};
|
||||
|
||||
ObjectParser<IndexRequest, Void> destParser = new ObjectParser<>("dest");
|
||||
destParser.declareString(IndexRequest::index, new ParseField("index"));
|
||||
destParser.declareString((request, type) -> {
|
||||
deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE);
|
||||
request.type(type);
|
||||
}, new ParseField("type"));
|
||||
destParser.declareString(IndexRequest::routing, new ParseField("routing"));
|
||||
destParser.declareString(IndexRequest::opType, new ParseField("op_type"));
|
||||
destParser.declareString(IndexRequest::setPipeline, new ParseField("pipeline"));
|
||||
destParser.declareString((s, i) -> s.versionType(VersionType.fromString(i)), new ParseField("version_type"));
|
||||
|
||||
PARSER.declareField(sourceParser::parse, new ParseField("source"), ValueType.OBJECT);
|
||||
PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), c), new ParseField("dest"), ValueType.OBJECT);
|
||||
PARSER.declareInt(RestReindexAction::setMaxDocsValidateIdentical, new ParseField("max_docs", "size"));
|
||||
PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p)), new ParseField("script"),
|
||||
ValueType.OBJECT);
|
||||
PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts"));
|
||||
}
|
||||
|
||||
public RestReindexAction(Settings settings, RestController controller) {
|
||||
super(settings, ReindexAction.INSTANCE);
|
||||
|
@ -124,123 +56,15 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
|
|||
throw new IllegalArgumentException("_reindex doesn't support [pipeline] as a query parameter. "
|
||||
+ "Specify it in the [dest] object instead.");
|
||||
}
|
||||
ReindexRequest internal = new ReindexRequest();
|
||||
|
||||
ReindexRequest internal;
|
||||
try (XContentParser parser = request.contentParser()) {
|
||||
PARSER.parse(parser, internal, null);
|
||||
internal = ReindexRequest.fromXContent(parser);
|
||||
}
|
||||
|
||||
if (request.hasParam("scroll")) {
|
||||
internal.setScroll(parseTimeValue(request.param("scroll"), "scroll"));
|
||||
}
|
||||
return internal;
|
||||
}
|
||||
|
||||
static RemoteInfo buildRemoteInfo(Map<String, Object> source) throws IOException {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> remote = (Map<String, Object>) source.remove("remote");
|
||||
if (remote == null) {
|
||||
return null;
|
||||
}
|
||||
String username = extractString(remote, "username");
|
||||
String password = extractString(remote, "password");
|
||||
String hostInRequest = requireNonNull(extractString(remote, "host"), "[host] must be specified to reindex from a remote cluster");
|
||||
URI uri;
|
||||
try {
|
||||
uri = new URI(hostInRequest);
|
||||
// URI has less stringent URL parsing than our code. We want to fail if all values are not provided.
|
||||
if (uri.getPort() == -1) {
|
||||
throw new URISyntaxException(hostInRequest, "The port was not defined in the [host]");
|
||||
}
|
||||
} catch (URISyntaxException ex) {
|
||||
throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was ["
|
||||
+ hostInRequest + "]", ex);
|
||||
}
|
||||
|
||||
String scheme = uri.getScheme();
|
||||
String host = uri.getHost();
|
||||
int port = uri.getPort();
|
||||
|
||||
String pathPrefix = null;
|
||||
if (uri.getPath().isEmpty() == false) {
|
||||
pathPrefix = uri.getPath();
|
||||
}
|
||||
|
||||
Map<String, String> headers = extractStringStringMap(remote, "headers");
|
||||
TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT);
|
||||
TimeValue connectTimeout = extractTimeValue(remote, "connect_timeout", RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
if (false == remote.isEmpty()) {
|
||||
throw new IllegalArgumentException(
|
||||
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
|
||||
}
|
||||
return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source),
|
||||
username, password, headers, socketTimeout, connectTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Yank a string array from a map. Emulates XContent's permissive String to
|
||||
* String array conversions.
|
||||
*/
|
||||
private static String[] extractStringArray(Map<String, Object> source, String name) {
|
||||
Object value = source.remove(name);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof List) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> list = (List<String>) value;
|
||||
return list.toArray(new String[list.size()]);
|
||||
} else if (value instanceof String) {
|
||||
return new String[] {(String) value};
|
||||
} else {
|
||||
throw new IllegalArgumentException("Expected [" + name + "] to be a list of a string but was [" + value + ']');
|
||||
}
|
||||
}
|
||||
|
||||
private static String extractString(Map<String, Object> source, String name) {
|
||||
Object value = source.remove(name);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof String) {
|
||||
return (String) value;
|
||||
}
|
||||
throw new IllegalArgumentException("Expected [" + name + "] to be a string but was [" + value + "]");
|
||||
}
|
||||
|
||||
private static Map<String, String> extractStringStringMap(Map<String, Object> source, String name) {
|
||||
Object value = source.remove(name);
|
||||
if (value == null) {
|
||||
return emptyMap();
|
||||
}
|
||||
if (false == value instanceof Map) {
|
||||
throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but was [" + value + "]");
|
||||
}
|
||||
Map<?, ?> map = (Map<?, ?>) value;
|
||||
for (Map.Entry<?, ?> entry : map.entrySet()) {
|
||||
if (false == entry.getKey() instanceof String || false == entry.getValue() instanceof String) {
|
||||
throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but has [" + entry + "]");
|
||||
}
|
||||
}
|
||||
@SuppressWarnings("unchecked") // We just checked....
|
||||
Map<String, String> safe = (Map<String, String>) map;
|
||||
return safe;
|
||||
}
|
||||
|
||||
private static TimeValue extractTimeValue(Map<String, Object> source, String name, TimeValue defaultValue) {
|
||||
String string = extractString(source, name);
|
||||
return string == null ? defaultValue : parseTimeValue(string, name);
|
||||
}
|
||||
|
||||
private static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
|
||||
Object query = source.remove("query");
|
||||
if (query == null) {
|
||||
return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
|
||||
}
|
||||
if (!(query instanceof Map)) {
|
||||
throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]");
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> map = (Map<String, Object>) query;
|
||||
return BytesReference.bytes(builder.map(map));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.common.bytes.BytesArray;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.rest.RestRequest.Method;
|
||||
|
@ -33,11 +32,8 @@ import org.junit.Before;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
|
||||
public class RestReindexActionTests extends RestActionTestCase {
|
||||
|
||||
|
@ -48,126 +44,6 @@ public class RestReindexActionTests extends RestActionTestCase {
|
|||
action = new RestReindexAction(Settings.EMPTY, controller());
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoNoRemote() throws IOException {
|
||||
assertNull(RestReindexAction.buildRemoteInfo(new HashMap<>()));
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoFullyLoaded() throws IOException {
|
||||
Map<String, String> headers = new HashMap<>();
|
||||
headers.put("first", "a");
|
||||
headers.put("second", "b");
|
||||
headers.put("third", "");
|
||||
|
||||
Map<String, Object> remote = new HashMap<>();
|
||||
remote.put("host", "https://example.com:9200");
|
||||
remote.put("username", "testuser");
|
||||
remote.put("password", "testpass");
|
||||
remote.put("headers", headers);
|
||||
remote.put("socket_timeout", "90s");
|
||||
remote.put("connect_timeout", "10s");
|
||||
|
||||
Map<String, Object> query = new HashMap<>();
|
||||
query.put("a", "b");
|
||||
|
||||
Map<String, Object> source = new HashMap<>();
|
||||
source.put("remote", remote);
|
||||
source.put("query", query);
|
||||
|
||||
RemoteInfo remoteInfo = RestReindexAction.buildRemoteInfo(source);
|
||||
assertEquals("https", remoteInfo.getScheme());
|
||||
assertEquals("example.com", remoteInfo.getHost());
|
||||
assertEquals(9200, remoteInfo.getPort());
|
||||
assertEquals("{\n \"a\" : \"b\"\n}", remoteInfo.getQuery().utf8ToString());
|
||||
assertEquals("testuser", remoteInfo.getUsername());
|
||||
assertEquals("testpass", remoteInfo.getPassword());
|
||||
assertEquals(headers, remoteInfo.getHeaders());
|
||||
assertEquals(timeValueSeconds(90), remoteInfo.getSocketTimeout());
|
||||
assertEquals(timeValueSeconds(10), remoteInfo.getConnectTimeout());
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoWithoutAllParts() throws IOException {
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com"));
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase(":9200"));
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://:9200"));
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com:9200"));
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com"));
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoWithAllHostParts() throws IOException {
|
||||
RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200");
|
||||
assertEquals("http", info.getScheme());
|
||||
assertEquals("example.com", info.getHost());
|
||||
assertEquals(9200, info.getPort());
|
||||
assertNull(info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default
|
||||
|
||||
info = buildRemoteInfoHostTestCase("https://other.example.com:9201");
|
||||
assertEquals("https", info.getScheme());
|
||||
assertEquals("other.example.com", info.getHost());
|
||||
assertEquals(9201, info.getPort());
|
||||
assertNull(info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
|
||||
|
||||
info = buildRemoteInfoHostTestCase("https://[::1]:9201");
|
||||
assertEquals("https", info.getScheme());
|
||||
assertEquals("[::1]", info.getHost());
|
||||
assertEquals(9201, info.getPort());
|
||||
assertNull(info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
|
||||
|
||||
info = buildRemoteInfoHostTestCase("https://other.example.com:9201/");
|
||||
assertEquals("https", info.getScheme());
|
||||
assertEquals("other.example.com", info.getHost());
|
||||
assertEquals(9201, info.getPort());
|
||||
assertEquals("/", info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
|
||||
|
||||
info = buildRemoteInfoHostTestCase("https://other.example.com:9201/proxy-path/");
|
||||
assertEquals("https", info.getScheme());
|
||||
assertEquals("other.example.com", info.getHost());
|
||||
assertEquals(9201, info.getPort());
|
||||
assertEquals("/proxy-path/", info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
|
||||
|
||||
final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> buildRemoteInfoHostTestCase("https"));
|
||||
assertEquals("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [https]",
|
||||
exception.getMessage());
|
||||
}
|
||||
|
||||
public void testReindexFromRemoteRequestParsing() throws IOException {
|
||||
BytesReference request;
|
||||
try (XContentBuilder b = JsonXContent.contentBuilder()) {
|
||||
b.startObject(); {
|
||||
b.startObject("source"); {
|
||||
b.startObject("remote"); {
|
||||
b.field("host", "http://localhost:9200");
|
||||
}
|
||||
b.endObject();
|
||||
b.field("index", "source");
|
||||
}
|
||||
b.endObject();
|
||||
b.startObject("dest"); {
|
||||
b.field("index", "dest");
|
||||
}
|
||||
b.endObject();
|
||||
}
|
||||
b.endObject();
|
||||
request = BytesReference.bytes(b);
|
||||
}
|
||||
try (XContentParser p = createParser(JsonXContent.jsonXContent, request)) {
|
||||
ReindexRequest r = new ReindexRequest();
|
||||
RestReindexAction.PARSER.parse(p, r, null);
|
||||
assertEquals("localhost", r.getRemoteInfo().getHost());
|
||||
assertArrayEquals(new String[] {"source"}, r.getSearchRequest().indices());
|
||||
}
|
||||
}
|
||||
|
||||
public void testPipelineQueryParameterIsError() throws IOException {
|
||||
FakeRestRequest.Builder request = new FakeRestRequest.Builder(xContentRegistry());
|
||||
try (XContentBuilder body = JsonXContent.contentBuilder().prettyPrint()) {
|
||||
|
@ -206,16 +82,6 @@ public class RestReindexActionTests extends RestActionTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private RemoteInfo buildRemoteInfoHostTestCase(String hostInRest) throws IOException {
|
||||
Map<String, Object> remote = new HashMap<>();
|
||||
remote.put("host", hostInRest);
|
||||
|
||||
Map<String, Object> source = new HashMap<>();
|
||||
source.put("remote", remote);
|
||||
|
||||
return RestReindexAction.buildRemoteInfo(source);
|
||||
}
|
||||
|
||||
/**
|
||||
* test deprecation is logged if one or more types are used in source search request inside reindex
|
||||
*/
|
||||
|
@ -234,7 +100,7 @@ public class RestReindexActionTests extends RestActionTestCase {
|
|||
b.endObject();
|
||||
requestBuilder.withContent(new BytesArray(BytesReference.bytes(b).toBytesRef()), XContentType.JSON);
|
||||
dispatchRequest(requestBuilder.build());
|
||||
assertWarnings(RestReindexAction.TYPES_DEPRECATION_MESSAGE);
|
||||
assertWarnings(ReindexRequest.TYPES_DEPRECATION_MESSAGE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -255,6 +121,6 @@ public class RestReindexActionTests extends RestActionTestCase {
|
|||
b.endObject();
|
||||
requestBuilder.withContent(new BytesArray(BytesReference.bytes(b).toBytesRef()), XContentType.JSON);
|
||||
dispatchRequest(requestBuilder.build());
|
||||
assertWarnings(RestReindexAction.TYPES_DEPRECATION_MESSAGE);
|
||||
assertWarnings(ReindexRequest.TYPES_DEPRECATION_MESSAGE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,25 +19,46 @@
|
|||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.CompositeIndicesRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
|
||||
import static org.elasticsearch.index.VersionType.INTERNAL;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
|
||||
/**
|
||||
* Request to reindex some documents from one index to another. This implements CompositeIndicesRequest but in a misleading way. Rather than
|
||||
|
@ -335,4 +356,175 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
|
|||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
static final ObjectParser<ReindexRequest, Void> PARSER = new ObjectParser<>("reindex");
|
||||
static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in reindex requests is deprecated.";
|
||||
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(ReindexRequest.class));
|
||||
|
||||
static {
|
||||
ObjectParser.Parser<ReindexRequest, Void> sourceParser = (parser, request, context) -> {
|
||||
// Funky hack to work around Search not having a proper ObjectParser and us wanting to extract query if using remote.
|
||||
Map<String, Object> source = parser.map();
|
||||
String[] indices = extractStringArray(source, "index");
|
||||
if (indices != null) {
|
||||
request.getSearchRequest().indices(indices);
|
||||
}
|
||||
String[] types = extractStringArray(source, "type");
|
||||
if (types != null) {
|
||||
deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE);
|
||||
request.getSearchRequest().types(types);
|
||||
}
|
||||
request.setRemoteInfo(buildRemoteInfo(source));
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(parser.contentType());
|
||||
builder.map(source);
|
||||
try (InputStream stream = BytesReference.bytes(builder).streamInput();
|
||||
XContentParser innerParser = parser.contentType().xContent()
|
||||
.createParser(parser.getXContentRegistry(), parser.getDeprecationHandler(), stream)) {
|
||||
request.getSearchRequest().source().parseXContent(innerParser, false);
|
||||
}
|
||||
};
|
||||
|
||||
ObjectParser<IndexRequest, Void> destParser = new ObjectParser<>("dest");
|
||||
destParser.declareString(IndexRequest::index, new ParseField("index"));
|
||||
destParser.declareString((request, type) -> {
|
||||
deprecationLogger.deprecatedAndMaybeLog("reindex_with_types", TYPES_DEPRECATION_MESSAGE);
|
||||
request.type(type);
|
||||
}, new ParseField("type"));
|
||||
destParser.declareString(IndexRequest::routing, new ParseField("routing"));
|
||||
destParser.declareString(IndexRequest::opType, new ParseField("op_type"));
|
||||
destParser.declareString(IndexRequest::setPipeline, new ParseField("pipeline"));
|
||||
destParser.declareString((s, i) -> s.versionType(VersionType.fromString(i)), new ParseField("version_type"));
|
||||
|
||||
PARSER.declareField(sourceParser::parse, new ParseField("source"), ObjectParser.ValueType.OBJECT);
|
||||
PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), c), new ParseField("dest"), ObjectParser.ValueType.OBJECT);
|
||||
PARSER.declareInt(ReindexRequest::setMaxDocsValidateIdentical, new ParseField("max_docs", "size"));
|
||||
PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p)), new ParseField("script"),
|
||||
ObjectParser.ValueType.OBJECT);
|
||||
PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts"));
|
||||
}
|
||||
|
||||
public static ReindexRequest fromXContent(XContentParser parser) throws IOException {
|
||||
ReindexRequest reindexRequest = new ReindexRequest();
|
||||
PARSER.parse(parser, reindexRequest, null);
|
||||
return reindexRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Yank a string array from a map. Emulates XContent's permissive String to
|
||||
* String array conversions.
|
||||
*/
|
||||
private static String[] extractStringArray(Map<String, Object> source, String name) {
|
||||
Object value = source.remove(name);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof List) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> list = (List<String>) value;
|
||||
return list.toArray(new String[list.size()]);
|
||||
} else if (value instanceof String) {
|
||||
return new String[] {(String) value};
|
||||
} else {
|
||||
throw new IllegalArgumentException("Expected [" + name + "] to be a list of a string but was [" + value + ']');
|
||||
}
|
||||
}
|
||||
|
||||
static RemoteInfo buildRemoteInfo(Map<String, Object> source) throws IOException {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> remote = (Map<String, Object>) source.remove("remote");
|
||||
if (remote == null) {
|
||||
return null;
|
||||
}
|
||||
String username = extractString(remote, "username");
|
||||
String password = extractString(remote, "password");
|
||||
String hostInRequest = requireNonNull(extractString(remote, "host"), "[host] must be specified to reindex from a remote cluster");
|
||||
URI uri;
|
||||
try {
|
||||
uri = new URI(hostInRequest);
|
||||
// URI has less stringent URL parsing than our code. We want to fail if all values are not provided.
|
||||
if (uri.getPort() == -1) {
|
||||
throw new URISyntaxException(hostInRequest, "The port was not defined in the [host]");
|
||||
}
|
||||
} catch (URISyntaxException ex) {
|
||||
throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was ["
|
||||
+ hostInRequest + "]", ex);
|
||||
}
|
||||
|
||||
String scheme = uri.getScheme();
|
||||
String host = uri.getHost();
|
||||
int port = uri.getPort();
|
||||
|
||||
String pathPrefix = null;
|
||||
if (uri.getPath().isEmpty() == false) {
|
||||
pathPrefix = uri.getPath();
|
||||
}
|
||||
|
||||
Map<String, String> headers = extractStringStringMap(remote, "headers");
|
||||
TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT);
|
||||
TimeValue connectTimeout = extractTimeValue(remote, "connect_timeout", RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
if (false == remote.isEmpty()) {
|
||||
throw new IllegalArgumentException(
|
||||
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
|
||||
}
|
||||
return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source),
|
||||
username, password, headers, socketTimeout, connectTimeout);
|
||||
}
|
||||
|
||||
private static String extractString(Map<String, Object> source, String name) {
|
||||
Object value = source.remove(name);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof String) {
|
||||
return (String) value;
|
||||
}
|
||||
throw new IllegalArgumentException("Expected [" + name + "] to be a string but was [" + value + "]");
|
||||
}
|
||||
|
||||
private static Map<String, String> extractStringStringMap(Map<String, Object> source, String name) {
|
||||
Object value = source.remove(name);
|
||||
if (value == null) {
|
||||
return emptyMap();
|
||||
}
|
||||
if (false == value instanceof Map) {
|
||||
throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but was [" + value + "]");
|
||||
}
|
||||
Map<?, ?> map = (Map<?, ?>) value;
|
||||
for (Map.Entry<?, ?> entry : map.entrySet()) {
|
||||
if (false == entry.getKey() instanceof String || false == entry.getValue() instanceof String) {
|
||||
throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but has [" + entry + "]");
|
||||
}
|
||||
}
|
||||
@SuppressWarnings("unchecked") // We just checked....
|
||||
Map<String, String> safe = (Map<String, String>) map;
|
||||
return safe;
|
||||
}
|
||||
|
||||
private static TimeValue extractTimeValue(Map<String, Object> source, String name, TimeValue defaultValue) {
|
||||
String string = extractString(source, name);
|
||||
return string == null ? defaultValue : parseTimeValue(string, name);
|
||||
}
|
||||
|
||||
private static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
|
||||
Object query = source.remove("query");
|
||||
if (query == null) {
|
||||
return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
|
||||
}
|
||||
if (!(query instanceof Map)) {
|
||||
throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]");
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> map = (Map<String, Object>) query;
|
||||
return BytesReference.bytes(builder.map(map));
|
||||
}
|
||||
|
||||
static void setMaxDocsValidateIdentical(AbstractBulkByScrollRequest<?> request, int maxDocs) {
|
||||
if (request.getMaxDocs() != AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES && request.getMaxDocs() != maxDocs) {
|
||||
throw new IllegalArgumentException("[max_docs] set to two different values [" + request.getMaxDocs() + "]" +
|
||||
" and [" + maxDocs + "]");
|
||||
} else {
|
||||
request.setMaxDocs(maxDocs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,10 +21,19 @@ package org.elasticsearch.index.reindex;
|
|||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.search.slice.SliceBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
|
||||
/**
|
||||
|
@ -92,4 +101,134 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
|
|||
reindex.setDestIndex("dest");
|
||||
return reindex;
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoNoRemote() throws IOException {
|
||||
assertNull(ReindexRequest.buildRemoteInfo(new HashMap<>()));
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoFullyLoaded() throws IOException {
|
||||
Map<String, String> headers = new HashMap<>();
|
||||
headers.put("first", "a");
|
||||
headers.put("second", "b");
|
||||
headers.put("third", "");
|
||||
|
||||
Map<String, Object> remote = new HashMap<>();
|
||||
remote.put("host", "https://example.com:9200");
|
||||
remote.put("username", "testuser");
|
||||
remote.put("password", "testpass");
|
||||
remote.put("headers", headers);
|
||||
remote.put("socket_timeout", "90s");
|
||||
remote.put("connect_timeout", "10s");
|
||||
|
||||
Map<String, Object> query = new HashMap<>();
|
||||
query.put("a", "b");
|
||||
|
||||
Map<String, Object> source = new HashMap<>();
|
||||
source.put("remote", remote);
|
||||
source.put("query", query);
|
||||
|
||||
RemoteInfo remoteInfo = ReindexRequest.buildRemoteInfo(source);
|
||||
assertEquals("https", remoteInfo.getScheme());
|
||||
assertEquals("example.com", remoteInfo.getHost());
|
||||
assertEquals(9200, remoteInfo.getPort());
|
||||
assertEquals("{\n \"a\" : \"b\"\n}", remoteInfo.getQuery().utf8ToString());
|
||||
assertEquals("testuser", remoteInfo.getUsername());
|
||||
assertEquals("testpass", remoteInfo.getPassword());
|
||||
assertEquals(headers, remoteInfo.getHeaders());
|
||||
assertEquals(timeValueSeconds(90), remoteInfo.getSocketTimeout());
|
||||
assertEquals(timeValueSeconds(10), remoteInfo.getConnectTimeout());
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoWithoutAllParts() throws IOException {
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com"));
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase(":9200"));
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://:9200"));
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("example.com:9200"));
|
||||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com"));
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoWithAllHostParts() throws IOException {
|
||||
RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200");
|
||||
assertEquals("http", info.getScheme());
|
||||
assertEquals("example.com", info.getHost());
|
||||
assertEquals(9200, info.getPort());
|
||||
assertNull(info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default
|
||||
|
||||
info = buildRemoteInfoHostTestCase("https://other.example.com:9201");
|
||||
assertEquals("https", info.getScheme());
|
||||
assertEquals("other.example.com", info.getHost());
|
||||
assertEquals(9201, info.getPort());
|
||||
assertNull(info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
|
||||
|
||||
info = buildRemoteInfoHostTestCase("https://[::1]:9201");
|
||||
assertEquals("https", info.getScheme());
|
||||
assertEquals("[::1]", info.getHost());
|
||||
assertEquals(9201, info.getPort());
|
||||
assertNull(info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
|
||||
|
||||
info = buildRemoteInfoHostTestCase("https://other.example.com:9201/");
|
||||
assertEquals("https", info.getScheme());
|
||||
assertEquals("other.example.com", info.getHost());
|
||||
assertEquals(9201, info.getPort());
|
||||
assertEquals("/", info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
|
||||
|
||||
info = buildRemoteInfoHostTestCase("https://other.example.com:9201/proxy-path/");
|
||||
assertEquals("https", info.getScheme());
|
||||
assertEquals("other.example.com", info.getHost());
|
||||
assertEquals(9201, info.getPort());
|
||||
assertEquals("/proxy-path/", info.getPathPrefix());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
|
||||
|
||||
final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> buildRemoteInfoHostTestCase("https"));
|
||||
assertEquals("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [https]",
|
||||
exception.getMessage());
|
||||
}
|
||||
|
||||
public void testReindexFromRemoteRequestParsing() throws IOException {
|
||||
BytesReference request;
|
||||
try (XContentBuilder b = JsonXContent.contentBuilder()) {
|
||||
b.startObject(); {
|
||||
b.startObject("source"); {
|
||||
b.startObject("remote"); {
|
||||
b.field("host", "http://localhost:9200");
|
||||
}
|
||||
b.endObject();
|
||||
b.field("index", "source");
|
||||
}
|
||||
b.endObject();
|
||||
b.startObject("dest"); {
|
||||
b.field("index", "dest");
|
||||
}
|
||||
b.endObject();
|
||||
}
|
||||
b.endObject();
|
||||
request = BytesReference.bytes(b);
|
||||
}
|
||||
try (XContentParser p = createParser(JsonXContent.jsonXContent, request)) {
|
||||
ReindexRequest r = ReindexRequest.fromXContent(p);
|
||||
assertEquals("localhost", r.getRemoteInfo().getHost());
|
||||
assertArrayEquals(new String[] {"source"}, r.getSearchRequest().indices());
|
||||
}
|
||||
}
|
||||
|
||||
private RemoteInfo buildRemoteInfoHostTestCase(String hostInRest) throws IOException {
|
||||
Map<String, Object> remote = new HashMap<>();
|
||||
remote.put("host", hostInRest);
|
||||
|
||||
Map<String, Object> source = new HashMap<>();
|
||||
source.put("remote", remote);
|
||||
|
||||
return ReindexRequest.buildRemoteInfo(source);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue