Introduce ssl settings to reindex from remote (#37527)
Adds reindex.ssl.* settings for reindex from remote. This uses the ssl-config/ internal library to parse and load SSL configuration and files. This is applied when using the low level rest client to connect to a remote ES node Relates: #37287 Resolves: #29755
This commit is contained in:
parent
f5398d6511
commit
a8596de31f
|
@ -201,7 +201,7 @@ allprojects {
|
|||
}
|
||||
|
||||
/* Sets up the dependencies that we build as part of this project but
|
||||
register as thought they were external to resolve internally. We register
|
||||
register as though they were external to resolve internally. We register
|
||||
them as external dependencies so the build plugin that we use can be used
|
||||
to build elasticsearch plugins outside of the elasticsearch source tree. */
|
||||
ext.projectSubstitutions = [
|
||||
|
@ -214,6 +214,7 @@ allprojects {
|
|||
"org.elasticsearch:elasticsearch-x-content:${version}": ':libs:x-content',
|
||||
"org.elasticsearch:elasticsearch-geo:${version}": ':libs:elasticsearch-geo',
|
||||
"org.elasticsearch:elasticsearch-secure-sm:${version}": ':libs:secure-sm',
|
||||
"org.elasticsearch:elasticsearch-ssl-config:${version}": ':libs:elasticsearch-ssl-config',
|
||||
"org.elasticsearch.client:elasticsearch-rest-client:${version}": ':client:rest',
|
||||
"org.elasticsearch.client:elasticsearch-rest-client-sniffer:${version}": ':client:sniffer',
|
||||
"org.elasticsearch.client:elasticsearch-rest-high-level-client:${version}": ':client:rest-high-level',
|
||||
|
|
|
@ -56,6 +56,7 @@ unitTest {
|
|||
|
||||
dependencies {
|
||||
compile "org.elasticsearch.client:elasticsearch-rest-client:${version}"
|
||||
compile "org.elasticsearch:elasticsearch-ssl-config:${version}"
|
||||
// for http - testing reindex from remote
|
||||
testCompile project(path: ':modules:transport-netty4', configuration: 'runtime')
|
||||
// for parent/child testing
|
||||
|
@ -71,6 +72,11 @@ thirdPartyAudit.ignoreMissingClasses (
|
|||
'org.apache.log.Logger',
|
||||
)
|
||||
|
||||
forbiddenPatterns {
|
||||
// PKCS#12 file are not UTF-8
|
||||
exclude '**/*.p12'
|
||||
}
|
||||
|
||||
// Support for testing reindex-from-remote against old Elaticsearch versions
|
||||
configurations {
|
||||
oldesFixture
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
|
|||
import org.elasticsearch.action.bulk.Retry;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.client.ParentTaskAssigningClient;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -82,13 +83,15 @@ import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
|
|||
* Abstract base for scrolling across a search and executing bulk actions on all results. All package private methods are package private so
|
||||
* their tests can use them. Most methods run in the listener thread pool because the are meant to be fast and don't expect to block.
|
||||
*/
|
||||
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>> {
|
||||
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>,
|
||||
Action extends TransportAction<Request, ?>> {
|
||||
|
||||
protected final Logger logger;
|
||||
protected final BulkByScrollTask task;
|
||||
protected final WorkerBulkByScrollTaskState worker;
|
||||
protected final ThreadPool threadPool;
|
||||
protected final ScriptService scriptService;
|
||||
|
||||
protected final Action mainAction;
|
||||
/**
|
||||
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
|
||||
* requests of this mainRequest.
|
||||
|
@ -112,7 +115,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
|||
|
||||
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
|
||||
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, Request mainRequest, ScriptService scriptService,
|
||||
ThreadPool threadPool, Action mainAction, Request mainRequest,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
|
||||
this.task = task;
|
||||
|
@ -124,7 +127,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
|||
this.logger = logger;
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
this.scriptService = scriptService;
|
||||
this.mainAction = mainAction;
|
||||
this.mainRequest = mainRequest;
|
||||
this.listener = listener;
|
||||
BackoffPolicy backoffPolicy = buildBackoffPolicy();
|
||||
|
|
|
@ -31,22 +31,22 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
/**
|
||||
* Implementation of delete-by-query using scrolling and bulk.
|
||||
*/
|
||||
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
|
||||
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {
|
||||
|
||||
private final boolean useSeqNoForCAS;
|
||||
|
||||
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
|
||||
ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
|
||||
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
|
||||
ScriptService scriptService, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task,
|
||||
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
|
||||
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
|
||||
// all nodes support sequence number powered optimistic concurrency control and we can use it
|
||||
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
|
||||
logger, client, threadPool, request, scriptService, listener);
|
||||
logger, client, threadPool, action, request, listener);
|
||||
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected boolean accept(ScrollableHitSource.Hit doc) {
|
||||
// Delete-by-query does not require the source to delete a document
|
||||
|
|
|
@ -21,21 +21,32 @@ package org.elasticsearch.index.reindex;
|
|||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestHandler;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -69,8 +80,19 @@ public class ReindexPlugin extends Plugin implements ActionPlugin {
|
|||
new RestRethrottleAction(settings, restController, nodesInCluster));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
return Collections.singletonList(new ReindexSslConfig(environment.settings(), environment, resourceWatcherService));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return singletonList(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
|
||||
final List<Setting<?>> settings = new ArrayList<>();
|
||||
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
|
||||
settings.addAll(ReindexSslConfig.getSettings());
|
||||
return settings;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
|
||||
import org.apache.http.conn.ssl.NoopHostnameVerifier;
|
||||
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.SecureSetting;
|
||||
import org.elasticsearch.common.settings.SecureString;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.ssl.SslConfiguration;
|
||||
import org.elasticsearch.common.ssl.SslConfigurationKeys;
|
||||
import org.elasticsearch.common.ssl.SslConfigurationLoader;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.watcher.FileChangesListener;
|
||||
import org.elasticsearch.watcher.FileWatcher;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
|
||||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.common.settings.Setting.listSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.simpleString;
|
||||
|
||||
/**
|
||||
* Loads "reindex.ssl.*" configuration from Settings, and makes the applicable configuration (trust manager / key manager / hostname
|
||||
* verification / cipher-suites) available for reindex-from-remote.
|
||||
*/
|
||||
class ReindexSslConfig {
|
||||
|
||||
private static final Map<String, Setting<?>> SETTINGS = new HashMap<>();
|
||||
private static final Map<String, Setting<SecureString>> SECURE_SETTINGS = new HashMap<>();
|
||||
|
||||
static {
|
||||
Setting.Property[] defaultProperties = new Setting.Property[] { Setting.Property.NodeScope, Setting.Property.Filtered };
|
||||
Setting.Property[] deprecatedProperties = new Setting.Property[] { Setting.Property.Deprecated, Setting.Property.NodeScope,
|
||||
Setting.Property.Filtered };
|
||||
for (String key : SslConfigurationKeys.getStringKeys()) {
|
||||
String settingName = "reindex.ssl." + key;
|
||||
final Setting.Property[] properties = SslConfigurationKeys.isDeprecated(key) ? deprecatedProperties : defaultProperties;
|
||||
SETTINGS.put(settingName, simpleString(settingName, properties));
|
||||
}
|
||||
for (String key : SslConfigurationKeys.getListKeys()) {
|
||||
String settingName = "reindex.ssl." + key;
|
||||
final Setting.Property[] properties = SslConfigurationKeys.isDeprecated(key) ? deprecatedProperties : defaultProperties;
|
||||
SETTINGS.put(settingName, listSetting(settingName, Collections.emptyList(), Function.identity(), properties));
|
||||
}
|
||||
for (String key : SslConfigurationKeys.getSecureStringKeys()) {
|
||||
String settingName = "reindex.ssl." + key;
|
||||
SECURE_SETTINGS.put(settingName, SecureSetting.secureString(settingName, null));
|
||||
}
|
||||
}
|
||||
|
||||
private final SslConfiguration configuration;
|
||||
private volatile SSLContext context;
|
||||
|
||||
public static List<Setting<?>> getSettings() {
|
||||
List<Setting<?>> settings = new ArrayList<>();
|
||||
settings.addAll(SETTINGS.values());
|
||||
settings.addAll(SECURE_SETTINGS.values());
|
||||
return settings;
|
||||
}
|
||||
|
||||
ReindexSslConfig(Settings settings, Environment environment, ResourceWatcherService resourceWatcher) {
|
||||
final SslConfigurationLoader loader = new SslConfigurationLoader("reindex.ssl.") {
|
||||
|
||||
@Override
|
||||
protected String getSettingAsString(String key) {
|
||||
return settings.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected char[] getSecureSetting(String key) {
|
||||
final Setting<SecureString> setting = SECURE_SETTINGS.get(key);
|
||||
if (setting == null) {
|
||||
throw new IllegalArgumentException("The secure setting [" + key + "] is not registered");
|
||||
}
|
||||
return setting.get(settings).getChars();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> getSettingAsList(String key) throws Exception {
|
||||
return settings.getAsList(key);
|
||||
}
|
||||
};
|
||||
configuration = loader.load(environment.configFile());
|
||||
reload();
|
||||
|
||||
final FileChangesListener listener = new FileChangesListener() {
|
||||
@Override
|
||||
public void onFileCreated(Path file) {
|
||||
onFileChanged(file);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFileDeleted(Path file) {
|
||||
onFileChanged(file);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFileChanged(Path file) {
|
||||
ReindexSslConfig.this.reload();
|
||||
}
|
||||
};
|
||||
for (Path file : configuration.getDependentFiles()) {
|
||||
try {
|
||||
final FileWatcher watcher = new FileWatcher(file);
|
||||
watcher.addListener(listener);
|
||||
resourceWatcher.add(watcher, ResourceWatcherService.Frequency.HIGH);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException("cannot watch file [" + file + "]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void reload() {
|
||||
this.context = configuration.createSslContext();
|
||||
}
|
||||
|
||||
/**
|
||||
* Encapsulate the loaded SSL configuration as a HTTP-client {@link SSLIOSessionStrategy}.
|
||||
* The returned strategy is immutable, but successive calls will return different objects that may have different
|
||||
* configurations if the underlying key/certificate files are modified.
|
||||
*/
|
||||
SSLIOSessionStrategy getStrategy() {
|
||||
final HostnameVerifier hostnameVerifier = configuration.getVerificationMode().isHostnameVerificationEnabled()
|
||||
? new DefaultHostnameVerifier()
|
||||
: new NoopHostnameVerifier();
|
||||
final String[] protocols = configuration.getSupportedProtocols().toArray(Strings.EMPTY_ARRAY);
|
||||
final String[] cipherSuites = configuration.getCipherSuites().toArray(Strings.EMPTY_ARRAY);
|
||||
return new SSLIOSessionStrategy(context, protocols, cipherSuites, hostnameVerifier);
|
||||
}
|
||||
}
|
|
@ -61,7 +61,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
|
|||
ClusterState state = clusterService.state();
|
||||
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
|
||||
bulkByScrollTask);
|
||||
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state,
|
||||
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService, state,
|
||||
listener).start();
|
||||
}
|
||||
);
|
||||
|
|
|
@ -101,10 +101,12 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
private final CharacterRunAutomaton remoteWhitelist;
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
|
||||
private final ReindexSslConfig sslConfig;
|
||||
|
||||
@Inject
|
||||
public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
|
||||
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService) {
|
||||
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig) {
|
||||
super(ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>)ReindexRequest::new);
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
|
@ -113,6 +115,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
this.client = client;
|
||||
remoteWhitelist = buildRemoteWhitelist(REMOTE_CLUSTER_WHITELIST.get(settings));
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.sslConfig = sslConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -129,7 +132,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
() -> {
|
||||
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
|
||||
bulkByScrollTask);
|
||||
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state,
|
||||
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, state,
|
||||
listener).start();
|
||||
}
|
||||
);
|
||||
|
@ -197,10 +200,11 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
/**
|
||||
* Build the {@link RestClient} used for reindexing from remote clusters.
|
||||
* @param remoteInfo connection information for the remote cluster
|
||||
* @param sslConfig configuration for potential outgoing HTTPS connections
|
||||
* @param taskId the id of the current task. This is added to the thread name for easier tracking
|
||||
* @param threadCollector a list in which we collect all the threads created by the client
|
||||
*/
|
||||
static RestClient buildRestClient(RemoteInfo remoteInfo, long taskId, List<Thread> threadCollector) {
|
||||
static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector) {
|
||||
Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
|
||||
int i = 0;
|
||||
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
|
||||
|
@ -233,6 +237,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
});
|
||||
// Limit ourselves to one reactor thread because for now the search process is single threaded.
|
||||
c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
|
||||
c.setSSLStrategy(sslConfig.getStrategy());
|
||||
return c;
|
||||
});
|
||||
if (Strings.hasLength(remoteInfo.getPathPrefix()) && "/".equals(remoteInfo.getPathPrefix()) == false) {
|
||||
|
@ -247,7 +252,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
* but this makes no attempt to do any of them so it can be as simple
|
||||
* possible.
|
||||
*/
|
||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest> {
|
||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest, TransportReindexAction> {
|
||||
/**
|
||||
* List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
|
||||
* {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the
|
||||
|
@ -257,7 +262,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
private List<Thread> createdThreads = emptyList();
|
||||
|
||||
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState,
|
||||
ThreadPool threadPool, TransportReindexAction action, ReindexRequest request, ClusterState clusterState,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task,
|
||||
/*
|
||||
|
@ -265,7 +270,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
* external versioning.
|
||||
*/
|
||||
request.getDestination().versionType() != VersionType.INTERNAL,
|
||||
false, logger, client, threadPool, request, scriptService, listener);
|
||||
false, logger, client, threadPool, action, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -273,7 +278,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
if (mainRequest.getRemoteInfo() != null) {
|
||||
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
|
||||
createdThreads = synchronizedList(new ArrayList<>());
|
||||
RestClient restClient = buildRestClient(remoteInfo, task.getId(), createdThreads);
|
||||
RestClient restClient = buildRestClient(remoteInfo, mainAction.sslConfig, task.getId(), createdThreads);
|
||||
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::finishHim,
|
||||
restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
|
||||
}
|
||||
|
@ -296,7 +301,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
Script script = mainRequest.getScript();
|
||||
if (script != null) {
|
||||
return new ReindexScriptApplier(worker, scriptService, script, script.getParams());
|
||||
return new ReindexScriptApplier(worker, mainAction.scriptService, script, script.getParams());
|
||||
}
|
||||
return super.buildScriptApplier();
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
|||
ClusterState state = clusterService.state();
|
||||
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
|
||||
bulkByScrollTask);
|
||||
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state,
|
||||
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, state,
|
||||
listener).start();
|
||||
}
|
||||
);
|
||||
|
@ -81,19 +81,19 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
|||
/**
|
||||
* Simple implementation of update-by-query using scrolling and bulk.
|
||||
*/
|
||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest> {
|
||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest, TransportUpdateByQueryAction> {
|
||||
|
||||
private final boolean useSeqNoForCAS;
|
||||
|
||||
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
|
||||
ThreadPool threadPool, TransportUpdateByQueryAction action, UpdateByQueryRequest request, ClusterState clusterState,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task,
|
||||
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
|
||||
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
|
||||
// all nodes support sequence number powered optimistic concurrency control and we can use it
|
||||
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
|
||||
logger, client, threadPool, request, scriptService, listener);
|
||||
logger, client, threadPool, action, request, listener);
|
||||
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
|
||||
}
|
||||
|
||||
|
@ -101,7 +101,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
|||
public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
Script script = mainRequest.getScript();
|
||||
if (script != null) {
|
||||
return new UpdateByQueryScriptApplier(worker, scriptService, script, script.getParams());
|
||||
return new UpdateByQueryScriptApplier(worker, mainAction.scriptService, script, script.getParams());
|
||||
}
|
||||
return super.buildScriptApplier();
|
||||
}
|
||||
|
|
|
@ -28,5 +28,5 @@ public abstract class AbstractAsyncBulkByScrollActionMetadataTestCase<
|
|||
return new ScrollableHitSource.BasicHit("index", "type", "id", 0);
|
||||
}
|
||||
|
||||
protected abstract AbstractAsyncBulkByScrollAction<Request> action();
|
||||
protected abstract AbstractAsyncBulkByScrollAction<Request, ?> action();
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ public abstract class AbstractAsyncBulkByScrollActionScriptTestCase<
|
|||
}
|
||||
};
|
||||
when(scriptService.compile(any(), eq(UpdateScript.CONTEXT))).thenReturn(factory);
|
||||
AbstractAsyncBulkByScrollAction<Request> action = action(scriptService, request().setScript(mockScript("")));
|
||||
AbstractAsyncBulkByScrollAction<Request, ?> action = action(scriptService, request().setScript(mockScript("")));
|
||||
RequestWrapper<?> result = action.buildScriptApplier().apply(AbstractAsyncBulkByScrollAction.wrap(index), doc);
|
||||
return (result != null) ? (T) result.self() : null;
|
||||
}
|
||||
|
@ -109,5 +109,5 @@ public abstract class AbstractAsyncBulkByScrollActionScriptTestCase<
|
|||
assertThat(e.getMessage(), equalTo("Operation type [unknown] not allowed, only [noop, index, delete] are allowed"));
|
||||
}
|
||||
|
||||
protected abstract AbstractAsyncBulkByScrollAction<Request> action(ScriptService scriptService, Request request);
|
||||
protected abstract AbstractAsyncBulkByScrollAction<Request, ?> action(ScriptService scriptService, Request request);
|
||||
}
|
||||
|
|
|
@ -48,7 +48,9 @@ import org.elasticsearch.action.search.SearchRequest;
|
|||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchScrollRequest;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -71,6 +73,7 @@ import org.elasticsearch.rest.RestStatus;
|
|||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -675,10 +678,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|||
action.onScrollResponse(lastBatchTime, lastBatchSize, response);
|
||||
}
|
||||
|
||||
private class DummyAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest> {
|
||||
private class DummyAsyncBulkByScrollAction
|
||||
extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, DummyTransportAsyncBulkByScrollAction> {
|
||||
DummyAsyncBulkByScrollAction() {
|
||||
super(testTask, randomBoolean(), randomBoolean(), AsyncBulkByScrollActionTests.this.logger,
|
||||
new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, null, listener);
|
||||
new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), null, testRequest, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -698,6 +702,20 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private static class DummyTransportAsyncBulkByScrollAction
|
||||
extends TransportAction<DummyAbstractBulkByScrollRequest, BulkByScrollResponse> {
|
||||
|
||||
|
||||
protected DummyTransportAsyncBulkByScrollAction(String actionName, ActionFilters actionFilters, TaskManager taskManager) {
|
||||
super(actionName, actionFilters, taskManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, DummyAbstractBulkByScrollRequest request, ActionListener<BulkByScrollResponse> listener) {
|
||||
// no-op
|
||||
}
|
||||
}
|
||||
|
||||
private static class DummyAbstractBulkByScrollRequest extends AbstractBulkByScrollRequest<DummyAbstractBulkByScrollRequest> {
|
||||
DummyAbstractBulkByScrollRequest(SearchRequest searchRequest) {
|
||||
super(searchRequest, true);
|
||||
|
|
|
@ -22,6 +22,10 @@ package org.elasticsearch.index.reindex;
|
|||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilderTestCase;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.TestEnvironment;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -31,6 +35,7 @@ import java.util.Map;
|
|||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.synchronizedList;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTestCase {
|
||||
public void testBuildRestClient() throws Exception {
|
||||
|
@ -39,7 +44,7 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
|
|||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
long taskId = randomLong();
|
||||
List<Thread> threads = synchronizedList(new ArrayList<>());
|
||||
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads);
|
||||
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
|
||||
try {
|
||||
assertBusy(() -> assertThat(threads, hasSize(2)));
|
||||
int i = 0;
|
||||
|
@ -63,11 +68,18 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
|
|||
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
long taskId = randomLong();
|
||||
List<Thread> threads = synchronizedList(new ArrayList<>());
|
||||
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads);
|
||||
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
|
||||
try {
|
||||
assertHeaders(client, headers);
|
||||
} finally {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
private ReindexSslConfig sslConfig() {
|
||||
final Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
|
||||
final ResourceWatcherService resourceWatcher = mock(ResourceWatcherService.class);
|
||||
return new ReindexSslConfig(environment.settings(), environment, resourceWatcher);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -76,8 +76,8 @@ public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadat
|
|||
|
||||
private class TestAction extends TransportReindexAction.AsyncIndexBySearchAction {
|
||||
TestAction() {
|
||||
super(ReindexMetadataTests.this.task, ReindexMetadataTests.this.logger, null, ReindexMetadataTests.this.threadPool, request(),
|
||||
null, null, listener());
|
||||
super(ReindexMetadataTests.this.task, ReindexMetadataTests.this.logger, null, ReindexMetadataTests.this.threadPool,
|
||||
null, request(), null, listener());
|
||||
}
|
||||
|
||||
public ReindexRequest mainRequest() {
|
||||
|
|
|
@ -0,0 +1,213 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import com.sun.net.httpserver.HttpsConfigurator;
|
||||
import com.sun.net.httpserver.HttpsExchange;
|
||||
import com.sun.net.httpserver.HttpsParameters;
|
||||
import com.sun.net.httpserver.HttpsServer;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.io.PathUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.ssl.PemKeyConfig;
|
||||
import org.elasticsearch.common.ssl.PemTrustConfig;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.TestEnvironment;
|
||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import javax.net.ssl.KeyManager;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
import javax.net.ssl.SSLPeerUnverifiedException;
|
||||
import javax.net.ssl.TrustManager;
|
||||
import javax.net.ssl.X509ExtendedKeyManager;
|
||||
import javax.net.ssl.X509ExtendedTrustManager;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.file.Path;
|
||||
import java.security.cert.Certificate;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* Because core ES doesn't have SSL available, this test uses a mock webserver
|
||||
* as the remote endpoint.
|
||||
* This makes it hard to test actual reindex functionality, but does allow us to test that the correct connections are made with the
|
||||
* right SSL keys + trust settings.
|
||||
*/
|
||||
@SuppressForbidden(reason = "use http server")
|
||||
public class ReindexRestClientSslTests extends ESTestCase {
|
||||
|
||||
private static HttpsServer server;
|
||||
private static Consumer<HttpsExchange> handler = ignore -> {
|
||||
};
|
||||
|
||||
@BeforeClass
|
||||
public static void setupHttpServer() throws Exception {
|
||||
InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0);
|
||||
SSLContext sslContext = buildServerSslContext();
|
||||
server = MockHttpServer.createHttps(address, 0);
|
||||
server.setHttpsConfigurator(new ClientAuthHttpsConfigurator(sslContext));
|
||||
server.start();
|
||||
server.createContext("/", http -> {
|
||||
assert http instanceof HttpsExchange;
|
||||
HttpsExchange https = (HttpsExchange) http;
|
||||
handler.accept(https);
|
||||
// Always respond with 200
|
||||
// * If the reindex sees the 200, it means the SSL connection was established correctly.
|
||||
// * We can check client certs in the handler.
|
||||
https.sendResponseHeaders(200, 0);
|
||||
https.close();
|
||||
});
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdownHttpServer() {
|
||||
server.stop(0);
|
||||
server = null;
|
||||
handler = null;
|
||||
}
|
||||
|
||||
private static SSLContext buildServerSslContext() throws Exception {
|
||||
final SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
|
||||
final char[] password = "http-password".toCharArray();
|
||||
|
||||
final Path cert = PathUtils.get(ReindexRestClientSslTests.class.getResource("http/http.crt").toURI());
|
||||
final Path key = PathUtils.get(ReindexRestClientSslTests.class.getResource("http/http.key").toURI());
|
||||
final X509ExtendedKeyManager keyManager = new PemKeyConfig(cert, key, password).createKeyManager();
|
||||
|
||||
final Path ca = PathUtils.get(ReindexRestClientSslTests.class.getResource("ca.pem").toURI());
|
||||
final X509ExtendedTrustManager trustManager = new PemTrustConfig(Collections.singletonList(ca)).createTrustManager();
|
||||
|
||||
sslContext.init(new KeyManager[] { keyManager }, new TrustManager[] { trustManager }, null);
|
||||
return sslContext;
|
||||
}
|
||||
|
||||
public void testClientFailsWithUntrustedCertificate() throws IOException {
|
||||
final List<Thread> threads = new ArrayList<>();
|
||||
final Settings settings = Settings.builder()
|
||||
.put("path.home", createTempDir())
|
||||
.build();
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
|
||||
try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
expectThrows(SSLHandshakeException.class, () -> client.performRequest(new Request("GET", "/")));
|
||||
}
|
||||
}
|
||||
|
||||
public void testClientSucceedsWithCertificateAuthorities() throws IOException {
|
||||
final List<Thread> threads = new ArrayList<>();
|
||||
final Path ca = getDataPath("ca.pem");
|
||||
final Settings settings = Settings.builder()
|
||||
.put("path.home", createTempDir())
|
||||
.putList("reindex.ssl.certificate_authorities", ca.toString())
|
||||
.build();
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
|
||||
try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
final Response response = client.performRequest(new Request("GET", "/"));
|
||||
assertThat(response.getStatusLine().getStatusCode(), Matchers.is(200));
|
||||
}
|
||||
}
|
||||
|
||||
public void testClientSucceedsWithVerificationDisabled() throws IOException {
|
||||
assertFalse("Cannot disable verification in FIPS JVM", inFipsJvm());
|
||||
final List<Thread> threads = new ArrayList<>();
|
||||
final Settings settings = Settings.builder()
|
||||
.put("path.home", createTempDir())
|
||||
.put("reindex.ssl.verification_mode", "NONE")
|
||||
.build();
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
|
||||
try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
final Response response = client.performRequest(new Request("GET", "/"));
|
||||
assertThat(response.getStatusLine().getStatusCode(), Matchers.is(200));
|
||||
}
|
||||
}
|
||||
|
||||
public void testClientPassesClientCertificate() throws IOException {
|
||||
final List<Thread> threads = new ArrayList<>();
|
||||
final Path ca = getDataPath("ca.pem");
|
||||
final Path cert = getDataPath("client/client.crt");
|
||||
final Path key = getDataPath("client/client.key");
|
||||
final Settings settings = Settings.builder()
|
||||
.put("path.home", createTempDir())
|
||||
.putList("reindex.ssl.certificate_authorities", ca.toString())
|
||||
.put("reindex.ssl.certificate", cert)
|
||||
.put("reindex.ssl.key", key)
|
||||
.put("reindex.ssl.key_passphrase", "client-password")
|
||||
.build();
|
||||
AtomicReference<Certificate[]> clientCertificates = new AtomicReference<>();
|
||||
handler = https -> {
|
||||
try {
|
||||
clientCertificates.set(https.getSSLSession().getPeerCertificates());
|
||||
} catch (SSLPeerUnverifiedException e) {
|
||||
logger.warn("Client did not provide certificates", e);
|
||||
clientCertificates.set(null);
|
||||
}
|
||||
};
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
|
||||
try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
final Response response = client.performRequest(new Request("GET", "/"));
|
||||
assertThat(response.getStatusLine().getStatusCode(), Matchers.is(200));
|
||||
final Certificate[] certs = clientCertificates.get();
|
||||
assertThat(certs, Matchers.notNullValue());
|
||||
assertThat(certs, Matchers.arrayWithSize(1));
|
||||
assertThat(certs[0], Matchers.instanceOf(X509Certificate.class));
|
||||
final X509Certificate clientCert = (X509Certificate) certs[0];
|
||||
assertThat(clientCert.getSubjectDN().getName(), Matchers.is("CN=client"));
|
||||
assertThat(clientCert.getIssuerDN().getName(), Matchers.is("CN=Elastic Certificate Tool Autogenerated CA"));
|
||||
}
|
||||
}
|
||||
|
||||
private RemoteInfo getRemoteInfo() {
|
||||
return new RemoteInfo("https", server.getAddress().getHostName(), server.getAddress().getPort(), "/", new BytesArray("test"),
|
||||
"user", "password", Collections.emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "use http server")
|
||||
private static class ClientAuthHttpsConfigurator extends HttpsConfigurator {
|
||||
ClientAuthHttpsConfigurator(SSLContext sslContext) {
|
||||
super(sslContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(HttpsParameters params) {
|
||||
params.setWantClientAuth(true);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,9 +20,14 @@
|
|||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -103,7 +108,11 @@ public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTes
|
|||
|
||||
@Override
|
||||
protected TransportReindexAction.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) {
|
||||
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService,
|
||||
TransportService transportService = Mockito.mock(TransportService.class);
|
||||
ReindexSslConfig sslConfig = Mockito.mock(ReindexSslConfig.class);
|
||||
TransportReindexAction transportAction = new TransportReindexAction(Settings.EMPTY, threadPool,
|
||||
new ActionFilters(Collections.emptySet()), null, null, scriptService, null, null, transportService, sslConfig);
|
||||
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, transportAction, request,
|
||||
null, listener());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ public class UpdateByQueryMetadataTests
|
|||
private class TestAction extends TransportUpdateByQueryAction.AsyncIndexBySearchAction {
|
||||
TestAction() {
|
||||
super(UpdateByQueryMetadataTests.this.task, UpdateByQueryMetadataTests.this.logger, null,
|
||||
UpdateByQueryMetadataTests.this.threadPool, request(), null, ClusterState.EMPTY_STATE, listener());
|
||||
UpdateByQueryMetadataTests.this.threadPool, null, request(), ClusterState.EMPTY_STATE, listener());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,13 +19,17 @@
|
|||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class UpdateByQueryWithScriptTests
|
||||
extends AbstractAsyncBulkByScrollActionScriptTestCase<UpdateByQueryRequest, BulkByScrollResponse> {
|
||||
|
@ -54,7 +58,10 @@ public class UpdateByQueryWithScriptTests
|
|||
|
||||
@Override
|
||||
protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action(ScriptService scriptService, UpdateByQueryRequest request) {
|
||||
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService,
|
||||
TransportService transportService = mock(TransportService.class);
|
||||
TransportUpdateByQueryAction transportAction = new TransportUpdateByQueryAction(threadPool,
|
||||
new ActionFilters(Collections.emptySet()), null, transportService, scriptService, null);
|
||||
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, transportAction, request,
|
||||
ClusterState.EMPTY_STATE, listener());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
# ca.p12
|
||||
|
||||
$ES_HOME/bin/elasticsearch-certutil ca --out ca.p12 --pass "ca-password" --days 9999
|
||||
|
||||
# ca.pem
|
||||
|
||||
openssl pkcs12 -info -in ./ca.p12 -nokeys -out ca.pem -passin "pass:ca-password"
|
||||
|
||||
# http.p12
|
||||
|
||||
$ES_HOME/bin/elasticsearch-certutil cert --out http.zip --pass "http-password" \
|
||||
--days 9999 --pem --name "http" \
|
||||
--ca ca.p12 --ca-pass "ca-password" \
|
||||
--dns=localhost --dns=localhost.localdomain --dns=localhost4 --dns=localhost4.localdomain4 --dns=localhost6 --dns=localhost6.localdomain6 \
|
||||
--ip=127.0.0.1 --ip=0:0:0:0:0:0:0:1
|
||||
unzip http.zip
|
||||
rm http.zip
|
||||
|
||||
# client.p12
|
||||
|
||||
$ES_HOME/bin/elasticsearch-certutil cert --out client.zip --pass "client-password" \
|
||||
--name "client" --days 9999 --pem \
|
||||
--ca ca.p12 --ca-pass "ca-password"
|
||||
unzip client.zip
|
||||
rm client.zip
|
Binary file not shown.
|
@ -0,0 +1,25 @@
|
|||
Bag Attributes
|
||||
friendlyName: ca
|
||||
localKeyID: 54 69 6D 65 20 31 35 34 37 30 38 36 32 32 39 31 30 37
|
||||
subject=/CN=Elastic Certificate Tool Autogenerated CA
|
||||
issuer=/CN=Elastic Certificate Tool Autogenerated CA
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIDSTCCAjGgAwIBAgIUacmv5ElKJ1cs9n61tEpy5KM3Dv0wDQYJKoZIhvcNAQEL
|
||||
BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l
|
||||
cmF0ZWQgQ0EwHhcNMTkwMTEwMDIxMDI5WhcNNDYwNTI3MDIxMDI5WjA0MTIwMAYD
|
||||
VQQDEylFbGFzdGljIENlcnRpZmljYXRlIFRvb2wgQXV0b2dlbmVyYXRlZCBDQTCC
|
||||
ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAJ0rA35tPl0FN+BPk2YfmET9
|
||||
MvDWFLvfL2Z1aw1q1vnd12K9zumjN6veilHA2Iw/P4LG/mkQZvY4bDPgibRD7hbE
|
||||
vwPoju4vr614tw60+FlkpO6HezYo2I3cni1//Gehhs5EW2P3g7Lw7UNCOAfcR2QQ
|
||||
p/dtwXYWzXHY9jTevQSv2q/x5jWKZT4ltaQExzvXAcxRGqyWV6d5vol3KH/GpCSI
|
||||
SQvRmRVNQGXhxi66MjCglGAM2oicd1qCUDCrljdFD/RQ1UzqIJRTXZQKOno1/Em9
|
||||
xR0Cd5KQapqttPusAO6uZblMO2Ru+XjCD6Y0o41eCDbkd0xA3/wgP3MD5n41yncC
|
||||
AwEAAaNTMFEwHQYDVR0OBBYEFJTry9da5RZbbELYCaWVVFllSm8DMB8GA1UdIwQY
|
||||
MBaAFJTry9da5RZbbELYCaWVVFllSm8DMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZI
|
||||
hvcNAQELBQADggEBADA6qhC35PwuL7LRddbhjjW8U/cCmG9m7AIvH6N+Mw/k76gt
|
||||
tJkEDxztMHUG+A2IPyEcYm7MLr1D8xEQYsq0x4pzFcQnMSQDv4WTK35vRxMtaqwA
|
||||
WZTyA+DibBknbaP1z3gNhR9A0TKx4cPagN3OYFvAi/24abf8qS6D/bcOiPDQ4oPb
|
||||
DVhmhqt5zduDM+Xsf6d4nsA6sf9+4AzneaZKGAMgCXgo4mYeP7M4nMQk0L3ao9Ts
|
||||
+Usr8WRxc4xHGyb09fsXWSz7ZmiJ6iXK2NvRUq46WCINLONLzNkx29WEKQpI3wh4
|
||||
kyx6wF9lwBF06P1raFIBMeMOCkqDc+nj7A91PEA=
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,19 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIIDIDCCAgigAwIBAgIUNOREYZadZ2EVkJ1m8Y9jnVmWmtAwDQYJKoZIhvcNAQEL
|
||||
BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l
|
||||
cmF0ZWQgQ0EwHhcNMTkwMTEwMDIxMDMyWhcNNDYwNTI3MDIxMDMyWjARMQ8wDQYD
|
||||
VQQDEwZjbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCCP2LE
|
||||
nws2+ZIwSQ3IvIhVfrueUmNt7Y5TdhhwO32p2wC4ZA62J9L8klAzt7R+izcL/qbF
|
||||
65inbXM0A7ge/2wZ09kbqBk5uS8jDetJS8lQmWVZDHfVi8g/yDMWklz2mQYleYmU
|
||||
HPyIplai3P3KBoT8HurzHw2C953EZ2HiANFnGoEPZZ5ytcT2WenxuU5kSXSxuDyn
|
||||
8/dCVHEQL1Yipr2LQKYQAHotjo56OhyL9KS5YPjzSFREeyRfQinssTmpGFsua/PK
|
||||
Vqj+hRdkaqRfiqPq3wxn8oOSpZLQe58O1e7OlqgjkPuZdjZ0pQ7KJj7N3fUQNSeg
|
||||
2VC2tk8zv/C/Qr2bAgMBAAGjTTBLMB0GA1UdDgQWBBQziDNuD83ZLwEt1e1txYJu
|
||||
oSseEDAfBgNVHSMEGDAWgBSU68vXWuUWW2xC2AmllVRZZUpvAzAJBgNVHRMEAjAA
|
||||
MA0GCSqGSIb3DQEBCwUAA4IBAQAPpyWyR4w6GvfvPmA1nk1qd7fsQ1AucrYweIJx
|
||||
dTeXg3Ps1bcgNq9Us9xtsKmsoKD8UhtPN6e8W8MkMmri+MSzlEemE+pJZrjHEudi
|
||||
Sj0AFVOK6jaE0lerbCnTQZvYH+J9Eb1i9RP7XHRShkR4MWgy2BzlENk9/LRbr84W
|
||||
Yf5TuM9+ApiiiOoX9UfSGBzNnqwhJNpG9yJ+HnQSqTnJJc/wL0211zLme9I/nhf0
|
||||
kQx6mPedJ3gGoJ8gqz38djIrhJDxq+0Bd9SsdlR6yT+1+bY7hinYx2eLV91AybZ4
|
||||
x07Kyl174DD41PYaE1AtoLlrMrQ5BG7Md50Am+XXOR1X1dkZ
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,30 @@
|
|||
-----BEGIN RSA PRIVATE KEY-----
|
||||
Proc-Type: 4,ENCRYPTED
|
||||
DEK-Info: DES-EDE3-CBC,81AB10154C04B38F
|
||||
|
||||
0L6Buvpeg6QHh/mbYp/3bXDCsu0k0j5xPdIGWd6NCOdb24OQFsOjeA2WuPqs0WWF
|
||||
gzVrjh984biS3IqeglEr6X6PfVJ0QOgBkq0XgSBXhuoRJL/302N9oPGsf8T8oW9t
|
||||
pqR/JIB2L7lMbJlJYSjMl0YQT3hWpo2BlrtSIc/GWOKfjDNWc9BL+oHvKJwql1lb
|
||||
n4yMvYFYJDqgzgxa3r4IIQNsCn3SP+gqbTx9vF6StOIroV51BdSL4IGWRvqnMJrh
|
||||
ybk1EHSLR1oGcONLU4Ksi33UxdImG70SsnoH/NnInDvV2bxmxmgf5SfYKtxFhoxz
|
||||
0hISKTMTerPGtRQ5p8wtEi/ULKyInK+qF3tLgZa+S5VbByjDnUo2dCcbDDSkH5pO
|
||||
uczJ2bs1kJegpCrUueJdbi9OX2upmF+tJb9+5hzFTvey8dUWTEpdiN0xbp4BLfNd
|
||||
Yp4sMHZovsDJKIjDb0NbXRgLeFh1ijlLPhKwIXWTF3BaCKcSw34Qv22YPwn3qNuw
|
||||
0KuUPAo0B65R/hoJguvtks8QAXe0S1jZS/fAlQCoIB0TIduy1qkyje+AnSW+1RL0
|
||||
ysBxLqbvRUqWlgnu7/28V4FD8JNu3O+UGBEelXlfokLgCBZ6lSys2d3Zy/XVBnG0
|
||||
cPl59if+fxKaMWlhFvMLFBup1Y4a/1zA7Sx6kkhvawekHr40NcG4kLHJ+O6UoM4d
|
||||
/ibnbfIksLNkuo/nwoEcKp7W6SxafV0hROdxClkGKild66rnHtk4IGATjaBqt9nr
|
||||
FuO3vRtLuUMS+/4kpvhMwl0RhX2/i6xgV+klWNYNu1JTGDFvdG3qfiY2w88EIbGe
|
||||
rn8JEvRtaH/XNeGdhBwbuObvTifiHyYzA1i5Zh8zvE2+Dthlk19jbBoOUx//LOi2
|
||||
JrNkAsqQCF4HXh7n9HWA/ZrKTP7Xvkig6Vf7M2Y/tO361LSJfzKcRFLpl0P2ntEv
|
||||
XwFOqTvOURERTVr4sBLOVPRAhIs3yvkI5xfurXzbRWtSeLgrMoDgJlXIQbuXd8sq
|
||||
zIBLqvYf2bcroB66XJqX1IFWEstym/NHGcbrwjR5Fn2p3YAtXnIbw8VhHwV+LIOl
|
||||
ky/wH9vbnML/DE81qFqRe8vNZw2sGn9skOyU/QvKeV1NRHYZSV3hMx82bPnjgFeB
|
||||
ilzkb8FEPOAOJ0m44Q3C9eUoazJT8aCuRIAgSL43se1E2pFlIXQTfYRARaWEkSf9
|
||||
0hXqQJc17b+Hj0ire3PUqbG3+/l1qMhhIHwq7Kuyy2neTuW/DXbXp2AMv/bLcnHH
|
||||
apVeRZaYXVSnGXJNk2CeRnCs8OGir8g5zkH+fmVb9knt6TL2oFIsQqULyrLolhfe
|
||||
6Q8mLzq/sd+w+VuN1n/5+RQqOJZWEkLFzQPx8wTqeTB19OE0gjncrqzCHq7INqRe
|
||||
tGClWOj/yL0Sciu3ctVGz1VAbgeBKnLdKm2TX4oFB4OG4E7GMXIL7hGxjtjLAVMW
|
||||
XNc3ZYNQra+iPqJtFxnmbrF2Sn0Wr0hcAT1V0A0TRKe/n0lpUrfhTy/q4DUlOVKG
|
||||
qdCsTGoYXObpUWU5G9GyCVWWRJyrTxJcBZ9KWJu9Y/aMFzoa2n0HQw==
|
||||
-----END RSA PRIVATE KEY-----
|
|
@ -0,0 +1,22 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIIDsjCCApqgAwIBAgIUXxlg/0/g3UYekXWBRpkHM84EYfIwDQYJKoZIhvcNAQEL
|
||||
BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l
|
||||
cmF0ZWQgQ0EwHhcNMTkwMTEwMDIxMDMwWhcNNDYwNTI3MDIxMDMwWjAPMQ0wCwYD
|
||||
VQQDEwRodHRwMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAi8VQaSR6
|
||||
uqgT1Rkw+a39OSXcXuhJBVdoO+AyYPK7hdUTxj1aqnXkKeAiNGpe/J+uXZ837Spy
|
||||
rmBZS3k6S5hLEceF2xug8yrR7RYEZ+JvGlRgg/jj+61gGbHAD314+vvu0YUo06YG
|
||||
wbz9AnjJA/sMbsCp3iSzWIkwZBZcCoZ/YsG4I89LSjYL3YmRi2193WMX6/OfQYMN
|
||||
Fkv61r/iwBEkgJ14cUSYe3norGuQfZuXSh5kI5D5R7q7Bmb0um+jzY/l62kj3oR1
|
||||
YWo3g6DdU/Bc/3/KmEEVXIfdTonMBMyL8PvYORoMKrYdph3E8e39ZQhPeBJNJKw0
|
||||
XzsZFzIUlTw0kQIDAQABo4HgMIHdMB0GA1UdDgQWBBTiqknjZLa5E1BneHRvTkNa
|
||||
Bm4nNTAfBgNVHSMEGDAWgBSU68vXWuUWW2xC2AmllVRZZUpvAzCBjwYDVR0RBIGH
|
||||
MIGEgglsb2NhbGhvc3SCF2xvY2FsaG9zdDYubG9jYWxkb21haW42hwR/AAABhxAA
|
||||
AAAAAAAAAAAAAAAAAAABggpsb2NhbGhvc3Q0ggpsb2NhbGhvc3Q2ghVsb2NhbGhv
|
||||
c3QubG9jYWxkb21haW6CF2xvY2FsaG9zdDQubG9jYWxkb21haW40MAkGA1UdEwQC
|
||||
MAAwDQYJKoZIhvcNAQELBQADggEBAIZr8EhhCbNyc6iHzUJ/NrUGht5RDHUKN9WU
|
||||
2fd+SJlWijQYGoFW6LfabmYxIVPAFtYzUiA378NFoOZZ4kdC3gQng8izvS2UDcO6
|
||||
cAG5q/dxop3VXqcLeK3NpH2jd83M8VZaOThPj/F07eTkVX+sGu+7VL5Lc/XPe8JS
|
||||
HhH2QtcTPGPpzPnWOUMLpRy4mh5sDyeftWr2PTFgMXFD6dtzDvaklGJvr1TmcOVb
|
||||
BFYyVyXRq6v8YsrRPp0GIl+X3zd3KgwUMuEzRKkJgeI1lZRjmHMIyFcqxlwMaHpv
|
||||
r1XUmz02ycy6t3n+2kCgfU6HnjbeFh55KzNCEv8TXQFg8Z8YpDA=
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,30 @@
|
|||
-----BEGIN RSA PRIVATE KEY-----
|
||||
Proc-Type: 4,ENCRYPTED
|
||||
DEK-Info: DES-EDE3-CBC,127A4142FA81C5A1
|
||||
|
||||
dP6oSAUl47KCnP0YZSX108qcX5s2nVGpD0qtnVQg89mLVFd7IxpKQaIuODSadRTo
|
||||
AD0KINITy3ZwUr/TTJgERu88baBsTHv3PLEe7TpQI2DGGDz3aZfO9e6Jvglbdi5b
|
||||
CBLaxRXGGhhH9YH0E87Lp3JEwg4udWmlNahGIhbqNheZNTtDKt+Lx80TyyIml2r/
|
||||
GAhjT4UPvIRrATFAcL/3EKOjRqvb6SeGnZu21n2TSmsBEr02gC0Ox3qmsnRM3kvU
|
||||
jCuUzWTzJSQLXZwZuMtv5srOSFAbU8EklFXNhWJU/7GBy215aAAW48hCzkPMVEbg
|
||||
oeH4nuze/Uulih9UxJGCBIpvfTnksyMRGP/zdy1mnKuqQk+yI0n7JWMJL8QoDQc8
|
||||
XvzqOmKLdBVezmzOVP/PyMAhYWetILh/1UesjyJot2hwSXPAxqBHPVA9bnmel6CQ
|
||||
VccNSwaK120yT5YhkUMFc0AmUpztzNMQzJ10g1dW+Qsr+n4vtFmAuTvBgogNNVXn
|
||||
eX1hbbiXGO1Fw4OMu6qTJ4T/P+VFb0CxoxETWeqdjcs4LGbeqF68nayEsW0ZzhbI
|
||||
W5c+JAbW18Kb+k/KzKZTtJEXBw6B/2FMe9x9z3BIpVhplM2KsNk7joWnumD8LfUT
|
||||
ORRHUPV7bkdiDsn2CRaevubDQiChcjsdLWhG7JLm54ttyif7/X7htGOXPZLDLK8B
|
||||
Vxe09B006f7lM0tXEx8BLFDNroMLlrxB4K5MlwWpS3LLqy4zDbHka2I3s/ST/BD4
|
||||
0EURHefiXJkR6bRsfGCl3JDk0EakcUXM+Ob5/2rC/rPXO2pC0ksiQ2DSBm7ak9om
|
||||
vlC7dIzVipL0LZTd4SUDJyvmK4Ws6V98O5b+79To6oZnVs5CjvcmpSFVePZa5gm/
|
||||
DB8LOpW4jklz+ybJtHJRbEIzmpfwpizThto/zLbhPRyvJkagJfWgXI0j+jjKZj+w
|
||||
sy1V8S44aXJ3GX9p4d/Grnx6WGvEJSV0na7m3YQCPEi5sUgr+EMizGUYstSSUPtU
|
||||
XhxQRZ95K2cKORul9vzG3zZqqvi73Ju5vu9DLmmlI00sLzyVGFtvkuhrF2p7XclM
|
||||
GU/rMOeMClMb6qyCzldSs84Anhlh/6mYri6uYPhIGvxqtH44FTbu1APvZp0s2rVm
|
||||
ueClHG78lat+oqWFpbA8+peT0dMPdSKDAFDiHsGoeWCIoCF44a84bJX35OZk+Y4a
|
||||
+fDFuSiKYBMfAgqf/ZNzV4+ySka7dWdRQ2TDgIuxnvFV1NgC/ir3/mPgkf0xZU5d
|
||||
w8T+TW6T8PmJfHnW4nxgHaqgxMoEoPm8zn0HNpRFKwsDYRFfobpCXnoyx50JXxa4
|
||||
jg095zlp8X0JwconlGJB1gfeqvS2I50WEDR+2ZtDf7fUEnQ3LYJzP4lSwiSKiQsQ
|
||||
MPjy0SMQnqmWijylLYKunTl3Uh2DdYg4MOON662H3TxQW8TCYwK2maKujwS9VFLN
|
||||
GtRGlLrOtrOfHBSwDCujFjqEmQBsF/y2C6XfMoNq6xi5NzREGmNXYrHbLvl2Njwm
|
||||
WB1ouB4JzmEmb1QNwxkllBAaUp1SJGhW2+fYOe0zjWOP9R4sUq4rRw==
|
||||
-----END RSA PRIVATE KEY-----
|
Loading…
Reference in New Issue