Add authentication to reindex-from-remote

The tests for authentication extend ESIntegTestCase and use a mock
authentication plugin. This way the clients don't have to worry about
running it. Sadly, that means we don't really have good coverage on the
REST portion of the authentication.

This also adds ElasticsearchStatusException, and exception on which you
can set an explicit status. The nice thing about it is that you can
set the RestStatus that it returns to whatever arbitrary status you like
based on the status that comes back from the remote system.
reindex-from-remote then uses it to wrap all remote failures, preserving
the status from the remote Elasticsearch or whatever proxy is between us
and the remove Elasticsearch.
This commit is contained in:
Nik Everett 2016-07-06 15:47:45 -04:00
parent 1307aa7e77
commit fb45f6a8a8
23 changed files with 586 additions and 56 deletions

View File

@ -692,8 +692,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
NO_LONGER_PRIMARY_SHARD_EXCEPTION(ShardStateAction.NoLongerPrimaryShardException.class,
ShardStateAction.NoLongerPrimaryShardException::new, 142),
SCRIPT_EXCEPTION(org.elasticsearch.script.ScriptException.class, org.elasticsearch.script.ScriptException::new, 143),
NOT_MASTER_EXCEPTION(org.elasticsearch.cluster.NotMasterException.class, org.elasticsearch.cluster.NotMasterException::new, 144);
NOT_MASTER_EXCEPTION(org.elasticsearch.cluster.NotMasterException.class, org.elasticsearch.cluster.NotMasterException::new, 144),
STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, org.elasticsearch.ElasticsearchStatusException::new, 145);
final Class<? extends ElasticsearchException> exceptionClass;
final FunctionThatThrowsIOException<StreamInput, ? extends ElasticsearchException> constructor;

View File

@ -19,7 +19,6 @@
package org.elasticsearch;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
@ -27,40 +26,39 @@ import java.io.IOException;
/**
* Generic security exception
*/
public class ElasticsearchSecurityException extends ElasticsearchException {
private final RestStatus status;
public class ElasticsearchSecurityException extends ElasticsearchStatusException {
/**
* Build the exception with a specific status and cause.
*/
public ElasticsearchSecurityException(String msg, RestStatus status, Throwable cause, Object... args) {
super(msg, cause, args);
this.status = status ;
super(msg, status, cause, args);
}
/**
* Build the exception with the status derived from the cause.
*/
public ElasticsearchSecurityException(String msg, Exception cause, Object... args) {
this(msg, ExceptionsHelper.status(cause), cause, args);
}
/**
* Build the exception with a status of {@link RestStatus#INTERNAL_SERVER_ERROR} without a cause.
*/
public ElasticsearchSecurityException(String msg, Object... args) {
this(msg, RestStatus.INTERNAL_SERVER_ERROR, null, args);
this(msg, RestStatus.INTERNAL_SERVER_ERROR, args);
}
/**
* Build the exception without a cause.
*/
public ElasticsearchSecurityException(String msg, RestStatus status, Object... args) {
this(msg, status, null, args);
super(msg, status, args);
}
/**
* Read from a stream.
*/
public ElasticsearchSecurityException(StreamInput in) throws IOException {
super(in);
status = RestStatus.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
RestStatus.writeTo(out, status);
}
@Override
public final RestStatus status() {
return status;
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
/**
* Exception who's {@link RestStatus} is arbitrary rather than derived. Used, for example, by reindex-from-remote to wrap remote exceptions
* that contain a status.
*/
public class ElasticsearchStatusException extends ElasticsearchException {
private final RestStatus status;
/**
* Build the exception with a specific status and cause.
*/
public ElasticsearchStatusException(String msg, RestStatus status, Throwable cause, Object... args) {
super(msg, cause, args);
this.status = status;
}
/**
* Build the exception without a cause.
*/
public ElasticsearchStatusException(String msg, RestStatus status, Object... args) {
this(msg, status, null, args);
}
/**
* Read from a stream.
*/
public ElasticsearchStatusException(StreamInput in) throws IOException {
super(in);
status = RestStatus.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
RestStatus.writeTo(out, status);
}
@Override
public final RestStatus status() {
return status;
}
}

View File

@ -24,6 +24,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.unmodifiableMap;
public enum RestStatus {
/**
@ -477,6 +481,15 @@ public enum RestStatus {
*/
INSUFFICIENT_STORAGE(506);
private static final Map<Integer, RestStatus> CODE_TO_STATUS;
static {
RestStatus[] values = values();
Map<Integer, RestStatus> codeToStatus = new HashMap<>(values.length);
for (RestStatus value : values) {
codeToStatus.put(value.status, value);
}
CODE_TO_STATUS = unmodifiableMap(codeToStatus);
}
private int status;
@ -515,4 +528,11 @@ public enum RestStatus {
}
return status;
}
/**
* Turn a status code into a {@link RestStatus}, returning null if we don't know that status.
*/
public static RestStatus fromCode(int code) {
return CODE_TO_STATUS.get(code);
}
}

View File

@ -792,6 +792,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(142, ShardStateAction.NoLongerPrimaryShardException.class);
ids.put(143, org.elasticsearch.script.ScriptException.class);
ids.put(144, org.elasticsearch.cluster.NotMasterException.class);
ids.put(145, org.elasticsearch.ElasticsearchStatusException.class);
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
@ -842,4 +843,11 @@ public class ExceptionSerializationTests extends ESTestCase {
}
}
}
public void testElasticsearchRemoteException() throws IOException {
ElasticsearchStatusException ex = new ElasticsearchStatusException("something", RestStatus.TOO_MANY_REQUESTS);
ElasticsearchStatusException e = serialize(ex);
assertEquals(ex.status(), e.status());
assertEquals(RestStatus.TOO_MANY_REQUESTS, e.status());
}
}

View File

@ -56,6 +56,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -151,11 +152,12 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
String scheme = hostMatcher.group("scheme");
String host = hostMatcher.group("host");
int port = Integer.parseInt(hostMatcher.group("port"));
Map<String, String> headers = extractStringStringMap(remote, "headers");
if (false == remote.isEmpty()) {
throw new IllegalArgumentException(
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
}
return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password);
return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers);
}
/**
@ -189,6 +191,25 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
throw new IllegalArgumentException("Expected [" + name + "] to be a string but was [" + value + "]");
}
private static Map<String, String> extractStringStringMap(Map<String, Object> source, String name) {
Object value = source.remove(name);
if (value == null) {
return emptyMap();
}
if (false == value instanceof Map) {
throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but was [" + value + "]");
}
Map<?, ?> map = (Map<?, ?>) value;
for (Map.Entry<?, ?> entry : map.entrySet()) {
if (false == entry.getKey() instanceof String || false == entry.getValue() instanceof String) {
throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but has [" + entry + "]");
}
}
@SuppressWarnings("unchecked") // We just checked....
Map<String, String> safe = (Map<String, String>) map;
return safe;
}
private static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
Object query = source.remove("query");

View File

@ -19,7 +19,13 @@
package org.elasticsearch.index.reindex;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BackoffPolicy;
@ -31,6 +37,7 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
@ -178,16 +185,27 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
@Override
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
if (mainRequest.getRemoteInfo() != null) {
// NORELEASE track 500-level retries that are builtin to the client
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
if (remoteInfo.getUsername() != null) {
// NORELEASE support auth
throw new UnsupportedOperationException("Auth is unsupported");
Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
int i = 0;
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
clientHeaders[i] = new BasicHeader(header.getKey(), header.getValue());
}
RestClient restClient = RestClient.builder(
new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme())).build();
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry,
this::finishHim, restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
RestClientBuilder restClient = RestClient
.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
.setDefaultHeaders(clientHeaders);
if (remoteInfo.getUsername() != null) {
restClient.setHttpClientConfigCallback(c -> {
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
remoteInfo.getPassword());
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, creds);
c.setDefaultCredentialsProvider(credentialsProvider);
return c;
});
}
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, this::finishHim,
restClient.build(), remoteInfo.getQuery(), mainRequest.getSearchRequest());
}
return super.buildScrollableResultSource(backoffPolicy);
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Actions that modify documents based on the results of a scrolling query like {@link ReindexAction}, {@link UpdateByQueryAction}, and
* {@link DeleteByQueryAction}.
*/
package org.elasticsearch.index.reindex;

View File

@ -26,7 +26,10 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;
public class RemoteInfo implements Writeable {
@ -36,14 +39,17 @@ public class RemoteInfo implements Writeable {
private final BytesReference query;
private final String username;
private final String password;
private final Map<String, String> headers;
public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password) {
public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password,
Map<String, String> headers) {
this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster");
this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster");
this.port = port;
this.query = requireNonNull(query, "[query] must be specified to reindex from a remote cluster");
this.username = username;
this.password = password;
this.headers = unmodifiableMap(requireNonNull(headers, "[headers] is required"));
}
/**
@ -56,6 +62,12 @@ public class RemoteInfo implements Writeable {
query = in.readBytesReference();
username = in.readOptionalString();
password = in.readOptionalString();
int headersLength = in.readVInt();
Map<String, String> headers = new HashMap<>(headersLength);
for (int i = 0; i < headersLength; i++) {
headers.put(in.readString(), in.readString());
}
this.headers = unmodifiableMap(headers);
}
@Override
@ -66,6 +78,11 @@ public class RemoteInfo implements Writeable {
out.writeBytesReference(query);
out.writeOptionalString(username);
out.writeOptionalString(password);
out.writeVInt(headers.size());
for (Map.Entry<String, String> header : headers.entrySet()) {
out.writeString(header.getKey());
out.writeString(header.getValue());
}
}
public String getScheme() {
@ -94,6 +111,10 @@ public class RemoteInfo implements Writeable {
return password;
}
public Map<String, String> getHeaders() {
return headers;
}
@Override
public String toString() {
StringBuilder b = new StringBuilder();

View File

@ -20,13 +20,16 @@
package org.elasticsearch.index.reindex.remote;
import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.Strings;
@ -34,6 +37,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
@ -130,6 +134,8 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
private <T> void execute(String method, String uri, Map<String, String> params, HttpEntity entity,
BiFunction<XContentParser, ParseFieldMatcherSupplier, T> parser, Consumer<? super T> listener) {
// Preserve the thread context so headers survive after the call
ThreadContext.StoredContext ctx = threadPool.getThreadContext().newStoredContext();
class RetryHelper extends AbstractRunnable {
private final Iterator<TimeValue> retries = backoffPolicy.iterator();
@ -138,6 +144,8 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
client.performRequest(method, uri, params, entity, new ResponseListener() {
@Override
public void onSuccess(org.elasticsearch.client.Response response) {
// Restore the thread context to get the precious headers
ctx.restore();
T parsedResponse;
try {
HttpEntity responseEntity = response.getEntity();
@ -172,6 +180,8 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
return;
}
}
e = wrapExceptionToPreserveStatus(re.getResponse().getStatusLine().getStatusCode(),
re.getResponse().getEntity(), re);
}
fail.accept(e);
}
@ -185,4 +195,31 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
}
new RetryHelper().run();
}
/**
* Wrap the ResponseException in an exception that'll preserve its status code if possible so we can send it back to the user. We might
* not have a constant for the status code so in that case we just use 500 instead. We also extract make sure to include the response
* body in the message so the user can figure out *why* the remote Elasticsearch service threw the error back to us.
*/
static ElasticsearchStatusException wrapExceptionToPreserveStatus(int statusCode, @Nullable HttpEntity entity, Exception cause) {
RestStatus status = RestStatus.fromCode(statusCode);
String messagePrefix = "";
if (status == null) {
messagePrefix = "Couldn't extract status [" + statusCode + "]. ";
status = RestStatus.INTERNAL_SERVER_ERROR;
}
String message;
if (entity == null) {
message = messagePrefix + "No error body.";
} else {
try {
message = messagePrefix + "body=" + EntityUtils.toString(entity);
} catch (IOException ioe) {
ElasticsearchStatusException e = new ElasticsearchStatusException(messagePrefix + "Failed to extract body.", status, cause);
e.addSuppressed(ioe);
return e;
}
}
return new ElasticsearchStatusException(message, status, cause);
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Support for reindexing from a remote Elasticsearch cluster.
*/
package org.elasticsearch.index.reindex.remote;

View File

@ -31,6 +31,8 @@ import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.index.reindex.TransportReindexAction.checkRemoteWhitelist;
@ -58,7 +60,7 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
String[] inList = whitelist.iterator().next().split(":");
String host = inList[0];
int port = Integer.valueOf(inList[1]);
checkRemoteWhitelist(whitelist, new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null),
checkRemoteWhitelist(whitelist, new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap()),
localhostOrNone());
}
@ -66,14 +68,15 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
Set<String> whitelist = randomWhitelist();
whitelist.add("myself");
TransportAddress publishAddress = new InetSocketTransportAddress(InetAddress.getByAddress(new byte[] {0x7f,0x00,0x00,0x01}), 9200);
checkRemoteWhitelist(whitelist, new RemoteInfo(randomAsciiOfLength(5), "127.0.0.1", 9200, new BytesArray("test"), null, null),
publishAddress);
checkRemoteWhitelist(whitelist,
new RemoteInfo(randomAsciiOfLength(5), "127.0.0.1", 9200, new BytesArray("test"), null, null, emptyMap()), publishAddress);
}
public void testUnwhitelistedRemote() {
int port = between(1, Integer.MAX_VALUE);
Exception e = expectThrows(IllegalArgumentException.class, () -> checkRemoteWhitelist(randomWhitelist(),
new RemoteInfo(randomAsciiOfLength(5), "not in list", port, new BytesArray("test"), null, null), localhostOrNone()));
RemoteInfo remoteInfo = new RemoteInfo(randomAsciiOfLength(5), "not in list", port, new BytesArray("test"), null, null, emptyMap());
Exception e = expectThrows(IllegalArgumentException.class,
() -> checkRemoteWhitelist(randomWhitelist(), remoteInfo, localhostOrNone()));
assertEquals("[not in list:" + port + "] not whitelisted in reindex.remote.whitelist", e.getMessage());
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.reindex.remote.RemoteInfo;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.index.reindex.ReindexTestCase.matcher;
import static org.hamcrest.Matchers.containsString;
public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
private TransportAddress address;
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Arrays.asList(RetryTests.BogusPlugin.class,
Netty4Plugin.class,
ReindexFromRemoteWithAuthTests.TestPlugin.class,
ReindexPlugin.class);
}
@Override
protected Settings nodeSettings() {
Settings.Builder settings = Settings.builder().put(super.nodeSettings());
// Weird incantation required to test with netty
settings.put("netty.assert.buglevel", false);
settings.put(NetworkModule.HTTP_ENABLED.getKey(), true);
// Whitelist reindexing from the http host we're going to use
settings.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "myself");
settings.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME);
return settings.build();
}
@Before
public void setupSourceIndex() {
client().prepareIndex("source", "test").setSource("test", "test").setRefreshPolicy(RefreshPolicy.IMMEDIATE).get();
}
@Before
public void fetchTransportAddress() {
NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0);
address = nodeInfo.getHttp().getAddress().publishAddress();
}
public void testReindexFromRemoteWithAuthentication() throws Exception {
RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "Aladdin",
"open sesame", emptyMap());
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote);
assertThat(request.get(), matcher().created(1));
}
public void testReindexSendsHeaders() throws Exception {
RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, null,
singletonMap(TestFilter.EXAMPLE_HEADER, "doesn't matter"));
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
assertEquals(RestStatus.BAD_REQUEST, e.status());
assertThat(e.getMessage(), containsString("Hurray! Sent the header!"));
}
public void testReindexWithoutAuthenticationWhenRequired() throws Exception {
RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, null,
emptyMap());
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
assertEquals(RestStatus.UNAUTHORIZED, e.status());
assertThat(e.getMessage(), containsString("\"reason\":\"Authentication required\""));
assertThat(e.getMessage(), containsString("\"WWW-Authenticate\":\"Basic realm=auth-realm\""));
}
public void testReindexWithBadAuthentication() throws Exception {
RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "junk",
"auth", emptyMap());
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
assertThat(e.getMessage(), containsString("\"reason\":\"Bad Authorization\""));
}
/**
* Plugin that demands authentication.
*/
public static class TestPlugin extends Plugin implements ActionPlugin {
@Override
public List<Class<? extends ActionFilter>> getActionFilters() {
return singletonList(ReindexFromRemoteWithAuthTests.TestFilter.class);
}
@Override
public Collection<String> getRestHeaders() {
return Arrays.asList(TestFilter.AUTHORIZATION_HEADER, TestFilter.EXAMPLE_HEADER);
}
}
/**
* Action filter that will reject the request if it isn't authenticated.
*/
public static class TestFilter implements ActionFilter {
/**
* The authorization required. Corresponds to username="Aladdin" and password="open sesame". It is the example in
* <a href="https://tools.ietf.org/html/rfc1945#section-11.1">HTTP/1.0's RFC</a>.
*/
private static final String REQUIRED_AUTH = "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==";
private static final String AUTHORIZATION_HEADER = "Authorization";
private static final String EXAMPLE_HEADER = "Example-Header";
private final ThreadContext context;
@Inject
public TestFilter(ThreadPool threadPool) {
context = threadPool.getThreadContext();
}
@Override
public int order() {
return Integer.MIN_VALUE;
}
@Override
public <Request extends ActionRequest<Request>, Response extends ActionResponse> void apply(Task task, String action,
Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
if (false == action.equals(SearchAction.NAME)) {
chain.proceed(task, action, request, listener);
return;
}
if (context.getHeader(EXAMPLE_HEADER) != null) {
throw new IllegalArgumentException("Hurray! Sent the header!");
}
String auth = context.getHeader(AUTHORIZATION_HEADER);
if (auth == null) {
ElasticsearchSecurityException e = new ElasticsearchSecurityException("Authentication required",
RestStatus.UNAUTHORIZED);
e.addHeader("WWW-Authenticate", "Basic realm=auth-realm");
throw e;
}
if (false == REQUIRED_AUTH.equals(auth)) {
throw new ElasticsearchSecurityException("Bad Authorization", RestStatus.FORBIDDEN);
}
chain.proceed(task, action, request, listener);
}
@Override
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
ActionFilterChain<?, Response> chain) {
chain.proceed(action, response, listener);
}
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.index.reindex.remote.RemoteInfo;
import org.elasticsearch.test.ESTestCase;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
/**
@ -44,7 +45,7 @@ public class ReindexRequestTests extends ESTestCase {
public void testReindexFromRemoteDoesNotSupportSearchQuery() {
ReindexRequest reindex = request();
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE),
new BytesArray("real_query"), null, null));
new BytesArray("real_query"), null, null, emptyMap()));
reindex.getSearchRequest().source().query(matchAllQuery()); // Unsupported place to put query
ActionRequestValidationException e = reindex.validate();
assertEquals("Validation Failed: 1: reindex from remote sources should use RemoteInfo's query instead of source's query;",

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.remote.RemoteInfo;
import org.elasticsearch.test.ESTestCase;
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.containsString;
/**
@ -86,9 +87,10 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
public void testRemoteInfoSkipsValidation() {
// The index doesn't have to exist
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null), "does_not_exist", "target");
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap()), "does_not_exist",
"target");
// And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote.
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null), "target", "target");
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap()), "target", "target");
}
private void fails(String target, String... sources) {

View File

@ -41,10 +41,16 @@ public class RestReindexActionTests extends ESTestCase {
}
public void testBuildRemoteInfoFullyLoaded() throws IOException {
Map<String, String> headers = new HashMap<>();
headers.put("first", "a");
headers.put("second", "b");
headers.put("third", "");
Map<String, Object> remote = new HashMap<>();
remote.put("host", "https://example.com:9200");
remote.put("username", "testuser");
remote.put("password", "testpass");
remote.put("headers", headers);
Map<String, Object> query = new HashMap<>();
query.put("a", "b");
@ -60,6 +66,7 @@ public class RestReindexActionTests extends ESTestCase {
assertEquals("{\n \"a\" : \"b\"\n}", remoteInfo.getQuery().utf8ToString());
assertEquals("testuser", remoteInfo.getUsername());
assertEquals("testpass", remoteInfo.getPassword());
assertEquals(headers, remoteInfo.getHeaders());
}
public void testBuildRemoteInfoWithoutAllParts() throws IOException {

View File

@ -47,6 +47,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.reindex.ReindexTestCase.matcher;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
@ -114,8 +115,8 @@ public class RetryTests extends ESSingleNodeTestCase {
@Override
protected Settings nodeSettings() {
Settings.Builder settings = Settings.builder().put(super.nodeSettings());
// Use pools of size 1 so we can block them
settings.put("netty.assert.buglevel", false);
// Use pools of size 1 so we can block them
settings.put("thread_pool.bulk.size", 1);
settings.put("thread_pool.search.size", 1);
// Use queues of size 1 because size 0 is broken and because search requests need the queue to function
@ -140,7 +141,8 @@ public class RetryTests extends ESSingleNodeTestCase {
public void testReindexFromRemote() throws Exception {
NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0);
TransportAddress address = nodeInfo.getHttp().getAddress().publishAddress();
RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, null);
RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, null,
emptyMap());
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote);
testCase(ReindexAction.NAME, request, matcher().created(DOC_COUNT));

View File

@ -38,7 +38,9 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.lang.Math.abs;
import static java.util.Collections.emptyList;
@ -62,7 +64,12 @@ public class RoundTripTests extends ESTestCase {
BytesReference query = new BytesArray(randomAsciiOfLength(5));
String username = randomBoolean() ? randomAsciiOfLength(5) : null;
String password = username != null && randomBoolean() ? randomAsciiOfLength(5) : null;
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password));
int headersCount = randomBoolean() ? 0 : between(1, 10);
Map<String, String> headers = new HashMap<>(headersCount);
while (headers.size() < headersCount) {
headers.put(randomAsciiOfLength(5), randomAsciiOfLength(5));
}
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password, headers));
}
ReindexRequest tripped = new ReindexRequest();
roundTrip(reindex, tripped);
@ -78,6 +85,7 @@ public class RoundTripTests extends ESTestCase {
assertEquals(reindex.getRemoteInfo().getQuery(), tripped.getRemoteInfo().getQuery());
assertEquals(reindex.getRemoteInfo().getUsername(), tripped.getRemoteInfo().getUsername());
assertEquals(reindex.getRemoteInfo().getPassword(), tripped.getRemoteInfo().getPassword());
assertEquals(reindex.getRemoteInfo().getHeaders(), tripped.getRemoteInfo().getHeaders());
}
}

View File

@ -22,15 +22,17 @@ package org.elasticsearch.index.reindex.remote;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.test.ESTestCase;
import static java.util.Collections.emptyMap;
public class RemoteInfoTests extends ESTestCase {
public void testToString() {
RemoteInfo info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), null, null);
RemoteInfo info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), null, null, emptyMap());
assertEquals("host=testhost port=12344 query=testquery", info.toString());
info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", null);
info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", null, emptyMap());
assertEquals("host=testhost port=12344 query=testquery username=testuser", info.toString());
info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass");
info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap());
assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString());
info = new RemoteInfo("https", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass");
info = new RemoteInfo("https", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap());
assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString());
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.reindex.remote;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
@ -27,12 +28,14 @@ import org.apache.http.StatusLine;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.search.SearchRequest;
@ -53,6 +56,7 @@ import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
@ -316,6 +320,66 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
assertEquals(retriesAllowed, retries);
}
public void testThreadContextRestored() throws Exception {
String header = randomAsciiOfLength(5);
threadPool.getThreadContext().putHeader("test", header);
AtomicBoolean called = new AtomicBoolean();
sourceWithMockedRemoteCall("start_ok.json").doStart(r -> {
assertEquals(header, threadPool.getThreadContext().getHeader("test"));
called.set(true);
});
assertTrue(called.get());
}
public void testWrapExceptionToPreserveStatus() throws IOException {
Exception cause = new Exception();
// Successfully get the status without a body
RestStatus status = randomFrom(RestStatus.values());
ElasticsearchStatusException wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(status.getStatus(), null, cause);
assertEquals(status, wrapped.status());
assertEquals(cause, wrapped.getCause());
assertEquals("No error body.", wrapped.getMessage());
// Successfully get the status without a body
HttpEntity okEntity = new StringEntity("test body", StandardCharsets.UTF_8);
wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(status.getStatus(), okEntity, cause);
assertEquals(status, wrapped.status());
assertEquals(cause, wrapped.getCause());
assertEquals("body=test body", wrapped.getMessage());
// Successfully get the status with a broken body
IOException badEntityException = new IOException();
HttpEntity badEntity = mock(HttpEntity.class);
when(badEntity.getContent()).thenThrow(badEntityException);
wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(status.getStatus(), badEntity, cause);
assertEquals(status, wrapped.status());
assertEquals(cause, wrapped.getCause());
assertEquals("Failed to extract body.", wrapped.getMessage());
assertEquals(badEntityException, wrapped.getSuppressed()[0]);
// Fail to get the status without a body
int notAnHttpStatus = -1;
assertNull(RestStatus.fromCode(notAnHttpStatus));
wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(notAnHttpStatus, null, cause);
assertEquals(RestStatus.INTERNAL_SERVER_ERROR, wrapped.status());
assertEquals(cause, wrapped.getCause());
assertEquals("Couldn't extract status [" + notAnHttpStatus + "]. No error body.", wrapped.getMessage());
// Fail to get the status without a body
wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(notAnHttpStatus, okEntity, cause);
assertEquals(RestStatus.INTERNAL_SERVER_ERROR, wrapped.status());
assertEquals(cause, wrapped.getCause());
assertEquals("Couldn't extract status [" + notAnHttpStatus + "]. body=test body", wrapped.getMessage());
// Fail to get the status with a broken body
wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(notAnHttpStatus, badEntity, cause);
assertEquals(RestStatus.INTERNAL_SERVER_ERROR, wrapped.status());
assertEquals(cause, wrapped.getCause());
assertEquals("Couldn't extract status [" + notAnHttpStatus + "]. Failed to extract body.", wrapped.getMessage());
assertEquals(badEntityException, wrapped.getSuppressed()[0]);
}
private RemoteScrollableHitSource sourceWithMockedRemoteCall(String... paths) throws Exception {
return sourceWithMockedRemoteCall(true, paths);
}
@ -342,8 +406,9 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
@Override
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
// Throw away the current thread context to simulate running async httpclient's thread pool
threadPool.getThreadContext().stashContext();
HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
FutureCallback<HttpResponse> futureCallback = (FutureCallback<HttpResponse>) invocationOnMock.getArguments()[2];
HttpEntityEnclosingRequest request = (HttpEntityEnclosingRequest)requestProducer.generateRequest();
URL resource = resources[responseCount];

View File

@ -20,7 +20,7 @@
import org.elasticsearch.gradle.precommit.PrecommitTasks
subprojects {
// fixtures is just an intermediate parent project
// fixtures is just intermediate parent project
if (name == 'fixtures') return
group = 'org.elasticsearch.test'
@ -28,7 +28,7 @@ subprojects {
apply plugin: 'nebula.maven-base-publish'
apply plugin: 'nebula.maven-scm'
// the main files are actually test files, so use the appropriate forbidden api sigs
forbiddenApisMain {
signaturesURLs = [PrecommitTasks.getResource('/forbidden/jdk-signatures.txt'),

View File

@ -142,12 +142,17 @@ public class DoSection implements ExecutableSection {
private static Map<String, Tuple<String, org.hamcrest.Matcher<Integer>>> catches = new HashMap<>();
static {
catches.put("missing", tuple("404", equalTo(404)));
catches.put("conflict", tuple("409", equalTo(409)));
catches.put("unauthorized", tuple("401", equalTo(401)));
catches.put("forbidden", tuple("403", equalTo(403)));
catches.put("missing", tuple("404", equalTo(404)));
catches.put("request_timeout", tuple("408", equalTo(408)));
catches.put("conflict", tuple("409", equalTo(409)));
catches.put("unavailable", tuple("503", equalTo(503)));
catches.put("request", tuple("4xx|5xx",
allOf(greaterThanOrEqualTo(400), not(equalTo(404)), not(equalTo(408)), not(equalTo(409)), not(equalTo(403)))));
catches.put("request", tuple("4xx|5xx", allOf(greaterThanOrEqualTo(400),
not(equalTo(401)),
not(equalTo(403)),
not(equalTo(404)),
not(equalTo(408)),
not(equalTo(409)))));
}
}

View File

@ -35,8 +35,8 @@ import static java.util.Collections.unmodifiableList;
* and the related skip sections can be removed from the tests as well.
*/
public final class Features {
private static final List<String> SUPPORTED = unmodifiableList(Arrays.asList(
"catch_unauthorized",
"embedded_stash_key",
"groovy_scripting",
"headers",