mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-23 05:15:04 +00:00
Use fewer threads when reindexing-from-remote
Reindex from remote uses the Elasticsearch client, which uses Apache httpasyncclient, which spins up 5 threads by default: 1 as a dispatcher and 4 more to handle IO. This changes Reindex's usage so it only spins up two threads - 1 dispatcher and 1 to handle IO. It also renames the threads to "es-client-$taskid-$thread_number". That way, if we see any threads sticking around, we can trace them back to the task.
This commit is contained in:
parent
a91bb29585
commit
c9790a1257
@ -25,10 +25,12 @@ import org.apache.http.auth.AuthScope;
|
|||||||
import org.apache.http.auth.UsernamePasswordCredentials;
|
import org.apache.http.auth.UsernamePasswordCredentials;
|
||||||
import org.apache.http.client.CredentialsProvider;
|
import org.apache.http.client.CredentialsProvider;
|
||||||
import org.apache.http.impl.client.BasicCredentialsProvider;
|
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||||
|
import org.apache.http.impl.nio.reactor.IOReactorConfig;
|
||||||
import org.apache.http.message.BasicHeader;
|
import org.apache.http.message.BasicHeader;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||||
|
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
@ -37,7 +39,6 @@ import org.elasticsearch.action.support.HandledTransportAction;
|
|||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.client.ParentTaskAssigningClient;
|
import org.elasticsearch.client.ParentTaskAssigningClient;
|
||||||
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestClient;
|
||||||
import org.elasticsearch.client.RestClientBuilder;
|
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
@ -53,6 +54,7 @@ import org.elasticsearch.http.HttpInfo;
|
|||||||
import org.elasticsearch.http.HttpServer;
|
import org.elasticsearch.http.HttpServer;
|
||||||
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
|
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
|
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
|
||||||
|
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||||
import org.elasticsearch.index.reindex.remote.RemoteInfo;
|
import org.elasticsearch.index.reindex.remote.RemoteInfo;
|
||||||
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
|
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
|
||||||
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.Script;
|
||||||
@ -61,15 +63,18 @@ import org.elasticsearch.tasks.Task;
|
|||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static java.util.Collections.emptyList;
|
import static java.util.Collections.emptyList;
|
||||||
|
import static java.util.Collections.synchronizedList;
|
||||||
import static java.util.Objects.requireNonNull;
|
import static java.util.Objects.requireNonNull;
|
||||||
import static org.elasticsearch.index.VersionType.INTERNAL;
|
import static org.elasticsearch.index.VersionType.INTERNAL;
|
||||||
|
|
||||||
@ -168,6 +173,43 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the {@link RestClient} used for reindexing from remote clusters.
|
||||||
|
* @param remoteInfo connection information for the remote cluster
|
||||||
|
* @param taskId the id of the current task. This is added to the thread name for easier tracking
|
||||||
|
* @param threadCollector a list in which we collect all the threads created by the client
|
||||||
|
*/
|
||||||
|
static RestClient buildRestClient(RemoteInfo remoteInfo, long taskId, List<Thread> threadCollector) {
|
||||||
|
Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
|
||||||
|
int i = 0;
|
||||||
|
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
|
||||||
|
clientHeaders[i] = new BasicHeader(header.getKey(), header.getValue());
|
||||||
|
}
|
||||||
|
return RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
|
||||||
|
.setDefaultHeaders(clientHeaders)
|
||||||
|
.setHttpClientConfigCallback(c -> {
|
||||||
|
// Enable basic auth if it is configured
|
||||||
|
if (remoteInfo.getUsername() != null) {
|
||||||
|
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
|
||||||
|
remoteInfo.getPassword());
|
||||||
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||||
|
credentialsProvider.setCredentials(AuthScope.ANY, creds);
|
||||||
|
c.setDefaultCredentialsProvider(credentialsProvider);
|
||||||
|
}
|
||||||
|
// Stick the task id in the thread name so we can track down tasks from stack traces
|
||||||
|
AtomicInteger threads = new AtomicInteger();
|
||||||
|
c.setThreadFactory(r -> {
|
||||||
|
String name = "es-client-" + taskId + "-" + threads.getAndIncrement();
|
||||||
|
Thread t = new Thread(r, name);
|
||||||
|
threadCollector.add(t);
|
||||||
|
return t;
|
||||||
|
});
|
||||||
|
// Limit ourselves to one reactor thread because for now the search process is single threaded.
|
||||||
|
c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
|
||||||
|
return c;
|
||||||
|
}).build();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple implementation of reindex using scrolling and bulk. There are tons
|
* Simple implementation of reindex using scrolling and bulk. There are tons
|
||||||
* of optimizations that can be done on certain types of reindex requests
|
* of optimizations that can be done on certain types of reindex requests
|
||||||
@ -175,6 +217,13 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||||||
* possible.
|
* possible.
|
||||||
*/
|
*/
|
||||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest> {
|
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest> {
|
||||||
|
/**
|
||||||
|
* List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
|
||||||
|
* {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the
|
||||||
|
* {@linkplain ThreadPool}s because it uses httpasyncclient. It'd be a ton of trouble to work around creating those threads. So
|
||||||
|
* instead we let it create threads but we watch them carefully and assert that they are dead when the process is over.
|
||||||
|
*/
|
||||||
|
private List<Thread> createdThreads = emptyList();
|
||||||
|
|
||||||
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client, ThreadPool threadPool,
|
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client, ThreadPool threadPool,
|
||||||
ReindexRequest request, ActionListener<BulkIndexByScrollResponse> listener,
|
ReindexRequest request, ActionListener<BulkIndexByScrollResponse> listener,
|
||||||
@ -186,30 +235,26 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||||||
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
|
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
|
||||||
if (mainRequest.getRemoteInfo() != null) {
|
if (mainRequest.getRemoteInfo() != null) {
|
||||||
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
|
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
|
||||||
Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
|
createdThreads = synchronizedList(new ArrayList<>());
|
||||||
int i = 0;
|
RestClient restClient = buildRestClient(remoteInfo, task.getId(), createdThreads);
|
||||||
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
|
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, this::finishHim, restClient,
|
||||||
clientHeaders[i] = new BasicHeader(header.getKey(), header.getValue());
|
remoteInfo.getQuery(), mainRequest.getSearchRequest());
|
||||||
}
|
|
||||||
RestClientBuilder restClient = RestClient
|
|
||||||
.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
|
|
||||||
.setDefaultHeaders(clientHeaders);
|
|
||||||
if (remoteInfo.getUsername() != null) {
|
|
||||||
restClient.setHttpClientConfigCallback(c -> {
|
|
||||||
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
|
|
||||||
remoteInfo.getPassword());
|
|
||||||
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
|
||||||
credentialsProvider.setCredentials(AuthScope.ANY, creds);
|
|
||||||
c.setDefaultCredentialsProvider(credentialsProvider);
|
|
||||||
return c;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, this::finishHim,
|
|
||||||
restClient.build(), remoteInfo.getQuery(), mainRequest.getSearchRequest());
|
|
||||||
}
|
}
|
||||||
return super.buildScrollableResultSource(backoffPolicy);
|
return super.buildScrollableResultSource(backoffPolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
|
||||||
|
super.finishHim(failure, indexingFailures, searchFailures, timedOut);
|
||||||
|
// A little extra paranoia so we log something if we leave any threads running
|
||||||
|
for (Thread thread : createdThreads) {
|
||||||
|
if (thread.isAlive()) {
|
||||||
|
assert false: "Failed to properly stop client thread [" + thread.getName() + "]";
|
||||||
|
logger.error("Failed to properly stop client thread [{}]", thread.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
protected BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||||
Script script = mainRequest.getScript();
|
Script script = mainRequest.getScript();
|
||||||
|
@ -0,0 +1,51 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.index.reindex;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.RestClient;
|
||||||
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
import org.elasticsearch.index.reindex.remote.RemoteInfo;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static java.util.Collections.emptyMap;
|
||||||
|
import static java.util.Collections.synchronizedList;
|
||||||
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
|
|
||||||
|
public class ReindexFromRemoteBuildRestClientTests extends ESTestCase {
|
||||||
|
public void testBuildRestClient() throws Exception {
|
||||||
|
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap());
|
||||||
|
long taskId = randomLong();
|
||||||
|
List<Thread> threads = synchronizedList(new ArrayList<>());
|
||||||
|
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads);
|
||||||
|
try {
|
||||||
|
assertBusy(() -> assertThat(threads, hasSize(2)));
|
||||||
|
int i = 0;
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
assertEquals("es-client-" + taskId + "-" + i, thread.getName());
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -31,7 +31,6 @@ import java.net.UnknownHostException;
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static java.util.Collections.emptyList;
|
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
import static java.util.Collections.emptySet;
|
import static java.util.Collections.emptySet;
|
||||||
import static org.elasticsearch.index.reindex.TransportReindexAction.checkRemoteWhitelist;
|
import static org.elasticsearch.index.reindex.TransportReindexAction.checkRemoteWhitelist;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user