Extract reindexing logic from transport action (#46033)
This commit extracts the reindexing logic from the transport action so that it can be incorporated into the persistent reindex work without requiring the usage of the client.
This commit is contained in:
parent
ff1acf3489
commit
07f3ddb549
|
@ -36,6 +36,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
|
|||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.client.ParentTaskAssigningClient;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
@ -90,8 +91,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
|||
protected final BulkByScrollTask task;
|
||||
protected final WorkerBulkByScrollTaskState worker;
|
||||
protected final ThreadPool threadPool;
|
||||
protected final ScriptService scriptService;
|
||||
protected final ReindexSslConfig sslConfig;
|
||||
|
||||
protected final Action mainAction;
|
||||
/**
|
||||
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
|
||||
* requests of this mainRequest.
|
||||
|
@ -114,12 +116,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
|||
private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;
|
||||
private int lastBatchSize;
|
||||
|
||||
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
|
||||
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, Action mainAction, Request mainRequest,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
|
||||
AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
|
||||
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, Request mainRequest, ActionListener<BulkByScrollResponse> listener,
|
||||
@Nullable ScriptService scriptService, @Nullable ReindexSslConfig sslConfig) {
|
||||
this.task = task;
|
||||
this.scriptService = scriptService;
|
||||
this.sslConfig = sslConfig;
|
||||
if (!task.isWorker()) {
|
||||
throw new IllegalArgumentException("Given task [" + task.getId() + "] must have a child worker");
|
||||
}
|
||||
|
@ -128,7 +131,6 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
|||
this.logger = logger;
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
this.mainAction = mainAction;
|
||||
this.mainRequest = mainRequest;
|
||||
this.listener = listener;
|
||||
BackoffPolicy backoffPolicy = buildBackoffPolicy();
|
||||
|
@ -158,7 +160,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
|||
// The default script applier executes a no-op
|
||||
return (request, searchHit) -> request;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Build the {@link RequestWrapper} for a single search hit. This shouldn't handle
|
||||
* metadata or scripting. That will be handled by copyMetadata and
|
||||
|
|
|
@ -32,9 +32,9 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {
|
||||
|
||||
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
|
||||
ScriptService scriptService, ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task, false, true, logger, client, threadPool, action, request, listener);
|
||||
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task, false, true, logger, client, threadPool, request, listener, scriptService, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.apache.lucene.util.automaton.Automata;
|
||||
import org.apache.lucene.util.automaton.Automaton;
|
||||
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
|
||||
import org.apache.lucene.util.automaton.MinimizationOperations;
|
||||
import org.apache.lucene.util.automaton.Operations;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
class ReindexValidator {
|
||||
|
||||
private final CharacterRunAutomaton remoteWhitelist;
|
||||
private final ClusterService clusterService;
|
||||
private final IndexNameExpressionResolver resolver;
|
||||
private final AutoCreateIndex autoCreateIndex;
|
||||
|
||||
ReindexValidator(Settings settings, ClusterService clusterService, IndexNameExpressionResolver resolver,
|
||||
AutoCreateIndex autoCreateIndex) {
|
||||
this.remoteWhitelist = buildRemoteWhitelist(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.get(settings));
|
||||
this.clusterService = clusterService;
|
||||
this.resolver = resolver;
|
||||
this.autoCreateIndex = autoCreateIndex;
|
||||
}
|
||||
|
||||
void initialValidation(ReindexRequest request) {
|
||||
checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
|
||||
ClusterState state = clusterService.state();
|
||||
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(), resolver, autoCreateIndex,
|
||||
state);
|
||||
}
|
||||
|
||||
static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
|
||||
if (remoteInfo == null) {
|
||||
return;
|
||||
}
|
||||
String check = remoteInfo.getHost() + ':' + remoteInfo.getPort();
|
||||
if (whitelist.run(check)) {
|
||||
return;
|
||||
}
|
||||
String whiteListKey = TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey();
|
||||
throw new IllegalArgumentException('[' + check + "] not whitelisted in " + whiteListKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the {@link CharacterRunAutomaton} that represents the reindex-from-remote whitelist and make sure that it doesn't whitelist
|
||||
* the world.
|
||||
*/
|
||||
static CharacterRunAutomaton buildRemoteWhitelist(List<String> whitelist) {
|
||||
if (whitelist.isEmpty()) {
|
||||
return new CharacterRunAutomaton(Automata.makeEmpty());
|
||||
}
|
||||
Automaton automaton = Regex.simpleMatchToAutomaton(whitelist.toArray(Strings.EMPTY_ARRAY));
|
||||
automaton = MinimizationOperations.minimize(automaton, Operations.DEFAULT_MAX_DETERMINIZED_STATES);
|
||||
if (Operations.isTotal(automaton)) {
|
||||
throw new IllegalArgumentException("Refusing to start because whitelist " + whitelist + " accepts all addresses. "
|
||||
+ "This would allow users to reindex-from-remote any URL they like effectively having Elasticsearch make HTTP GETs "
|
||||
+ "for them.");
|
||||
}
|
||||
return new CharacterRunAutomaton(automaton);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an ActionRequestValidationException if the request tries to index
|
||||
* back into the same index or into an index that points to two indexes.
|
||||
* This cannot be done during request validation because the cluster state
|
||||
* isn't available then. Package private for testing.
|
||||
*/
|
||||
static void validateAgainstAliases(SearchRequest source, IndexRequest destination, RemoteInfo remoteInfo,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex,
|
||||
ClusterState clusterState) {
|
||||
if (remoteInfo != null) {
|
||||
return;
|
||||
}
|
||||
String target = destination.index();
|
||||
if (false == autoCreateIndex.shouldAutoCreate(target, clusterState)) {
|
||||
/*
|
||||
* If we're going to autocreate the index we don't need to resolve
|
||||
* it. This is the same sort of dance that TransportIndexRequest
|
||||
* uses to decide to autocreate the index.
|
||||
*/
|
||||
target = indexNameExpressionResolver.concreteWriteIndex(clusterState, destination).getName();
|
||||
}
|
||||
for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source)) {
|
||||
if (sourceIndex.equals(target)) {
|
||||
ActionRequestValidationException e = new ActionRequestValidationException();
|
||||
e.addValidationError("reindex cannot write into an index its reading from [" + target + ']');
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,368 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.auth.AuthScope;
|
||||
import org.apache.http.auth.UsernamePasswordCredentials;
|
||||
import org.apache.http.client.CredentialsProvider;
|
||||
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||
import org.apache.http.impl.nio.reactor.IOReactorConfig;
|
||||
import org.apache.http.message.BasicHeader;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.ParentTaskAssigningClient;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.synchronizedList;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.elasticsearch.index.VersionType.INTERNAL;
|
||||
|
||||
public class Reindexer {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(Reindexer.class);
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final Client client;
|
||||
private final ThreadPool threadPool;
|
||||
private final ScriptService scriptService;
|
||||
private final ReindexSslConfig reindexSslConfig;
|
||||
|
||||
Reindexer(ClusterService clusterService, Client client, ThreadPool threadPool, ScriptService scriptService,
|
||||
ReindexSslConfig reindexSslConfig) {
|
||||
this.clusterService = clusterService;
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
this.scriptService = scriptService;
|
||||
this.reindexSslConfig = reindexSslConfig;
|
||||
}
|
||||
|
||||
public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
|
||||
BulkByScrollParallelizationHelper.startSlicedAction(request, task, ReindexAction.INSTANCE, listener, client,
|
||||
clusterService.localNode(),
|
||||
() -> {
|
||||
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
|
||||
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(task, logger, assigningClient, threadPool,
|
||||
scriptService, reindexSslConfig, request, listener);
|
||||
searchAction.start();
|
||||
}
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the {@link RestClient} used for reindexing from remote clusters.
|
||||
* @param remoteInfo connection information for the remote cluster
|
||||
* @param sslConfig configuration for potential outgoing HTTPS connections
|
||||
* @param taskId the id of the current task. This is added to the thread name for easier tracking
|
||||
* @param threadCollector a list in which we collect all the threads created by the client
|
||||
*/
|
||||
static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector) {
|
||||
Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
|
||||
int i = 0;
|
||||
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
|
||||
clientHeaders[i++] = new BasicHeader(header.getKey(), header.getValue());
|
||||
}
|
||||
final RestClientBuilder builder =
|
||||
RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
|
||||
.setDefaultHeaders(clientHeaders)
|
||||
.setRequestConfigCallback(c -> {
|
||||
c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
|
||||
c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
|
||||
return c;
|
||||
})
|
||||
.setHttpClientConfigCallback(c -> {
|
||||
// Enable basic auth if it is configured
|
||||
if (remoteInfo.getUsername() != null) {
|
||||
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
|
||||
remoteInfo.getPassword());
|
||||
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||
credentialsProvider.setCredentials(AuthScope.ANY, creds);
|
||||
c.setDefaultCredentialsProvider(credentialsProvider);
|
||||
}
|
||||
// Stick the task id in the thread name so we can track down tasks from stack traces
|
||||
AtomicInteger threads = new AtomicInteger();
|
||||
c.setThreadFactory(r -> {
|
||||
String name = "es-client-" + taskId + "-" + threads.getAndIncrement();
|
||||
Thread t = new Thread(r, name);
|
||||
threadCollector.add(t);
|
||||
return t;
|
||||
});
|
||||
// Limit ourselves to one reactor thread because for now the search process is single threaded.
|
||||
c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
|
||||
c.setSSLStrategy(sslConfig.getStrategy());
|
||||
return c;
|
||||
});
|
||||
if (Strings.hasLength(remoteInfo.getPathPrefix()) && "/".equals(remoteInfo.getPathPrefix()) == false) {
|
||||
builder.setPathPrefix(remoteInfo.getPathPrefix());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple implementation of reindex using scrolling and bulk. There are tons
|
||||
* of optimizations that can be done on certain types of reindex requests
|
||||
* but this makes no attempt to do any of them so it can be as simple
|
||||
* possible.
|
||||
*/
|
||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest, TransportReindexAction> {
|
||||
|
||||
/**
|
||||
* List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
|
||||
* {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the
|
||||
* {@linkplain ThreadPool}s because it uses httpasyncclient. It'd be a ton of trouble to work around creating those threads. So
|
||||
* instead we let it create threads but we watch them carefully and assert that they are dead when the process is over.
|
||||
*/
|
||||
private List<Thread> createdThreads = emptyList();
|
||||
|
||||
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig sslConfig, ReindexRequest request,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task,
|
||||
/*
|
||||
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
|
||||
* external versioning.
|
||||
*/
|
||||
request.getDestination().versionType() != VersionType.INTERNAL,
|
||||
false, logger, client, threadPool, request, listener, scriptService, sslConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
|
||||
if (mainRequest.getRemoteInfo() != null) {
|
||||
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
|
||||
createdThreads = synchronizedList(new ArrayList<>());
|
||||
assert sslConfig != null : "Reindex ssl config must be set";
|
||||
RestClient restClient = buildRestClient(remoteInfo, sslConfig, task.getId(), createdThreads);
|
||||
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
|
||||
this::onScrollResponse, this::finishHim,
|
||||
restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
|
||||
}
|
||||
return super.buildScrollableResultSource(backoffPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finishHim(Exception failure, List<BulkItemResponse.Failure> indexingFailures,
|
||||
List<ScrollableHitSource.SearchFailure> searchFailures, boolean timedOut) {
|
||||
super.finishHim(failure, indexingFailures, searchFailures, timedOut);
|
||||
// A little extra paranoia so we log something if we leave any threads running
|
||||
for (Thread thread : createdThreads) {
|
||||
if (thread.isAlive()) {
|
||||
assert false: "Failed to properly stop client thread [" + thread.getName() + "]";
|
||||
logger.error("Failed to properly stop client thread [{}]", thread.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
Script script = mainRequest.getScript();
|
||||
if (script != null) {
|
||||
assert scriptService != null : "Script service must be set";
|
||||
return new Reindexer.AsyncIndexBySearchAction.ReindexScriptApplier(worker, scriptService, script, script.getParams());
|
||||
}
|
||||
return super.buildScriptApplier();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
|
||||
IndexRequest index = new IndexRequest();
|
||||
|
||||
// Copy the index from the request so we always write where it asked to write
|
||||
index.index(mainRequest.getDestination().index());
|
||||
|
||||
// If the request override's type then the user wants all documents in that type. Otherwise keep the doc's type.
|
||||
if (mainRequest.getDestination().type() == null) {
|
||||
index.type(doc.getType());
|
||||
} else {
|
||||
index.type(mainRequest.getDestination().type());
|
||||
}
|
||||
|
||||
/*
|
||||
* Internal versioning can just use what we copied from the destination request. Otherwise we assume we're using external
|
||||
* versioning and use the doc's version.
|
||||
*/
|
||||
index.versionType(mainRequest.getDestination().versionType());
|
||||
if (index.versionType() == INTERNAL) {
|
||||
assert doc.getVersion() == -1 : "fetched version when we didn't have to";
|
||||
index.version(mainRequest.getDestination().version());
|
||||
} else {
|
||||
index.version(doc.getVersion());
|
||||
}
|
||||
|
||||
// id and source always come from the found doc. Scripts can change them but they operate on the index request.
|
||||
index.id(doc.getId());
|
||||
|
||||
// the source xcontent type and destination could be different
|
||||
final XContentType sourceXContentType = doc.getXContentType();
|
||||
final XContentType mainRequestXContentType = mainRequest.getDestination().getContentType();
|
||||
if (mainRequestXContentType != null && doc.getXContentType() != mainRequestXContentType) {
|
||||
// we need to convert
|
||||
try (InputStream stream = doc.getSource().streamInput();
|
||||
XContentParser parser = sourceXContentType.xContent()
|
||||
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream);
|
||||
XContentBuilder builder = XContentBuilder.builder(mainRequestXContentType.xContent())) {
|
||||
parser.nextToken();
|
||||
builder.copyCurrentStructure(parser);
|
||||
index.source(BytesReference.bytes(builder), builder.contentType());
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException("failed to convert hit from " + sourceXContentType + " to "
|
||||
+ mainRequestXContentType, e);
|
||||
}
|
||||
} else {
|
||||
index.source(doc.getSource(), doc.getXContentType());
|
||||
}
|
||||
|
||||
/*
|
||||
* The rest of the index request just has to be copied from the template. It may be changed later from scripts or the superclass
|
||||
* here on out operates on the index request rather than the template.
|
||||
*/
|
||||
index.routing(mainRequest.getDestination().routing());
|
||||
index.setPipeline(mainRequest.getDestination().getPipeline());
|
||||
// OpType is synthesized from version so it is handled when we copy version above.
|
||||
|
||||
return wrap(index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Override the simple copy behavior to allow more fine grained control.
|
||||
*/
|
||||
@Override
|
||||
protected void copyRouting(RequestWrapper<?> request, String routing) {
|
||||
String routingSpec = mainRequest.getDestination().routing();
|
||||
if (routingSpec == null) {
|
||||
super.copyRouting(request, routing);
|
||||
return;
|
||||
}
|
||||
if (routingSpec.startsWith("=")) {
|
||||
super.copyRouting(request, mainRequest.getDestination().routing().substring(1));
|
||||
return;
|
||||
}
|
||||
switch (routingSpec) {
|
||||
case "keep":
|
||||
super.copyRouting(request, routing);
|
||||
break;
|
||||
case "discard":
|
||||
super.copyRouting(request, null);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported routing command");
|
||||
}
|
||||
}
|
||||
|
||||
class ReindexScriptApplier extends ScriptApplier {
|
||||
|
||||
ReindexScriptApplier(WorkerBulkByScrollTaskState taskWorker, ScriptService scriptService, Script script,
|
||||
Map<String, Object> params) {
|
||||
super(taskWorker, scriptService, script, params);
|
||||
}
|
||||
|
||||
/*
|
||||
* Methods below here handle script updating the index request. They try
|
||||
* to be pretty liberal with regards to types because script are often
|
||||
* dynamically typed.
|
||||
*/
|
||||
|
||||
@Override
|
||||
protected void scriptChangedIndex(RequestWrapper<?> request, Object to) {
|
||||
requireNonNull(to, "Can't reindex without a destination index!");
|
||||
request.setIndex(to.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scriptChangedType(RequestWrapper<?> request, Object to) {
|
||||
requireNonNull(to, "Can't reindex without a destination type!");
|
||||
request.setType(to.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scriptChangedId(RequestWrapper<?> request, Object to) {
|
||||
request.setId(Objects.toString(to, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scriptChangedVersion(RequestWrapper<?> request, Object to) {
|
||||
if (to == null) {
|
||||
request.setVersion(Versions.MATCH_ANY);
|
||||
request.setVersionType(INTERNAL);
|
||||
} else {
|
||||
request.setVersion(asLong(to, VersionFieldMapper.NAME));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scriptChangedRouting(RequestWrapper<?> request, Object to) {
|
||||
request.setRouting(Objects.toString(to, null));
|
||||
}
|
||||
|
||||
private long asLong(Object from, String name) {
|
||||
/*
|
||||
* Stuffing a number into the map will have converted it to
|
||||
* some Number.
|
||||
* */
|
||||
Number fromNumber;
|
||||
try {
|
||||
fromNumber = (Number) from;
|
||||
} catch (ClassCastException e) {
|
||||
throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]", e);
|
||||
}
|
||||
long l = fromNumber.longValue();
|
||||
// Check that we didn't round when we fetched the value.
|
||||
if (fromNumber.doubleValue() != l) {
|
||||
throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]");
|
||||
}
|
||||
return l;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.action.support.ActionFilters;
|
|||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.ParentTaskAssigningClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
|
@ -58,10 +57,9 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
|
|||
BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, DeleteByQueryAction.INSTANCE, listener, client,
|
||||
clusterService.localNode(),
|
||||
() -> {
|
||||
ClusterState state = clusterService.state();
|
||||
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
|
||||
bulkByScrollTask);
|
||||
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService,
|
||||
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService,
|
||||
listener).start();
|
||||
}
|
||||
);
|
||||
|
|
|
@ -19,443 +19,48 @@
|
|||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.auth.AuthScope;
|
||||
import org.apache.http.auth.UsernamePasswordCredentials;
|
||||
import org.apache.http.client.CredentialsProvider;
|
||||
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||
import org.apache.http.impl.nio.reactor.IOReactorConfig;
|
||||
import org.apache.http.message.BasicHeader;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.automaton.Automata;
|
||||
import org.apache.lucene.util.automaton.Automaton;
|
||||
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
|
||||
import org.apache.lucene.util.automaton.MinimizationOperations;
|
||||
import org.apache.lucene.util.automaton.Operations;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.ParentTaskAssigningClient;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.synchronizedList;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.elasticsearch.index.VersionType.INTERNAL;
|
||||
|
||||
public class TransportReindexAction extends HandledTransportAction<ReindexRequest, BulkByScrollResponse> {
|
||||
public static final Setting<List<String>> REMOTE_CLUSTER_WHITELIST =
|
||||
Setting.listSetting("reindex.remote.whitelist", emptyList(), Function.identity(), Property.NodeScope);
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
private final ScriptService scriptService;
|
||||
private final AutoCreateIndex autoCreateIndex;
|
||||
private final Client client;
|
||||
private final CharacterRunAutomaton remoteWhitelist;
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
|
||||
private final ReindexSslConfig sslConfig;
|
||||
private final ReindexValidator reindexValidator;
|
||||
private final Reindexer reindexer;
|
||||
|
||||
@Inject
|
||||
public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
|
||||
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig) {
|
||||
super(ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>)ReindexRequest::new);
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.scriptService = scriptService;
|
||||
this.autoCreateIndex = autoCreateIndex;
|
||||
this.client = client;
|
||||
remoteWhitelist = buildRemoteWhitelist(REMOTE_CLUSTER_WHITELIST.get(settings));
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.sslConfig = sslConfig;
|
||||
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
|
||||
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
|
||||
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
|
||||
checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
|
||||
ClusterState state = clusterService.state();
|
||||
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(),
|
||||
indexNameExpressionResolver, autoCreateIndex, state);
|
||||
|
||||
reindexValidator.initialValidation(request);
|
||||
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
|
||||
|
||||
BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, ReindexAction.INSTANCE, listener, client,
|
||||
clusterService.localNode(),
|
||||
() -> {
|
||||
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
|
||||
bulkByScrollTask);
|
||||
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, state,
|
||||
listener).start();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
|
||||
if (remoteInfo == null) {
|
||||
return;
|
||||
}
|
||||
String check = remoteInfo.getHost() + ':' + remoteInfo.getPort();
|
||||
if (whitelist.run(check)) {
|
||||
return;
|
||||
}
|
||||
throw new IllegalArgumentException('[' + check + "] not whitelisted in " + REMOTE_CLUSTER_WHITELIST.getKey());
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the {@link CharacterRunAutomaton} that represents the reindex-from-remote whitelist and make sure that it doesn't whitelist
|
||||
* the world.
|
||||
*/
|
||||
static CharacterRunAutomaton buildRemoteWhitelist(List<String> whitelist) {
|
||||
if (whitelist.isEmpty()) {
|
||||
return new CharacterRunAutomaton(Automata.makeEmpty());
|
||||
}
|
||||
Automaton automaton = Regex.simpleMatchToAutomaton(whitelist.toArray(Strings.EMPTY_ARRAY));
|
||||
automaton = MinimizationOperations.minimize(automaton, Operations.DEFAULT_MAX_DETERMINIZED_STATES);
|
||||
if (Operations.isTotal(automaton)) {
|
||||
throw new IllegalArgumentException("Refusing to start because whitelist " + whitelist + " accepts all addresses. "
|
||||
+ "This would allow users to reindex-from-remote any URL they like effectively having Elasticsearch make HTTP GETs "
|
||||
+ "for them.");
|
||||
}
|
||||
return new CharacterRunAutomaton(automaton);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an ActionRequestValidationException if the request tries to index
|
||||
* back into the same index or into an index that points to two indexes.
|
||||
* This cannot be done during request validation because the cluster state
|
||||
* isn't available then. Package private for testing.
|
||||
*/
|
||||
static void validateAgainstAliases(SearchRequest source, IndexRequest destination, RemoteInfo remoteInfo,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex,
|
||||
ClusterState clusterState) {
|
||||
if (remoteInfo != null) {
|
||||
return;
|
||||
}
|
||||
String target = destination.index();
|
||||
if (false == autoCreateIndex.shouldAutoCreate(target, clusterState)) {
|
||||
/*
|
||||
* If we're going to autocreate the index we don't need to resolve
|
||||
* it. This is the same sort of dance that TransportIndexRequest
|
||||
* uses to decide to autocreate the index.
|
||||
*/
|
||||
target = indexNameExpressionResolver.concreteWriteIndex(clusterState, destination).getName();
|
||||
}
|
||||
for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source)) {
|
||||
if (sourceIndex.equals(target)) {
|
||||
ActionRequestValidationException e = new ActionRequestValidationException();
|
||||
e.addValidationError("reindex cannot write into an index its reading from [" + target + ']');
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the {@link RestClient} used for reindexing from remote clusters.
|
||||
* @param remoteInfo connection information for the remote cluster
|
||||
* @param sslConfig configuration for potential outgoing HTTPS connections
|
||||
* @param taskId the id of the current task. This is added to the thread name for easier tracking
|
||||
* @param threadCollector a list in which we collect all the threads created by the client
|
||||
*/
|
||||
static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector) {
|
||||
Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
|
||||
int i = 0;
|
||||
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
|
||||
clientHeaders[i++] = new BasicHeader(header.getKey(), header.getValue());
|
||||
}
|
||||
final RestClientBuilder builder =
|
||||
RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
|
||||
.setDefaultHeaders(clientHeaders)
|
||||
.setRequestConfigCallback(c -> {
|
||||
c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
|
||||
c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
|
||||
return c;
|
||||
})
|
||||
.setHttpClientConfigCallback(c -> {
|
||||
// Enable basic auth if it is configured
|
||||
if (remoteInfo.getUsername() != null) {
|
||||
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
|
||||
remoteInfo.getPassword());
|
||||
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||
credentialsProvider.setCredentials(AuthScope.ANY, creds);
|
||||
c.setDefaultCredentialsProvider(credentialsProvider);
|
||||
}
|
||||
// Stick the task id in the thread name so we can track down tasks from stack traces
|
||||
AtomicInteger threads = new AtomicInteger();
|
||||
c.setThreadFactory(r -> {
|
||||
String name = "es-client-" + taskId + "-" + threads.getAndIncrement();
|
||||
Thread t = new Thread(r, name);
|
||||
threadCollector.add(t);
|
||||
return t;
|
||||
});
|
||||
// Limit ourselves to one reactor thread because for now the search process is single threaded.
|
||||
c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
|
||||
c.setSSLStrategy(sslConfig.getStrategy());
|
||||
return c;
|
||||
});
|
||||
if (Strings.hasLength(remoteInfo.getPathPrefix()) && "/".equals(remoteInfo.getPathPrefix()) == false) {
|
||||
builder.setPathPrefix(remoteInfo.getPathPrefix());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple implementation of reindex using scrolling and bulk. There are tons
|
||||
* of optimizations that can be done on certain types of reindex requests
|
||||
* but this makes no attempt to do any of them so it can be as simple
|
||||
* possible.
|
||||
*/
|
||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest, TransportReindexAction> {
|
||||
/**
|
||||
* List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
|
||||
* {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the
|
||||
* {@linkplain ThreadPool}s because it uses httpasyncclient. It'd be a ton of trouble to work around creating those threads. So
|
||||
* instead we let it create threads but we watch them carefully and assert that they are dead when the process is over.
|
||||
*/
|
||||
private List<Thread> createdThreads = emptyList();
|
||||
|
||||
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, TransportReindexAction action, ReindexRequest request, ClusterState clusterState,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task,
|
||||
/*
|
||||
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
|
||||
* external versioning.
|
||||
*/
|
||||
request.getDestination().versionType() != VersionType.INTERNAL,
|
||||
false, logger, client, threadPool, action, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
|
||||
if (mainRequest.getRemoteInfo() != null) {
|
||||
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
|
||||
createdThreads = synchronizedList(new ArrayList<>());
|
||||
RestClient restClient = buildRestClient(remoteInfo, mainAction.sslConfig, task.getId(), createdThreads);
|
||||
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
|
||||
this::onScrollResponse, this::finishHim,
|
||||
restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
|
||||
}
|
||||
return super.buildScrollableResultSource(backoffPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
|
||||
super.finishHim(failure, indexingFailures, searchFailures, timedOut);
|
||||
// A little extra paranoia so we log something if we leave any threads running
|
||||
for (Thread thread : createdThreads) {
|
||||
if (thread.isAlive()) {
|
||||
assert false: "Failed to properly stop client thread [" + thread.getName() + "]";
|
||||
logger.error("Failed to properly stop client thread [{}]", thread.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
Script script = mainRequest.getScript();
|
||||
if (script != null) {
|
||||
return new ReindexScriptApplier(worker, mainAction.scriptService, script, script.getParams());
|
||||
}
|
||||
return super.buildScriptApplier();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
|
||||
IndexRequest index = new IndexRequest();
|
||||
|
||||
// Copy the index from the request so we always write where it asked to write
|
||||
index.index(mainRequest.getDestination().index());
|
||||
|
||||
// If the request override's type then the user wants all documents in that type. Otherwise keep the doc's type.
|
||||
if (mainRequest.getDestination().type() == null) {
|
||||
index.type(doc.getType());
|
||||
} else {
|
||||
index.type(mainRequest.getDestination().type());
|
||||
}
|
||||
|
||||
/*
|
||||
* Internal versioning can just use what we copied from the destination request. Otherwise we assume we're using external
|
||||
* versioning and use the doc's version.
|
||||
*/
|
||||
index.versionType(mainRequest.getDestination().versionType());
|
||||
if (index.versionType() == INTERNAL) {
|
||||
assert doc.getVersion() == -1 : "fetched version when we didn't have to";
|
||||
index.version(mainRequest.getDestination().version());
|
||||
} else {
|
||||
index.version(doc.getVersion());
|
||||
}
|
||||
|
||||
// id and source always come from the found doc. Scripts can change them but they operate on the index request.
|
||||
index.id(doc.getId());
|
||||
|
||||
// the source xcontent type and destination could be different
|
||||
final XContentType sourceXContentType = doc.getXContentType();
|
||||
final XContentType mainRequestXContentType = mainRequest.getDestination().getContentType();
|
||||
if (mainRequestXContentType != null && doc.getXContentType() != mainRequestXContentType) {
|
||||
// we need to convert
|
||||
try (InputStream stream = doc.getSource().streamInput();
|
||||
XContentParser parser = sourceXContentType.xContent()
|
||||
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream);
|
||||
XContentBuilder builder = XContentBuilder.builder(mainRequestXContentType.xContent())) {
|
||||
parser.nextToken();
|
||||
builder.copyCurrentStructure(parser);
|
||||
index.source(BytesReference.bytes(builder), builder.contentType());
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException("failed to convert hit from " + sourceXContentType + " to "
|
||||
+ mainRequestXContentType, e);
|
||||
}
|
||||
} else {
|
||||
index.source(doc.getSource(), doc.getXContentType());
|
||||
}
|
||||
|
||||
/*
|
||||
* The rest of the index request just has to be copied from the template. It may be changed later from scripts or the superclass
|
||||
* here on out operates on the index request rather than the template.
|
||||
*/
|
||||
index.routing(mainRequest.getDestination().routing());
|
||||
index.setPipeline(mainRequest.getDestination().getPipeline());
|
||||
// OpType is synthesized from version so it is handled when we copy version above.
|
||||
|
||||
return wrap(index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Override the simple copy behavior to allow more fine grained control.
|
||||
*/
|
||||
@Override
|
||||
protected void copyRouting(RequestWrapper<?> request, String routing) {
|
||||
String routingSpec = mainRequest.getDestination().routing();
|
||||
if (routingSpec == null) {
|
||||
super.copyRouting(request, routing);
|
||||
return;
|
||||
}
|
||||
if (routingSpec.startsWith("=")) {
|
||||
super.copyRouting(request, mainRequest.getDestination().routing().substring(1));
|
||||
return;
|
||||
}
|
||||
switch (routingSpec) {
|
||||
case "keep":
|
||||
super.copyRouting(request, routing);
|
||||
break;
|
||||
case "discard":
|
||||
super.copyRouting(request, null);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported routing command");
|
||||
}
|
||||
}
|
||||
|
||||
class ReindexScriptApplier extends ScriptApplier {
|
||||
|
||||
ReindexScriptApplier(WorkerBulkByScrollTaskState taskWorker, ScriptService scriptService, Script script,
|
||||
Map<String, Object> params) {
|
||||
super(taskWorker, scriptService, script, params);
|
||||
}
|
||||
|
||||
/*
|
||||
* Methods below here handle script updating the index request. They try
|
||||
* to be pretty liberal with regards to types because script are often
|
||||
* dynamically typed.
|
||||
*/
|
||||
|
||||
@Override
|
||||
protected void scriptChangedIndex(RequestWrapper<?> request, Object to) {
|
||||
requireNonNull(to, "Can't reindex without a destination index!");
|
||||
request.setIndex(to.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scriptChangedType(RequestWrapper<?> request, Object to) {
|
||||
requireNonNull(to, "Can't reindex without a destination type!");
|
||||
request.setType(to.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scriptChangedId(RequestWrapper<?> request, Object to) {
|
||||
request.setId(Objects.toString(to, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scriptChangedVersion(RequestWrapper<?> request, Object to) {
|
||||
if (to == null) {
|
||||
request.setVersion(Versions.MATCH_ANY);
|
||||
request.setVersionType(INTERNAL);
|
||||
} else {
|
||||
request.setVersion(asLong(to, VersionFieldMapper.NAME));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scriptChangedRouting(RequestWrapper<?> request, Object to) {
|
||||
request.setRouting(Objects.toString(to, null));
|
||||
}
|
||||
|
||||
private long asLong(Object from, String name) {
|
||||
/*
|
||||
* Stuffing a number into the map will have converted it to
|
||||
* some Number.
|
||||
* */
|
||||
Number fromNumber;
|
||||
try {
|
||||
fromNumber = (Number) from;
|
||||
} catch (ClassCastException e) {
|
||||
throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]", e);
|
||||
}
|
||||
long l = fromNumber.longValue();
|
||||
// Check that we didn't round when we fetched the value.
|
||||
if (fromNumber.doubleValue() != l) {
|
||||
throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]");
|
||||
}
|
||||
return l;
|
||||
}
|
||||
}
|
||||
reindexer.execute(bulkByScrollTask, request, listener);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
|||
ClusterState state = clusterService.state();
|
||||
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
|
||||
bulkByScrollTask);
|
||||
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, state,
|
||||
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, scriptService, request, state,
|
||||
listener).start();
|
||||
}
|
||||
);
|
||||
|
@ -85,14 +85,14 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
|||
private final boolean useSeqNoForCAS;
|
||||
|
||||
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, TransportUpdateByQueryAction action, UpdateByQueryRequest request, ClusterState clusterState,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
ThreadPool threadPool, ScriptService scriptService, UpdateByQueryRequest request,
|
||||
ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task,
|
||||
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
|
||||
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
|
||||
// all nodes support sequence number powered optimistic concurrency control and we can use it
|
||||
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
|
||||
logger, client, threadPool, action, request, listener);
|
||||
logger, client, threadPool, request, listener, scriptService, null);
|
||||
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
|
||||
}
|
||||
|
||||
|
@ -100,7 +100,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
|||
public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
Script script = mainRequest.getScript();
|
||||
if (script != null) {
|
||||
return new UpdateByQueryScriptApplier(worker, mainAction.scriptService, script, script.getParams());
|
||||
return new UpdateByQueryScriptApplier(worker, scriptService, script, script.getParams());
|
||||
}
|
||||
return super.buildScriptApplier();
|
||||
}
|
||||
|
|
|
@ -733,7 +733,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|||
extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, DummyTransportAsyncBulkByScrollAction> {
|
||||
DummyAsyncBulkByScrollAction() {
|
||||
super(testTask, randomBoolean(), randomBoolean(), AsyncBulkByScrollActionTests.this.logger,
|
||||
new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), null, testRequest, listener);
|
||||
new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, listener, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -49,7 +49,7 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
|
|||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
long taskId = randomLong();
|
||||
List<Thread> threads = synchronizedList(new ArrayList<>());
|
||||
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
|
||||
RestClient client = Reindexer.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
|
||||
try {
|
||||
assertBusy(() -> assertThat(threads, hasSize(2)));
|
||||
int i = 0;
|
||||
|
@ -73,7 +73,7 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
|
|||
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
long taskId = randomLong();
|
||||
List<Thread> threads = synchronizedList(new ArrayList<>());
|
||||
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
|
||||
RestClient client = Reindexer.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
|
||||
try {
|
||||
assertHeaders(client, headers);
|
||||
} finally {
|
||||
|
|
|
@ -31,8 +31,8 @@ import java.util.List;
|
|||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.elasticsearch.index.reindex.TransportReindexAction.buildRemoteWhitelist;
|
||||
import static org.elasticsearch.index.reindex.TransportReindexAction.checkRemoteWhitelist;
|
||||
import static org.elasticsearch.index.reindex.ReindexValidator.buildRemoteWhitelist;
|
||||
import static org.elasticsearch.index.reindex.ReindexValidator.checkRemoteWhitelist;
|
||||
|
||||
/**
|
||||
* Tests the reindex-from-remote whitelist of remotes.
|
||||
|
|
|
@ -74,10 +74,10 @@ public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadat
|
|||
return new ReindexRequest();
|
||||
}
|
||||
|
||||
private class TestAction extends TransportReindexAction.AsyncIndexBySearchAction {
|
||||
private class TestAction extends Reindexer.AsyncIndexBySearchAction {
|
||||
TestAction() {
|
||||
super(ReindexMetadataTests.this.task, ReindexMetadataTests.this.logger, null, ReindexMetadataTests.this.threadPool,
|
||||
null, request(), null, listener());
|
||||
null, null, request(), listener());
|
||||
}
|
||||
|
||||
public ReindexRequest mainRequest() {
|
||||
|
|
|
@ -124,7 +124,7 @@ public class ReindexRestClientSslTests extends ESTestCase {
|
|||
.build();
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
|
||||
try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
try (RestClient client = Reindexer.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
expectThrows(SSLHandshakeException.class, () -> client.performRequest(new Request("GET", "/")));
|
||||
}
|
||||
}
|
||||
|
@ -139,7 +139,7 @@ public class ReindexRestClientSslTests extends ESTestCase {
|
|||
.build();
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
|
||||
try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
try (RestClient client = Reindexer.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
final Response response = client.performRequest(new Request("GET", "/"));
|
||||
assertThat(response.getStatusLine().getStatusCode(), Matchers.is(200));
|
||||
}
|
||||
|
@ -155,7 +155,7 @@ public class ReindexRestClientSslTests extends ESTestCase {
|
|||
.build();
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
|
||||
try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
try (RestClient client = Reindexer.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
final Response response = client.performRequest(new Request("GET", "/"));
|
||||
assertThat(response.getStatusLine().getStatusCode(), Matchers.is(200));
|
||||
}
|
||||
|
@ -185,7 +185,7 @@ public class ReindexRestClientSslTests extends ESTestCase {
|
|||
};
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
|
||||
try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
try (RestClient client = Reindexer.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
|
||||
final Response response = client.performRequest(new Request("GET", "/"));
|
||||
assertThat(response.getStatusLine().getStatusCode(), Matchers.is(200));
|
||||
final Certificate[] certs = clientCertificates.get();
|
||||
|
|
|
@ -20,14 +20,10 @@
|
|||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -107,12 +103,8 @@ public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTes
|
|||
}
|
||||
|
||||
@Override
|
||||
protected TransportReindexAction.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) {
|
||||
TransportService transportService = Mockito.mock(TransportService.class);
|
||||
protected Reindexer.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) {
|
||||
ReindexSslConfig sslConfig = Mockito.mock(ReindexSslConfig.class);
|
||||
TransportReindexAction transportAction = new TransportReindexAction(Settings.EMPTY, threadPool,
|
||||
new ActionFilters(Collections.emptySet()), null, null, scriptService, null, null, transportService, sslConfig);
|
||||
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, transportAction, request,
|
||||
null, listener());
|
||||
return new Reindexer.AsyncIndexBySearchAction(task, logger, null, threadPool, scriptService, sslConfig, request, listener());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -126,7 +126,7 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private void succeeds(RemoteInfo remoteInfo, String target, String... sources) {
|
||||
TransportReindexAction.validateAgainstAliases(new SearchRequest(sources), new IndexRequest(target), remoteInfo,
|
||||
ReindexValidator.validateAgainstAliases(new SearchRequest(sources), new IndexRequest(target), remoteInfo,
|
||||
INDEX_NAME_EXPRESSION_RESOLVER, AUTO_CREATE_INDEX, STATE);
|
||||
}
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ public class UpdateByQueryWithScriptTests
|
|||
TransportService transportService = mock(TransportService.class);
|
||||
TransportUpdateByQueryAction transportAction = new TransportUpdateByQueryAction(threadPool,
|
||||
new ActionFilters(Collections.emptySet()), null, transportService, scriptService, null);
|
||||
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, transportAction, request,
|
||||
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, scriptService, request,
|
||||
ClusterState.EMPTY_STATE, listener());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue