Support for remote path in reindex api (#31290)

Support for remote path in reindex api
Closes #22913
This commit is contained in:
Vladimir Dolzhenko 2018-06-15 22:14:28 +02:00 committed by GitHub
parent a705e1a9e3
commit dbc9d60260
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 136 additions and 73 deletions

View File

@ -422,11 +422,11 @@ POST _reindex
// TEST[s/"username": "user",//]
// TEST[s/"password": "pass"//]
The `host` parameter must contain a scheme, host, and port (e.g.
`https://otherhost:9200`). The `username` and `password` parameters are
optional, and when they are present `_reindex` will connect to the remote
Elasticsearch node using basic auth. Be sure to use `https` when using
basic auth or the password will be sent in plain text.
The `host` parameter must contain a scheme, host, and port (e.g.
`https://otherhost:9200`), and may include an optional path (e.g. `https://otherhost:9200/proxy`).
The `username` and `password` parameters are optional, and when they are present `_reindex`
will connect to the remote Elasticsearch node using basic auth. Be sure to use `https` when
using basic auth or the password will be sent in plain text.
Remote hosts have to be explicitly whitelisted in `elasticsearch.yml` using the
`reindex.remote.whitelist` property. It can be set to a comma delimited list

View File

@ -57,7 +57,7 @@ import static org.elasticsearch.rest.RestRequest.Method.POST;
*/
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, ReindexAction> {
static final ObjectParser<ReindexRequest, Void> PARSER = new ObjectParser<>("reindex");
private static final Pattern HOST_PATTERN = Pattern.compile("(?<scheme>[^:]+)://(?<host>[^:]+):(?<port>\\d+)");
private static final Pattern HOST_PATTERN = Pattern.compile("(?<scheme>[^:]+)://(?<host>[^:]+):(?<port>\\d+)(?<pathPrefix>/.*)?");
static {
ObjectParser.Parser<ReindexRequest, Void> sourceParser = (parser, request, context) -> {
@ -139,10 +139,12 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
String hostInRequest = requireNonNull(extractString(remote, "host"), "[host] must be specified to reindex from a remote cluster");
Matcher hostMatcher = HOST_PATTERN.matcher(hostInRequest);
if (false == hostMatcher.matches()) {
throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port] but was [" + hostInRequest + "]");
throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was ["
+ hostInRequest + "]");
}
String scheme = hostMatcher.group("scheme");
String host = hostMatcher.group("host");
String pathPrefix = hostMatcher.group("pathPrefix");
int port = Integer.parseInt(hostMatcher.group("port"));
Map<String, String> headers = extractStringStringMap(remote, "headers");
TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT);
@ -151,7 +153,8 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
throw new IllegalArgumentException(
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
}
return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers, socketTimeout, connectTimeout);
return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source),
username, password, headers, socketTimeout, connectTimeout);
}
/**

View File

@ -37,6 +37,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
@ -206,7 +207,8 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
clientHeaders[i++] = new BasicHeader(header.getKey(), header.getValue());
}
return RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
final RestClientBuilder builder =
RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
.setDefaultHeaders(clientHeaders)
.setRequestConfigCallback(c -> {
c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
@ -233,7 +235,11 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
// Limit ourselves to one reactor thread because for now the search process is single threaded.
c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
return c;
}).build();
});
if (Strings.hasLength(remoteInfo.getPathPrefix()) && "/".equals(remoteInfo.getPathPrefix()) == false) {
builder.setPathPrefix(remoteInfo.getPathPrefix());
}
return builder.build();
}
/**

View File

@ -34,7 +34,8 @@ import static org.hamcrest.Matchers.hasSize;
public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTestCase {
public void testBuildRestClient() throws Exception {
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap(),
for(final String path: new String[]{"", null, "/", "path"}) {
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, path, new BytesArray("ignored"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
long taskId = randomLong();
List<Thread> threads = synchronizedList(new ArrayList<>());
@ -50,6 +51,7 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
client.close();
}
}
}
public void testHeaders() throws Exception {
Map<String, String> headers = new HashMap<>();
@ -57,7 +59,7 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
for (int i = 0; i < numHeaders; ++i) {
headers.put("header" + i, Integer.toString(i));
}
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null,
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, null, new BytesArray("ignored"), null, null,
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
long taskId = randomLong();
List<Thread> threads = synchronizedList(new ArrayList<>());

View File

@ -49,7 +49,7 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
*/
private RemoteInfo newRemoteInfo(String host, int port) {
return new RemoteInfo(randomAlphaOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap(),
return new RemoteInfo(randomAlphaOfLength(5), host, port, null, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}
@ -63,7 +63,7 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
public void testWhitelistedByPrefix() {
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap(),
new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, null, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
newRemoteInfo("6e134134a1.us-east-1.aws.example.com", 9200));

View File

@ -104,8 +104,9 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
*/
private RemoteInfo newRemoteInfo(String username, String password, Map<String, String> headers) {
return new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), username, password,
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
return new RemoteInfo("http", address.getAddress(), address.getPort(), null,
new BytesArray("{\"match_all\":{}}"), username, password, headers,
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}
public void testReindexFromRemoteWithAuthentication() throws Exception {

View File

@ -88,10 +88,10 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
public void testRemoteInfoSkipsValidation() {
// The index doesn't have to exist
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap(),
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "does_not_exist", "target");
// And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote.
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap(),
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "target", "target");
}

View File

@ -89,6 +89,7 @@ public class RestReindexActionTests extends ESTestCase {
assertEquals("http", info.getScheme());
assertEquals("example.com", info.getHost());
assertEquals(9200, info.getPort());
assertNull(info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default
@ -96,8 +97,30 @@ public class RestReindexActionTests extends ESTestCase {
assertEquals("https", info.getScheme());
assertEquals("other.example.com", info.getHost());
assertEquals(9201, info.getPort());
assertNull(info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
info = buildRemoteInfoHostTestCase("https://other.example.com:9201/");
assertEquals("https", info.getScheme());
assertEquals("other.example.com", info.getHost());
assertEquals(9201, info.getPort());
assertEquals("/", info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
info = buildRemoteInfoHostTestCase("https://other.example.com:9201/proxy-path/");
assertEquals("https", info.getScheme());
assertEquals("other.example.com", info.getHost());
assertEquals(9201, info.getPort());
assertEquals("/proxy-path/", info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> buildRemoteInfoHostTestCase("https"));
assertEquals("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [https]",
exception.getMessage());
}
public void testReindexFromRemoteRequestParsing() throws IOException {

View File

@ -124,8 +124,10 @@ public class RetryTests extends ESIntegTestCase {
assertNotNull(masterNode);
TransportAddress address = masterNode.getHttp().getAddress().publishAddress();
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
RemoteInfo remote =
new RemoteInfo("http", address.getAddress(), address.getPort(), null,
new BytesArray("{\"match_all\":{}}"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
ReindexRequestBuilder request = new ReindexRequestBuilder(client, ReindexAction.INSTANCE).source("source").destination("dest")
.setRemoteInfo(remote);
return request;

View File

@ -63,8 +63,9 @@ public class RoundTripTests extends ESTestCase {
}
TimeValue socketTimeout = parseTimeValue(randomPositiveTimeValue(), "socketTimeout");
TimeValue connectTimeout = parseTimeValue(randomPositiveTimeValue(), "connectTimeout");
reindex.setRemoteInfo(new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), port, query, username, password, headers,
socketTimeout, connectTimeout));
reindex.setRemoteInfo(
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), port, null,
query, username, password, headers, socketTimeout, connectTimeout));
}
ReindexRequest tripped = new ReindexRequest();
roundTrip(reindex, tripped);

View File

@ -26,17 +26,21 @@ import org.elasticsearch.test.ESTestCase;
import static java.util.Collections.emptyMap;
public class RemoteInfoTests extends ESTestCase {
private RemoteInfo newRemoteInfo(String scheme, String username, String password) {
return new RemoteInfo(scheme, "testhost", 12344, new BytesArray("testquery"), username, password, emptyMap(),
private RemoteInfo newRemoteInfo(String scheme, String prefixPath, String username, String password) {
return new RemoteInfo(scheme, "testhost", 12344, prefixPath, new BytesArray("testquery"), username, password, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}
public void testToString() {
assertEquals("host=testhost port=12344 query=testquery", newRemoteInfo("http", null, null).toString());
assertEquals("host=testhost port=12344 query=testquery username=testuser", newRemoteInfo("http", "testuser", null).toString());
assertEquals("host=testhost port=12344 query=testquery",
newRemoteInfo("http", null, null, null).toString());
assertEquals("host=testhost port=12344 query=testquery username=testuser",
newRemoteInfo("http", null, "testuser", null).toString());
assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>",
newRemoteInfo("http", "testuser", "testpass").toString());
newRemoteInfo("http", null, "testuser", "testpass").toString());
assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>",
newRemoteInfo("https", "testuser", "testpass").toString());
newRemoteInfo("https", null, "testuser", "testpass").toString());
assertEquals("scheme=https host=testhost port=12344 pathPrefix=prxy query=testquery username=testuser password=<<>>",
newRemoteInfo("https", "prxy", "testuser", "testpass").toString());
}
}

View File

@ -48,6 +48,7 @@ public class RemoteInfo implements Writeable {
private final String scheme;
private final String host;
private final int port;
private final String pathPrefix;
private final BytesReference query;
private final String username;
private final String password;
@ -61,11 +62,12 @@ public class RemoteInfo implements Writeable {
*/
private final TimeValue connectTimeout;
public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password,
public RemoteInfo(String scheme, String host, int port, String pathPrefix, BytesReference query, String username, String password,
Map<String, String> headers, TimeValue socketTimeout, TimeValue connectTimeout) {
this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster");
this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster");
this.port = port;
this.pathPrefix = pathPrefix;
this.query = requireNonNull(query, "[query] must be specified to reindex from a remote cluster");
this.username = username;
this.password = password;
@ -97,6 +99,11 @@ public class RemoteInfo implements Writeable {
socketTimeout = DEFAULT_SOCKET_TIMEOUT;
connectTimeout = DEFAULT_CONNECT_TIMEOUT;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
pathPrefix = in.readOptionalString();
} else {
pathPrefix = null;
}
}
@Override
@ -116,6 +123,9 @@ public class RemoteInfo implements Writeable {
out.writeTimeValue(socketTimeout);
out.writeTimeValue(connectTimeout);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalString(pathPrefix);
}
}
public String getScheme() {
@ -130,6 +140,11 @@ public class RemoteInfo implements Writeable {
return port;
}
/**
 * Returns the path prefix stored for the remote host, or {@code null} when none was supplied.
 */
@Nullable
public String getPathPrefix() {
return pathPrefix;
}
public BytesReference getQuery() {
return query;
}
@ -169,7 +184,11 @@ public class RemoteInfo implements Writeable {
// http is the default so it isn't worth taking up space if it is the scheme
b.append("scheme=").append(scheme).append(' ');
}
b.append("host=").append(host).append(" port=").append(port).append(" query=").append(query.utf8ToString());
b.append("host=").append(host).append(" port=").append(port);
if (pathPrefix != null) {
b.append(" pathPrefix=").append(pathPrefix);
}
b.append(" query=").append(query.utf8ToString());
if (username != null) {
b.append(" username=").append(username);
}

View File

@ -37,8 +37,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
public void testReindexFromRemoteDoesNotSupportSearchQuery() {
ReindexRequest reindex = newRequest();
reindex.setRemoteInfo(
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE), new BytesArray("real_query"),
null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE), null,
new BytesArray("real_query"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
reindex.getSearchRequest().source().query(matchAllQuery()); // Unsupported place to put query
ActionRequestValidationException e = reindex.validate();
assertEquals("Validation Failed: 1: reindex from remote sources should use RemoteInfo's query instead of source's query;",
@ -48,8 +49,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
public void testReindexFromRemoteDoesNotSupportSlices() {
ReindexRequest reindex = newRequest();
reindex.setRemoteInfo(
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE), new BytesArray("real_query"),
null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE), null,
new BytesArray("real_query"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
reindex.setSlices(between(2, Integer.MAX_VALUE));
ActionRequestValidationException e = reindex.validate();
assertEquals(
@ -72,7 +74,7 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
}
if (randomBoolean()) {
original.setRemoteInfo(new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, 10000),
new BytesArray(randomAlphaOfLength(5)), null, null, emptyMap(),
null, new BytesArray(randomAlphaOfLength(5)), null, null, emptyMap(),
parseTimeValue(randomPositiveTimeValue(), "socket_timeout"),
parseTimeValue(randomPositiveTimeValue(), "connect_timeout")));
}