From fb45f6a8a864597f05e7ecb65d0c99e560b99968 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 6 Jul 2016 15:47:45 -0400 Subject: [PATCH] Add authentication to reindex-from-remote The tests for authentication extend ESIntegTestCase and use a mock authentication plugin. This way the clients don't have to worry about running it. Sadly, that means we don't really have good coverage on the REST portion of the authentication. This also adds ElasticsearchStatusException, and exception on which you can set an explicit status. The nice thing about it is that you can set the RestStatus that it returns to whatever arbitrary status you like based on the status that comes back from the remote system. reindex-from-remote then uses it to wrap all remote failures, preserving the status from the remote Elasticsearch or whatever proxy is between us and the remove Elasticsearch. --- .../elasticsearch/ElasticsearchException.java | 4 +- .../ElasticsearchSecurityException.java | 40 ++-- .../ElasticsearchStatusException.java | 68 ++++++ .../org/elasticsearch/rest/RestStatus.java | 20 ++ .../ExceptionSerializationTests.java | 8 + .../index/reindex/RestReindexAction.java | 23 +- .../index/reindex/TransportReindexAction.java | 34 ++- .../index/reindex/package-info.java | 24 +++ .../index/reindex/remote/RemoteInfo.java | 23 +- .../remote/RemoteScrollableHitSource.java | 37 ++++ .../index/reindex/remote/package-info.java | 23 ++ .../ReindexFromRemoteWhitelistTests.java | 13 +- .../ReindexFromRemoteWithAuthTests.java | 197 ++++++++++++++++++ .../index/reindex/ReindexRequestTests.java | 3 +- .../ReindexSourceTargetValidationTests.java | 6 +- .../index/reindex/RestReindexActionTests.java | 7 + .../index/reindex/RetryTests.java | 6 +- .../index/reindex/RoundTripTests.java | 10 +- .../index/reindex/remote/RemoteInfoTests.java | 10 +- .../RemoteScrollableHitSourceTests.java | 67 +++++- test/build.gradle | 4 +- .../test/rest/yaml/section/DoSection.java | 13 +- .../test/rest/yaml/support/Features.java | 2 +- 23 files changed, 586 insertions(+), 56 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/ElasticsearchStatusException.java create mode 100644 modules/reindex/src/main/java/org/elasticsearch/index/reindex/package-info.java create mode 100644 modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/package-info.java create mode 100644 modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index 8222955c60b..c6cfb9e9a02 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -692,8 +692,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte NO_LONGER_PRIMARY_SHARD_EXCEPTION(ShardStateAction.NoLongerPrimaryShardException.class, ShardStateAction.NoLongerPrimaryShardException::new, 142), SCRIPT_EXCEPTION(org.elasticsearch.script.ScriptException.class, org.elasticsearch.script.ScriptException::new, 143), - NOT_MASTER_EXCEPTION(org.elasticsearch.cluster.NotMasterException.class, org.elasticsearch.cluster.NotMasterException::new, 144); - + NOT_MASTER_EXCEPTION(org.elasticsearch.cluster.NotMasterException.class, org.elasticsearch.cluster.NotMasterException::new, 144), + STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, org.elasticsearch.ElasticsearchStatusException::new, 145); final Class exceptionClass; final FunctionThatThrowsIOException constructor; diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchSecurityException.java b/core/src/main/java/org/elasticsearch/ElasticsearchSecurityException.java index b6cd420c856..0cf5fb474e0 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchSecurityException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchSecurityException.java @@ -19,7 +19,6 @@ package org.elasticsearch; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -27,40 +26,39 @@ import java.io.IOException; /** * Generic security exception */ -public class ElasticsearchSecurityException extends ElasticsearchException { - - private final RestStatus status; - +public class ElasticsearchSecurityException extends ElasticsearchStatusException { + /** + * Build the exception with a specific status and cause. + */ public ElasticsearchSecurityException(String msg, RestStatus status, Throwable cause, Object... args) { - super(msg, cause, args); - this.status = status ; + super(msg, status, cause, args); } + /** + * Build the exception with the status derived from the cause. + */ public ElasticsearchSecurityException(String msg, Exception cause, Object... args) { this(msg, ExceptionsHelper.status(cause), cause, args); } + /** + * Build the exception with a status of {@link RestStatus#INTERNAL_SERVER_ERROR} without a cause. + */ public ElasticsearchSecurityException(String msg, Object... args) { - this(msg, RestStatus.INTERNAL_SERVER_ERROR, null, args); + this(msg, RestStatus.INTERNAL_SERVER_ERROR, args); } + /** + * Build the exception without a cause. + */ public ElasticsearchSecurityException(String msg, RestStatus status, Object... args) { - this(msg, status, null, args); + super(msg, status, args); } + /** + * Read from a stream. + */ public ElasticsearchSecurityException(StreamInput in) throws IOException { super(in); - status = RestStatus.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - RestStatus.writeTo(out, status); - } - - @Override - public final RestStatus status() { - return status; } } diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchStatusException.java b/core/src/main/java/org/elasticsearch/ElasticsearchStatusException.java new file mode 100644 index 00000000000..55f12db69e1 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ElasticsearchStatusException.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; + +/** + * Exception who's {@link RestStatus} is arbitrary rather than derived. Used, for example, by reindex-from-remote to wrap remote exceptions + * that contain a status. + */ +public class ElasticsearchStatusException extends ElasticsearchException { + private final RestStatus status; + + /** + * Build the exception with a specific status and cause. + */ + public ElasticsearchStatusException(String msg, RestStatus status, Throwable cause, Object... args) { + super(msg, cause, args); + this.status = status; + } + + /** + * Build the exception without a cause. + */ + public ElasticsearchStatusException(String msg, RestStatus status, Object... args) { + this(msg, status, null, args); + } + + /** + * Read from a stream. + */ + public ElasticsearchStatusException(StreamInput in) throws IOException { + super(in); + status = RestStatus.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + RestStatus.writeTo(out, status); + } + + @Override + public final RestStatus status() { + return status; + } +} diff --git a/core/src/main/java/org/elasticsearch/rest/RestStatus.java b/core/src/main/java/org/elasticsearch/rest/RestStatus.java index d78b9c50e0c..d72eb2d11f4 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestStatus.java +++ b/core/src/main/java/org/elasticsearch/rest/RestStatus.java @@ -24,6 +24,10 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.unmodifiableMap; public enum RestStatus { /** @@ -477,6 +481,15 @@ public enum RestStatus { */ INSUFFICIENT_STORAGE(506); + private static final Map CODE_TO_STATUS; + static { + RestStatus[] values = values(); + Map codeToStatus = new HashMap<>(values.length); + for (RestStatus value : values) { + codeToStatus.put(value.status, value); + } + CODE_TO_STATUS = unmodifiableMap(codeToStatus); + } private int status; @@ -515,4 +528,11 @@ public enum RestStatus { } return status; } + + /** + * Turn a status code into a {@link RestStatus}, returning null if we don't know that status. + */ + public static RestStatus fromCode(int code) { + return CODE_TO_STATUS.get(code); + } } diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index a7dbb145e40..b9f3d443089 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -792,6 +792,7 @@ public class ExceptionSerializationTests extends ESTestCase { ids.put(142, ShardStateAction.NoLongerPrimaryShardException.class); ids.put(143, org.elasticsearch.script.ScriptException.class); ids.put(144, org.elasticsearch.cluster.NotMasterException.class); + ids.put(145, org.elasticsearch.ElasticsearchStatusException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { @@ -842,4 +843,11 @@ public class ExceptionSerializationTests extends ESTestCase { } } } + + public void testElasticsearchRemoteException() throws IOException { + ElasticsearchStatusException ex = new ElasticsearchStatusException("something", RestStatus.TOO_MANY_REQUESTS); + ElasticsearchStatusException e = serialize(ex); + assertEquals(ex.status(), e.status()); + assertEquals(RestStatus.TOO_MANY_REQUESTS, e.status()); + } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index d40c1ea6622..03c5054afd2 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -56,6 +56,7 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static java.util.Collections.emptyMap; import static java.util.Objects.requireNonNull; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; @@ -151,11 +152,12 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler headers = extractStringStringMap(remote, "headers"); if (false == remote.isEmpty()) { throw new IllegalArgumentException( "Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]"); } - return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password); + return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers); } /** @@ -189,6 +191,25 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler extractStringStringMap(Map source, String name) { + Object value = source.remove(name); + if (value == null) { + return emptyMap(); + } + if (false == value instanceof Map) { + throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but was [" + value + "]"); + } + Map map = (Map) value; + for (Map.Entry entry : map.entrySet()) { + if (false == entry.getKey() instanceof String || false == entry.getValue() instanceof String) { + throw new IllegalArgumentException("Expected [" + name + "] to be an object containing strings but has [" + entry + "]"); + } + } + @SuppressWarnings("unchecked") // We just checked.... + Map safe = (Map) map; + return safe; + } + private static BytesReference queryForRemote(Map source) throws IOException { XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); Object query = source.remove("query"); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index 2238bec433b..291cd4f7b4d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -19,7 +19,13 @@ package org.elasticsearch.index.reindex; +import org.apache.http.Header; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.message.BasicHeader; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.bulk.BackoffPolicy; @@ -31,6 +37,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -178,16 +185,27 @@ public class TransportReindexAction extends HandledTransportAction header : remoteInfo.getHeaders().entrySet()) { + clientHeaders[i] = new BasicHeader(header.getKey(), header.getValue()); } - RestClient restClient = RestClient.builder( - new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme())).build(); - return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, - this::finishHim, restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest()); + RestClientBuilder restClient = RestClient + .builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme())) + .setDefaultHeaders(clientHeaders); + if (remoteInfo.getUsername() != null) { + restClient.setHttpClientConfigCallback(c -> { + UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(), + remoteInfo.getPassword()); + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, creds); + c.setDefaultCredentialsProvider(credentialsProvider); + return c; + }); + } + return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, this::finishHim, + restClient.build(), remoteInfo.getQuery(), mainRequest.getSearchRequest()); } return super.buildScrollableResultSource(backoffPolicy); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/package-info.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/package-info.java new file mode 100644 index 00000000000..bb16842867f --- /dev/null +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Actions that modify documents based on the results of a scrolling query like {@link ReindexAction}, {@link UpdateByQueryAction}, and + * {@link DeleteByQueryAction}. + */ +package org.elasticsearch.index.reindex; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteInfo.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteInfo.java index 89d6cb18401..1405d656d99 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteInfo.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteInfo.java @@ -26,7 +26,10 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import static java.util.Collections.unmodifiableMap; import static java.util.Objects.requireNonNull; public class RemoteInfo implements Writeable { @@ -36,14 +39,17 @@ public class RemoteInfo implements Writeable { private final BytesReference query; private final String username; private final String password; + private final Map headers; - public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password) { + public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password, + Map headers) { this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster"); this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster"); this.port = port; this.query = requireNonNull(query, "[query] must be specified to reindex from a remote cluster"); this.username = username; this.password = password; + this.headers = unmodifiableMap(requireNonNull(headers, "[headers] is required")); } /** @@ -56,6 +62,12 @@ public class RemoteInfo implements Writeable { query = in.readBytesReference(); username = in.readOptionalString(); password = in.readOptionalString(); + int headersLength = in.readVInt(); + Map headers = new HashMap<>(headersLength); + for (int i = 0; i < headersLength; i++) { + headers.put(in.readString(), in.readString()); + } + this.headers = unmodifiableMap(headers); } @Override @@ -66,6 +78,11 @@ public class RemoteInfo implements Writeable { out.writeBytesReference(query); out.writeOptionalString(username); out.writeOptionalString(password); + out.writeVInt(headers.size()); + for (Map.Entry header : headers.entrySet()) { + out.writeString(header.getKey()); + out.writeString(header.getValue()); + } } public String getScheme() { @@ -94,6 +111,10 @@ public class RemoteInfo implements Writeable { return password; } + public Map getHeaders() { + return headers; + } + @Override public String toString() { StringBuilder b = new StringBuilder(); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java index eee3e2c59ae..41f6dd5f946 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java @@ -20,13 +20,16 @@ package org.elasticsearch.index.reindex.remote; import org.apache.http.HttpEntity; +import org.apache.http.util.EntityUtils; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.ResponseListener; import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcherSupplier; import org.elasticsearch.common.Strings; @@ -34,6 +37,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; @@ -130,6 +134,8 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { private void execute(String method, String uri, Map params, HttpEntity entity, BiFunction parser, Consumer listener) { + // Preserve the thread context so headers survive after the call + ThreadContext.StoredContext ctx = threadPool.getThreadContext().newStoredContext(); class RetryHelper extends AbstractRunnable { private final Iterator retries = backoffPolicy.iterator(); @@ -138,6 +144,8 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { client.performRequest(method, uri, params, entity, new ResponseListener() { @Override public void onSuccess(org.elasticsearch.client.Response response) { + // Restore the thread context to get the precious headers + ctx.restore(); T parsedResponse; try { HttpEntity responseEntity = response.getEntity(); @@ -172,6 +180,8 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { return; } } + e = wrapExceptionToPreserveStatus(re.getResponse().getStatusLine().getStatusCode(), + re.getResponse().getEntity(), re); } fail.accept(e); } @@ -185,4 +195,31 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { } new RetryHelper().run(); } + + /** + * Wrap the ResponseException in an exception that'll preserve its status code if possible so we can send it back to the user. We might + * not have a constant for the status code so in that case we just use 500 instead. We also extract make sure to include the response + * body in the message so the user can figure out *why* the remote Elasticsearch service threw the error back to us. + */ + static ElasticsearchStatusException wrapExceptionToPreserveStatus(int statusCode, @Nullable HttpEntity entity, Exception cause) { + RestStatus status = RestStatus.fromCode(statusCode); + String messagePrefix = ""; + if (status == null) { + messagePrefix = "Couldn't extract status [" + statusCode + "]. "; + status = RestStatus.INTERNAL_SERVER_ERROR; + } + String message; + if (entity == null) { + message = messagePrefix + "No error body."; + } else { + try { + message = messagePrefix + "body=" + EntityUtils.toString(entity); + } catch (IOException ioe) { + ElasticsearchStatusException e = new ElasticsearchStatusException(messagePrefix + "Failed to extract body.", status, cause); + e.addSuppressed(ioe); + return e; + } + } + return new ElasticsearchStatusException(message, status, cause); + } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/package-info.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/package-info.java new file mode 100644 index 00000000000..6dd03ad2952 --- /dev/null +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Support for reindexing from a remote Elasticsearch cluster. + */ +package org.elasticsearch.index.reindex.remote; \ No newline at end of file diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java index b88ed135515..61204177072 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java @@ -31,6 +31,8 @@ import java.net.UnknownHostException; import java.util.HashSet; import java.util.Set; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.index.reindex.TransportReindexAction.checkRemoteWhitelist; @@ -58,7 +60,7 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase { String[] inList = whitelist.iterator().next().split(":"); String host = inList[0]; int port = Integer.valueOf(inList[1]); - checkRemoteWhitelist(whitelist, new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null), + checkRemoteWhitelist(whitelist, new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap()), localhostOrNone()); } @@ -66,14 +68,15 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase { Set whitelist = randomWhitelist(); whitelist.add("myself"); TransportAddress publishAddress = new InetSocketTransportAddress(InetAddress.getByAddress(new byte[] {0x7f,0x00,0x00,0x01}), 9200); - checkRemoteWhitelist(whitelist, new RemoteInfo(randomAsciiOfLength(5), "127.0.0.1", 9200, new BytesArray("test"), null, null), - publishAddress); + checkRemoteWhitelist(whitelist, + new RemoteInfo(randomAsciiOfLength(5), "127.0.0.1", 9200, new BytesArray("test"), null, null, emptyMap()), publishAddress); } public void testUnwhitelistedRemote() { int port = between(1, Integer.MAX_VALUE); - Exception e = expectThrows(IllegalArgumentException.class, () -> checkRemoteWhitelist(randomWhitelist(), - new RemoteInfo(randomAsciiOfLength(5), "not in list", port, new BytesArray("test"), null, null), localhostOrNone())); + RemoteInfo remoteInfo = new RemoteInfo(randomAsciiOfLength(5), "not in list", port, new BytesArray("test"), null, null, emptyMap()); + Exception e = expectThrows(IllegalArgumentException.class, + () -> checkRemoteWhitelist(randomWhitelist(), remoteInfo, localhostOrNone())); assertEquals("[not in list:" + port + "] not whitelisted in reindex.remote.whitelist", e.getMessage()); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java new file mode 100644 index 00000000000..d305fc77331 --- /dev/null +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilterChain; +import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.reindex.remote.RemoteInfo; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Netty4Plugin; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.elasticsearch.index.reindex.ReindexTestCase.matcher; +import static org.hamcrest.Matchers.containsString; + +public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase { + private TransportAddress address; + + @Override + protected Collection> getPlugins() { + return Arrays.asList(RetryTests.BogusPlugin.class, + Netty4Plugin.class, + ReindexFromRemoteWithAuthTests.TestPlugin.class, + ReindexPlugin.class); + } + + @Override + protected Settings nodeSettings() { + Settings.Builder settings = Settings.builder().put(super.nodeSettings()); + // Weird incantation required to test with netty + settings.put("netty.assert.buglevel", false); + settings.put(NetworkModule.HTTP_ENABLED.getKey(), true); + // Whitelist reindexing from the http host we're going to use + settings.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "myself"); + settings.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME); + return settings.build(); + } + + @Before + public void setupSourceIndex() { + client().prepareIndex("source", "test").setSource("test", "test").setRefreshPolicy(RefreshPolicy.IMMEDIATE).get(); + } + + @Before + public void fetchTransportAddress() { + NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0); + address = nodeInfo.getHttp().getAddress().publishAddress(); + } + + public void testReindexFromRemoteWithAuthentication() throws Exception { + RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "Aladdin", + "open sesame", emptyMap()); + ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") + .setRemoteInfo(remote); + assertThat(request.get(), matcher().created(1)); + } + + public void testReindexSendsHeaders() throws Exception { + RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, null, + singletonMap(TestFilter.EXAMPLE_HEADER, "doesn't matter")); + ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") + .setRemoteInfo(remote); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); + assertEquals(RestStatus.BAD_REQUEST, e.status()); + assertThat(e.getMessage(), containsString("Hurray! Sent the header!")); + } + + public void testReindexWithoutAuthenticationWhenRequired() throws Exception { + RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, null, + emptyMap()); + ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") + .setRemoteInfo(remote); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); + assertEquals(RestStatus.UNAUTHORIZED, e.status()); + assertThat(e.getMessage(), containsString("\"reason\":\"Authentication required\"")); + assertThat(e.getMessage(), containsString("\"WWW-Authenticate\":\"Basic realm=auth-realm\"")); + } + + public void testReindexWithBadAuthentication() throws Exception { + RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "junk", + "auth", emptyMap()); + ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") + .setRemoteInfo(remote); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); + assertThat(e.getMessage(), containsString("\"reason\":\"Bad Authorization\"")); + } + + /** + * Plugin that demands authentication. + */ + public static class TestPlugin extends Plugin implements ActionPlugin { + @Override + public List> getActionFilters() { + return singletonList(ReindexFromRemoteWithAuthTests.TestFilter.class); + } + + @Override + public Collection getRestHeaders() { + return Arrays.asList(TestFilter.AUTHORIZATION_HEADER, TestFilter.EXAMPLE_HEADER); + } + } + + /** + * Action filter that will reject the request if it isn't authenticated. + */ + public static class TestFilter implements ActionFilter { + /** + * The authorization required. Corresponds to username="Aladdin" and password="open sesame". It is the example in + * HTTP/1.0's RFC. + */ + private static final String REQUIRED_AUTH = "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="; + private static final String AUTHORIZATION_HEADER = "Authorization"; + private static final String EXAMPLE_HEADER = "Example-Header"; + private final ThreadContext context; + + @Inject + public TestFilter(ThreadPool threadPool) { + context = threadPool.getThreadContext(); + } + + @Override + public int order() { + return Integer.MIN_VALUE; + } + + @Override + public , Response extends ActionResponse> void apply(Task task, String action, + Request request, ActionListener listener, ActionFilterChain chain) { + if (false == action.equals(SearchAction.NAME)) { + chain.proceed(task, action, request, listener); + return; + } + if (context.getHeader(EXAMPLE_HEADER) != null) { + throw new IllegalArgumentException("Hurray! Sent the header!"); + } + String auth = context.getHeader(AUTHORIZATION_HEADER); + if (auth == null) { + ElasticsearchSecurityException e = new ElasticsearchSecurityException("Authentication required", + RestStatus.UNAUTHORIZED); + e.addHeader("WWW-Authenticate", "Basic realm=auth-realm"); + throw e; + } + if (false == REQUIRED_AUTH.equals(auth)) { + throw new ElasticsearchSecurityException("Bad Authorization", RestStatus.FORBIDDEN); + } + chain.proceed(task, action, request, listener); + } + + @Override + public void apply(String action, Response response, ActionListener listener, + ActionFilterChain chain) { + chain.proceed(action, response, listener); + } + } +} diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java index efaf5e627ad..0455e43ec09 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.index.reindex.remote.RemoteInfo; import org.elasticsearch.test.ESTestCase; +import static java.util.Collections.emptyMap; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; /** @@ -44,7 +45,7 @@ public class ReindexRequestTests extends ESTestCase { public void testReindexFromRemoteDoesNotSupportSearchQuery() { ReindexRequest reindex = request(); reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE), - new BytesArray("real_query"), null, null)); + new BytesArray("real_query"), null, null, emptyMap())); reindex.getSearchRequest().source().query(matchAllQuery()); // Unsupported place to put query ActionRequestValidationException e = reindex.validate(); assertEquals("Validation Failed: 1: reindex from remote sources should use RemoteInfo's query instead of source's query;", diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexSourceTargetValidationTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexSourceTargetValidationTests.java index 66896406c66..1213762155b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexSourceTargetValidationTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexSourceTargetValidationTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.reindex.remote.RemoteInfo; import org.elasticsearch.test.ESTestCase; +import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.containsString; /** @@ -86,9 +87,10 @@ public class ReindexSourceTargetValidationTests extends ESTestCase { public void testRemoteInfoSkipsValidation() { // The index doesn't have to exist - succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null), "does_not_exist", "target"); + succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap()), "does_not_exist", + "target"); // And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote. - succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null), "target", "target"); + succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap()), "target", "target"); } private void fails(String target, String... sources) { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java index 1cbec59c49d..b27ecfa3eb1 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java @@ -41,10 +41,16 @@ public class RestReindexActionTests extends ESTestCase { } public void testBuildRemoteInfoFullyLoaded() throws IOException { + Map headers = new HashMap<>(); + headers.put("first", "a"); + headers.put("second", "b"); + headers.put("third", ""); + Map remote = new HashMap<>(); remote.put("host", "https://example.com:9200"); remote.put("username", "testuser"); remote.put("password", "testpass"); + remote.put("headers", headers); Map query = new HashMap<>(); query.put("a", "b"); @@ -60,6 +66,7 @@ public class RestReindexActionTests extends ESTestCase { assertEquals("{\n \"a\" : \"b\"\n}", remoteInfo.getQuery().utf8ToString()); assertEquals("testuser", remoteInfo.getUsername()); assertEquals("testpass", remoteInfo.getPassword()); + assertEquals(headers, remoteInfo.getHeaders()); } public void testBuildRemoteInfoWithoutAllParts() throws IOException { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index 2be27a1a1ad..ecebe141ce9 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -47,6 +47,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CyclicBarrier; +import static java.util.Collections.emptyMap; import static org.elasticsearch.index.reindex.ReindexTestCase.matcher; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThan; @@ -114,8 +115,8 @@ public class RetryTests extends ESSingleNodeTestCase { @Override protected Settings nodeSettings() { Settings.Builder settings = Settings.builder().put(super.nodeSettings()); - // Use pools of size 1 so we can block them settings.put("netty.assert.buglevel", false); + // Use pools of size 1 so we can block them settings.put("thread_pool.bulk.size", 1); settings.put("thread_pool.search.size", 1); // Use queues of size 1 because size 0 is broken and because search requests need the queue to function @@ -140,7 +141,8 @@ public class RetryTests extends ESSingleNodeTestCase { public void testReindexFromRemote() throws Exception { NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0); TransportAddress address = nodeInfo.getHttp().getAddress().publishAddress(); - RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, null); + RemoteInfo remote = new RemoteInfo("http", address.getHost(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, null, + emptyMap()); ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") .setRemoteInfo(remote); testCase(ReindexAction.NAME, request, matcher().created(DOC_COUNT)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 3e3b3a63d62..d30c7c8735d 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -38,7 +38,9 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static java.lang.Math.abs; import static java.util.Collections.emptyList; @@ -62,7 +64,12 @@ public class RoundTripTests extends ESTestCase { BytesReference query = new BytesArray(randomAsciiOfLength(5)); String username = randomBoolean() ? randomAsciiOfLength(5) : null; String password = username != null && randomBoolean() ? randomAsciiOfLength(5) : null; - reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password)); + int headersCount = randomBoolean() ? 0 : between(1, 10); + Map headers = new HashMap<>(headersCount); + while (headers.size() < headersCount) { + headers.put(randomAsciiOfLength(5), randomAsciiOfLength(5)); + } + reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password, headers)); } ReindexRequest tripped = new ReindexRequest(); roundTrip(reindex, tripped); @@ -78,6 +85,7 @@ public class RoundTripTests extends ESTestCase { assertEquals(reindex.getRemoteInfo().getQuery(), tripped.getRemoteInfo().getQuery()); assertEquals(reindex.getRemoteInfo().getUsername(), tripped.getRemoteInfo().getUsername()); assertEquals(reindex.getRemoteInfo().getPassword(), tripped.getRemoteInfo().getPassword()); + assertEquals(reindex.getRemoteInfo().getHeaders(), tripped.getRemoteInfo().getHeaders()); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java index 5492a05986c..3ee647aa55b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java @@ -22,15 +22,17 @@ package org.elasticsearch.index.reindex.remote; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.test.ESTestCase; +import static java.util.Collections.emptyMap; + public class RemoteInfoTests extends ESTestCase { public void testToString() { - RemoteInfo info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), null, null); + RemoteInfo info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), null, null, emptyMap()); assertEquals("host=testhost port=12344 query=testquery", info.toString()); - info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", null); + info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", null, emptyMap()); assertEquals("host=testhost port=12344 query=testquery username=testuser", info.toString()); - info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass"); + info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap()); assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString()); - info = new RemoteInfo("https", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass"); + info = new RemoteInfo("https", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap()); assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString()); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java index 6af4dab9405..6407bc0195b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.reindex.remote; +import org.apache.http.HttpEntity; import org.apache.http.HttpEntityEnclosingRequest; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; @@ -27,12 +28,14 @@ import org.apache.http.StatusLine; import org.apache.http.concurrent.FutureCallback; import org.apache.http.entity.ContentType; import org.apache.http.entity.InputStreamEntity; +import org.apache.http.entity.StringEntity; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.message.BasicHttpResponse; import org.apache.http.message.BasicStatusLine; import org.apache.http.nio.protocol.HttpAsyncRequestProducer; import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.search.SearchRequest; @@ -53,6 +56,7 @@ import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; import java.io.InputStreamReader; import java.net.URL; import java.nio.charset.StandardCharsets; @@ -316,6 +320,66 @@ public class RemoteScrollableHitSourceTests extends ESTestCase { assertEquals(retriesAllowed, retries); } + public void testThreadContextRestored() throws Exception { + String header = randomAsciiOfLength(5); + threadPool.getThreadContext().putHeader("test", header); + AtomicBoolean called = new AtomicBoolean(); + sourceWithMockedRemoteCall("start_ok.json").doStart(r -> { + assertEquals(header, threadPool.getThreadContext().getHeader("test")); + called.set(true); + }); + assertTrue(called.get()); + } + + public void testWrapExceptionToPreserveStatus() throws IOException { + Exception cause = new Exception(); + + // Successfully get the status without a body + RestStatus status = randomFrom(RestStatus.values()); + ElasticsearchStatusException wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(status.getStatus(), null, cause); + assertEquals(status, wrapped.status()); + assertEquals(cause, wrapped.getCause()); + assertEquals("No error body.", wrapped.getMessage()); + + // Successfully get the status without a body + HttpEntity okEntity = new StringEntity("test body", StandardCharsets.UTF_8); + wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(status.getStatus(), okEntity, cause); + assertEquals(status, wrapped.status()); + assertEquals(cause, wrapped.getCause()); + assertEquals("body=test body", wrapped.getMessage()); + + // Successfully get the status with a broken body + IOException badEntityException = new IOException(); + HttpEntity badEntity = mock(HttpEntity.class); + when(badEntity.getContent()).thenThrow(badEntityException); + wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(status.getStatus(), badEntity, cause); + assertEquals(status, wrapped.status()); + assertEquals(cause, wrapped.getCause()); + assertEquals("Failed to extract body.", wrapped.getMessage()); + assertEquals(badEntityException, wrapped.getSuppressed()[0]); + + // Fail to get the status without a body + int notAnHttpStatus = -1; + assertNull(RestStatus.fromCode(notAnHttpStatus)); + wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(notAnHttpStatus, null, cause); + assertEquals(RestStatus.INTERNAL_SERVER_ERROR, wrapped.status()); + assertEquals(cause, wrapped.getCause()); + assertEquals("Couldn't extract status [" + notAnHttpStatus + "]. No error body.", wrapped.getMessage()); + + // Fail to get the status without a body + wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(notAnHttpStatus, okEntity, cause); + assertEquals(RestStatus.INTERNAL_SERVER_ERROR, wrapped.status()); + assertEquals(cause, wrapped.getCause()); + assertEquals("Couldn't extract status [" + notAnHttpStatus + "]. body=test body", wrapped.getMessage()); + + // Fail to get the status with a broken body + wrapped = RemoteScrollableHitSource.wrapExceptionToPreserveStatus(notAnHttpStatus, badEntity, cause); + assertEquals(RestStatus.INTERNAL_SERVER_ERROR, wrapped.status()); + assertEquals(cause, wrapped.getCause()); + assertEquals("Couldn't extract status [" + notAnHttpStatus + "]. Failed to extract body.", wrapped.getMessage()); + assertEquals(badEntityException, wrapped.getSuppressed()[0]); + } + private RemoteScrollableHitSource sourceWithMockedRemoteCall(String... paths) throws Exception { return sourceWithMockedRemoteCall(true, paths); } @@ -342,8 +406,9 @@ public class RemoteScrollableHitSourceTests extends ESTestCase { @Override public Future answer(InvocationOnMock invocationOnMock) throws Throwable { + // Throw away the current thread context to simulate running async httpclient's thread pool + threadPool.getThreadContext().stashContext(); HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; - @SuppressWarnings("unchecked") FutureCallback futureCallback = (FutureCallback) invocationOnMock.getArguments()[2]; HttpEntityEnclosingRequest request = (HttpEntityEnclosingRequest)requestProducer.generateRequest(); URL resource = resources[responseCount]; diff --git a/test/build.gradle b/test/build.gradle index a80ca59978c..594fa5bbb70 100644 --- a/test/build.gradle +++ b/test/build.gradle @@ -20,7 +20,7 @@ import org.elasticsearch.gradle.precommit.PrecommitTasks subprojects { - // fixtures is just an intermediate parent project + // fixtures is just intermediate parent project if (name == 'fixtures') return group = 'org.elasticsearch.test' @@ -28,7 +28,7 @@ subprojects { apply plugin: 'nebula.maven-base-publish' apply plugin: 'nebula.maven-scm' - + // the main files are actually test files, so use the appropriate forbidden api sigs forbiddenApisMain { signaturesURLs = [PrecommitTasks.getResource('/forbidden/jdk-signatures.txt'), diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java index 78461130783..e66cde3fe32 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java @@ -142,12 +142,17 @@ public class DoSection implements ExecutableSection { private static Map>> catches = new HashMap<>(); static { - catches.put("missing", tuple("404", equalTo(404))); - catches.put("conflict", tuple("409", equalTo(409))); + catches.put("unauthorized", tuple("401", equalTo(401))); catches.put("forbidden", tuple("403", equalTo(403))); + catches.put("missing", tuple("404", equalTo(404))); catches.put("request_timeout", tuple("408", equalTo(408))); + catches.put("conflict", tuple("409", equalTo(409))); catches.put("unavailable", tuple("503", equalTo(503))); - catches.put("request", tuple("4xx|5xx", - allOf(greaterThanOrEqualTo(400), not(equalTo(404)), not(equalTo(408)), not(equalTo(409)), not(equalTo(403))))); + catches.put("request", tuple("4xx|5xx", allOf(greaterThanOrEqualTo(400), + not(equalTo(401)), + not(equalTo(403)), + not(equalTo(404)), + not(equalTo(408)), + not(equalTo(409))))); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/support/Features.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/support/Features.java index 8d7a5a58e16..02652aa271a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/support/Features.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/support/Features.java @@ -35,8 +35,8 @@ import static java.util.Collections.unmodifiableList; * and the related skip sections can be removed from the tests as well. */ public final class Features { - private static final List SUPPORTED = unmodifiableList(Arrays.asList( + "catch_unauthorized", "embedded_stash_key", "groovy_scripting", "headers",