Add Remote Reindex SPI extension (#547)

This change extends the remote reindex SPI to allow adding a custom interceptor.
This interceptor can be plugged in to perform any processing on the request or response.

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
This commit is contained in:
Sooraj Sinha 2021-05-06 22:08:25 +05:30 committed by GitHub
parent 2cf40be8ce
commit 181ee8a211
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 140 additions and 5 deletions

View File

@ -32,6 +32,8 @@
package org.opensearch.index.reindex; package org.opensearch.index.reindex;
import java.util.Optional;
import org.apache.http.HttpRequestInterceptor;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListener;
@ -112,6 +114,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
protected final ThreadPool threadPool; protected final ThreadPool threadPool;
protected final ScriptService scriptService; protected final ScriptService scriptService;
protected final ReindexSslConfig sslConfig; protected final ReindexSslConfig sslConfig;
protected Optional<HttpRequestInterceptor> interceptor;
/** /**
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child * The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
@ -152,6 +155,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
this.threadPool = threadPool; this.threadPool = threadPool;
this.mainRequest = mainRequest; this.mainRequest = mainRequest;
this.listener = listener; this.listener = listener;
this.interceptor = Optional.empty();
BackoffPolicy backoffPolicy = buildBackoffPolicy(); BackoffPolicy backoffPolicy = buildBackoffPolicy();
bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool); bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
scrollSource = buildScrollableResultSource(backoffPolicy); scrollSource = buildScrollableResultSource(backoffPolicy);

View File

@ -32,6 +32,12 @@
package org.opensearch.index.reindex; package org.opensearch.index.reindex;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.ExtensiblePlugin.ExtensionLoader;
import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionResponse;
@ -67,8 +73,9 @@ import java.util.function.Supplier;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
public class ReindexPlugin extends Plugin implements ActionPlugin { public class ReindexPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin {
public static final String NAME = "reindex"; public static final String NAME = "reindex";
private static final Logger logger = LogManager.getLogger(ReindexPlugin.class);
@Override @Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() { public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
@ -112,4 +119,22 @@ public class ReindexPlugin extends Plugin implements ActionPlugin {
settings.addAll(ReindexSslConfig.getSettings()); settings.addAll(ReindexSslConfig.getSettings());
return settings; return settings;
} }
@Override
public void loadExtensions(ExtensionLoader loader) {
logger.info("ReindexPlugin reloadSPI called");
Iterable<RemoteReindexExtension> iterable = loader.loadExtensions(RemoteReindexExtension.class);
List<RemoteReindexExtension> remoteReindexExtensionList = new ArrayList<>();
iterable.forEach(remoteReindexExtensionList::add);
if (remoteReindexExtensionList.isEmpty()) {
logger.info("Unable to find any implementation for RemoteReindexExtension");
} else {
if (remoteReindexExtensionList.size() > 1) {
logger.warn("More than one implementation found: " + remoteReindexExtensionList);
}
// We shouldn't have more than one extension. Incase there is, we simply pick the first one.
TransportReindexAction.remoteExtension = Optional.ofNullable(remoteReindexExtensionList.get(0));
logger.info("Loaded extension " + TransportReindexAction.remoteExtension);
}
}
} }

View File

@ -32,8 +32,10 @@
package org.opensearch.index.reindex; package org.opensearch.index.reindex;
import java.util.Optional;
import org.apache.http.Header; import org.apache.http.Header;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider; import org.apache.http.client.CredentialsProvider;
@ -63,6 +65,7 @@ import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.VersionType; import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.VersionFieldMapper; import org.opensearch.index.mapper.VersionFieldMapper;
import org.opensearch.index.reindex.remote.RemoteScrollableHitSource; import org.opensearch.index.reindex.remote.RemoteScrollableHitSource;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.script.Script; import org.opensearch.script.Script;
import org.opensearch.script.ScriptService; import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool;
@ -91,14 +94,21 @@ public class Reindexer {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ScriptService scriptService; private final ScriptService scriptService;
private final ReindexSslConfig reindexSslConfig; private final ReindexSslConfig reindexSslConfig;
private final Optional<RemoteReindexExtension> remoteExtension;
Reindexer(ClusterService clusterService, Client client, ThreadPool threadPool, ScriptService scriptService, Reindexer(ClusterService clusterService, Client client, ThreadPool threadPool, ScriptService scriptService,
ReindexSslConfig reindexSslConfig) { ReindexSslConfig reindexSslConfig) {
this(clusterService, client, threadPool, scriptService, reindexSslConfig, Optional.empty());
}
Reindexer(ClusterService clusterService, Client client, ThreadPool threadPool, ScriptService scriptService,
ReindexSslConfig reindexSslConfig, Optional<RemoteReindexExtension> remoteExtension) {
this.clusterService = clusterService; this.clusterService = clusterService;
this.client = client; this.client = client;
this.threadPool = threadPool; this.threadPool = threadPool;
this.scriptService = scriptService; this.scriptService = scriptService;
this.reindexSslConfig = reindexSslConfig; this.reindexSslConfig = reindexSslConfig;
this.remoteExtension = remoteExtension;
} }
public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener<Void> listener) { public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener<Void> listener) {
@ -106,25 +116,54 @@ public class Reindexer {
} }
public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) { public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
ActionListener<BulkByScrollResponse> remoteReindexActionListener = getRemoteReindexWrapperListener(listener, request);
BulkByScrollParallelizationHelper.executeSlicedAction(task, request, ReindexAction.INSTANCE, listener, client, BulkByScrollParallelizationHelper.executeSlicedAction(task, request, ReindexAction.INSTANCE, listener, client,
clusterService.localNode(), clusterService.localNode(),
() -> { () -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task); ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(task, logger, assigningClient, threadPool, AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(task, logger, assigningClient, threadPool,
scriptService, reindexSslConfig, request, listener); scriptService, reindexSslConfig, request, remoteReindexActionListener, getInterceptor(request));
searchAction.start(); searchAction.start();
}); });
} }
private Optional<HttpRequestInterceptor> getInterceptor(ReindexRequest request) {
if (request.getRemoteInfo() == null) {
return Optional.empty();
} else {
return remoteExtension.map(x -> x.getInterceptorProvider()).flatMap(provider ->
provider.getRestInterceptor(request, threadPool.getThreadContext()));
}
}
private ActionListener<BulkByScrollResponse> getRemoteReindexWrapperListener(
ActionListener<BulkByScrollResponse> listener, ReindexRequest reindexRequest) {
if (reindexRequest.getRemoteInfo() == null) {
return listener;
}
if (remoteExtension.isPresent()) {
return remoteExtension.get().getRemoteReindexActionListener(listener, reindexRequest);
}
logger.info("No extension found for remote reindex listener");
return listener;
}
static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector) {
return buildRestClient(remoteInfo, sslConfig, taskId, threadCollector, Optional.empty());
}
/** /**
* Build the {@link RestClient} used for reindexing from remote clusters. * Build the {@link RestClient} used for reindexing from remote clusters.
*
* @param remoteInfo connection information for the remote cluster * @param remoteInfo connection information for the remote cluster
* @param sslConfig configuration for potential outgoing HTTPS connections * @param sslConfig configuration for potential outgoing HTTPS connections
* @param taskId the id of the current task. This is added to the thread name for easier tracking * @param taskId the id of the current task. This is added to the thread name for easier tracking
* @param threadCollector a list in which we collect all the threads created by the client * @param threadCollector a list in which we collect all the threads created by the client
* @param restInterceptor an optional HttpRequestInterceptor
*/ */
static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector) { static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector,
Optional<HttpRequestInterceptor> restInterceptor) {
Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()]; Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
int i = 0; int i = 0;
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) { for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
@ -146,6 +185,8 @@ public class Reindexer {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, creds); credentialsProvider.setCredentials(AuthScope.ANY, creds);
c.setDefaultCredentialsProvider(credentialsProvider); c.setDefaultCredentialsProvider(credentialsProvider);
} else {
restInterceptor.ifPresent(interceptor -> c.addInterceptorLast(interceptor));
} }
// Stick the task id in the thread name so we can track down tasks from stack traces // Stick the task id in the thread name so we can track down tasks from stack traces
AtomicInteger threads = new AtomicInteger(); AtomicInteger threads = new AtomicInteger();
@ -185,6 +226,12 @@ public class Reindexer {
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig sslConfig, ReindexRequest request, ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig sslConfig, ReindexRequest request,
ActionListener<BulkByScrollResponse> listener) { ActionListener<BulkByScrollResponse> listener) {
this(task, logger, client, threadPool, scriptService, sslConfig, request, listener, Optional.empty());
}
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig sslConfig, ReindexRequest request,
ActionListener<BulkByScrollResponse> listener, Optional<HttpRequestInterceptor> interceptor) {
super(task, super(task,
/* /*
* We only need the source version if we're going to use it when write and we only do that when the destination request uses * We only need the source version if we're going to use it when write and we only do that when the destination request uses
@ -192,6 +239,7 @@ public class Reindexer {
*/ */
request.getDestination().versionType() != VersionType.INTERNAL, request.getDestination().versionType() != VersionType.INTERNAL,
false, logger, client, threadPool, request, listener, scriptService, sslConfig); false, logger, client, threadPool, request, listener, scriptService, sslConfig);
this.interceptor = interceptor;
} }
@Override @Override
@ -200,7 +248,8 @@ public class Reindexer {
RemoteInfo remoteInfo = mainRequest.getRemoteInfo(); RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
createdThreads = synchronizedList(new ArrayList<>()); createdThreads = synchronizedList(new ArrayList<>());
assert sslConfig != null : "Reindex ssl config must be set"; assert sslConfig != null : "Reindex ssl config must be set";
RestClient restClient = buildRestClient(remoteInfo, sslConfig, task.getId(), createdThreads); RestClient restClient = buildRestClient(remoteInfo, sslConfig, task.getId(), createdThreads,
this.interceptor);
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
this::onScrollResponse, this::finishHim, this::onScrollResponse, this::finishHim,
restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest()); restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());

View File

@ -32,6 +32,7 @@
package org.opensearch.index.reindex; package org.opensearch.index.reindex;
import java.util.Optional;
import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.AutoCreateIndex;
@ -43,6 +44,7 @@ import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.script.ScriptService; import org.opensearch.script.ScriptService;
import org.opensearch.tasks.Task; import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool;
@ -56,6 +58,7 @@ import static java.util.Collections.emptyList;
public class TransportReindexAction extends HandledTransportAction<ReindexRequest, BulkByScrollResponse> { public class TransportReindexAction extends HandledTransportAction<ReindexRequest, BulkByScrollResponse> {
public static final Setting<List<String>> REMOTE_CLUSTER_WHITELIST = public static final Setting<List<String>> REMOTE_CLUSTER_WHITELIST =
Setting.listSetting("reindex.remote.whitelist", emptyList(), Function.identity(), Property.NodeScope); Setting.listSetting("reindex.remote.whitelist", emptyList(), Function.identity(), Property.NodeScope);
public static Optional<RemoteReindexExtension> remoteExtension = Optional.empty();
private final ReindexValidator reindexValidator; private final ReindexValidator reindexValidator;
private final Reindexer reindexer; private final Reindexer reindexer;
@ -66,7 +69,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig) { AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig) {
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new); super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex); this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig); this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension);
} }
@Override @Override

View File

@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.index.reindex.spi;
import java.util.Optional;
import org.apache.http.HttpRequestInterceptor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.index.reindex.ReindexRequest;
public interface ReindexRestInterceptorProvider {
/**
* @param request Reindex request.
* @param threadContext Current thread context.
* @return HttpRequestInterceptor object.
*/
Optional<HttpRequestInterceptor> getRestInterceptor(ReindexRequest request, ThreadContext threadContext);
}

View File

@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.index.reindex.spi;
import org.opensearch.action.ActionListener;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexRequest;
/**
* This interface provides an extension point for {@link org.opensearch.index.reindex.ReindexPlugin}.
* This interface can be implemented to provide a custom Rest interceptor and {@link ActionListener}
* The Rest interceptor can be used to pre-process any reindex request and perform any action
* on the response. The ActionListener listens to the success and failure events on every reindex request
* and can be used to take any actions based on the success or failure.
*/
public interface RemoteReindexExtension {
/**
* Get an InterceptorProvider.
* @return ReindexRestInterceptorProvider implementation.
*/
ReindexRestInterceptorProvider getInterceptorProvider();
/**
* Get a wrapper of ActionListener which is can used to perform any action based on
* the success/failure of the remote reindex call.
* @return ActionListener wrapper implementation.
*/
ActionListener<BulkByScrollResponse> getRemoteReindexActionListener(ActionListener<BulkByScrollResponse> listener,
ReindexRequest reindexRequest);
}