Reindex from remote: remove async client in favour of using RestClient performRequest async method

javanna 2016-07-12 20:41:33 +02:00 committed by Luca Cavanna
parent 283090e2ae
commit 54fa997545
3 changed files with 89 additions and 123 deletions

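In outline, the change swaps a hand-rolled blocking wrapper for the async performRequest overload that RestClient already provides. Below is a minimal sketch of the resulting call pattern, not the committed code: host, port, path and request body are placeholders, while the performRequest and ResponseListener signatures follow the usage visible in the hunks that follow.

import static java.util.Collections.emptyMap;

import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;

class RestClientAsyncSketch {
    static void clearScrollAsync(String scrollId) {
        // Build a client for the remote cluster; in the real code host, port and scheme
        // come from RemoteInfo, and the client lives as long as the reindex task.
        RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
        // Placeholder body: the committed code builds the entity from the scroll id itself.
        StringEntity entity = new StringEntity("{\"scroll_id\":[\"" + scrollId + "\"]}",
                ContentType.APPLICATION_JSON);
        // The async overload returns immediately; no thread blocks waiting for the response,
        // the listener is invoked once the remote cluster answers.
        restClient.performRequest("DELETE", "/_search/scroll", emptyMap(), entity,
                new ResponseListener() {
            @Override
            public void onSuccess(Response response) {
                System.out.println("cleared scroll " + scrollId + ": " + response.getStatusLine());
            }

            @Override
            public void onFailure(Exception e) {
                System.err.println("failed to clear scroll " + scrollId);
                e.printStackTrace();
            }
        });
    }
}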
View File

@@ -48,7 +48,6 @@ import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.reindex.remote.RemoteInfo;
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
@@ -185,12 +184,10 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
// NORELEASE support auth
throw new UnsupportedOperationException("Auth is unsupported");
}
RestClient restClient = RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
.build();
RemoteScrollableHitSource.AsyncClient client = new RemoteScrollableHitSource.AsynchronizingRestClient(threadPool,
restClient);
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, this::finishHim, client,
remoteInfo.getQuery(), mainRequest.getSearchRequest());
RestClient restClient = RestClient.builder(
new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme())).build();
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry,
this::finishHim, restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
}
return super.buildScrollableResultSource(backoffPolicy);
}

View File

@@ -22,10 +22,10 @@ package org.elasticsearch.index.reindex.remote;
import org.apache.http.HttpEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
@@ -41,8 +41,6 @@ import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
@@ -63,13 +61,13 @@ import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.MAIN_
import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.RESPONSE_PARSER;
public class RemoteScrollableHitSource extends ScrollableHitSource {
private final AsyncClient client;
private final RestClient client;
private final BytesReference query;
private final SearchRequest searchRequest;
Version remoteVersion;
public RemoteScrollableHitSource(ESLogger logger, BackoffPolicy backoffPolicy, ThreadPool threadPool, Runnable countSearchRetry,
Consumer<Exception> fail, AsyncClient client, BytesReference query, SearchRequest searchRequest) {
Consumer<Exception> fail, RestClient client, BytesReference query, SearchRequest searchRequest) {
super(logger, backoffPolicy, threadPool, countSearchRetry, fail);
this.query = query;
this.searchRequest = searchRequest;
@@ -99,7 +97,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
}
void onStartResponse(Consumer<? super Response> onResponse, Response response) {
private void onStartResponse(Consumer<? super Response> onResponse, Response response) {
if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) {
logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId());
doStartNextScroll(response.getScrollId(), timeValueMillis(0), onResponse);
@@ -119,15 +117,10 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
// Need to throw out response....
client.performRequest("DELETE", scrollPath(), emptyMap(), scrollEntity(scrollId), new ResponseListener() {
@Override
public void onResponse(InputStream response) {
public void onSuccess(org.elasticsearch.client.Response response) {
logger.debug("Successfully cleared [{}]", scrollId);
}
@Override
public void onRetryableFailure(Exception t) {
onFailure(t);
}
@Override
public void onFailure(Exception t) {
logger.warn("Failed to clear scroll [{}]", t, scrollId);
@@ -135,7 +128,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
});
}
<T> void execute(String method, String uri, Map<String, String> params, HttpEntity entity,
private <T> void execute(String method, String uri, Map<String, String> params, HttpEntity entity,
BiFunction<XContentParser, ParseFieldMatcherSupplier, T> parser, Consumer<? super T> listener) {
class RetryHelper extends AbstractRunnable {
private final Iterator<TimeValue> retries = backoffPolicy.iterator();
@@ -144,34 +137,35 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
protected void doRun() throws Exception {
client.performRequest(method, uri, params, entity, new ResponseListener() {
@Override
public void onResponse(InputStream content) {
T response;
public void onSuccess(org.elasticsearch.client.Response response) {
T parsedResponse;
try {
InputStream content = response.getEntity().getContent();
XContent xContent = XContentFactory.xContentType(content).xContent();
try(XContentParser xContentParser = xContent.createParser(content)) {
response = parser.apply(xContentParser, () -> ParseFieldMatcher.STRICT);
parsedResponse = parser.apply(xContentParser, () -> ParseFieldMatcher.STRICT);
}
} catch (IOException e) {
throw new ElasticsearchException("Error deserializing response", e);
}
listener.accept(response);
listener.accept(parsedResponse);
}
@Override
public void onFailure(Exception e) {
fail.accept(e);
}
@Override
public void onRetryableFailure(Exception t) {
if (retries.hasNext()) {
TimeValue delay = retries.next();
logger.trace("retrying rejected search after [{}]", t, delay);
countSearchRetry.run();
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this);
} else {
fail.accept(t);
if (e instanceof ResponseException) {
ResponseException re = (ResponseException) e;
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == re.getResponse().getStatusLine().getStatusCode()) {
if (retries.hasNext()) {
TimeValue delay = retries.next();
logger.trace("retrying rejected search after [{}]", e, delay);
countSearchRetry.run();
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this);
return;
}
}
}
fail.accept(e);
}
});
}
@@ -183,59 +177,4 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
}
new RetryHelper().run();
}
public interface AsyncClient extends Closeable {
void performRequest(String method, String uri, Map<String, String> params, HttpEntity entity, ResponseListener listener);
}
public interface ResponseListener extends ActionListener<InputStream> {
void onRetryableFailure(Exception t);
}
public static class AsynchronizingRestClient implements AsyncClient {
private final ThreadPool threadPool;
private final RestClient restClient;
public AsynchronizingRestClient(ThreadPool threadPool, RestClient restClient) {
this.threadPool = threadPool;
this.restClient = restClient;
}
@Override
public void performRequest(String method, String uri, Map<String, String> params, HttpEntity entity,
ResponseListener listener) {
/*
* We use the generic thread pool here because this client is blocking the generic thread pool is sized appropriately for some
* of the threads on it to be blocked, waiting on IO. It'd be a disaster if this ran on the listener thread pool, eating
* valuable threads needed to handle responses. Most other thread pool would probably not mind running this either, but the
* generic thread pool is the "most right" place for it to run. We could make our own thread pool for this but the generic
* thread pool already has plenty of capacity.
*/
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
org.elasticsearch.client.Response response = restClient.performRequest(method, uri, params, entity);
InputStream markSupportedInputStream = new BufferedInputStream(response.getEntity().getContent());
listener.onResponse(markSupportedInputStream);
}
@Override
public void onFailure(Exception t) {
if (t instanceof ResponseException) {
ResponseException re = (ResponseException) t;
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == re.getResponse().getStatusLine().getStatusCode()) {
listener.onRetryableFailure(t);
return;
}
}
listener.onFailure(t);
}
});
}
@Override
public void close() throws IOException {
restClient.close();
}
}
}

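With the wrapper gone, deciding whether a failure is retryable moves into the ResponseListener itself: onFailure retries only a ResponseException that carries HTTP 429 (TOO_MANY_REQUESTS) and still has backoff budget left, and fails the request otherwise. A condensed sketch of that decision follows; retries, retry and fail are stand-ins for the backoff iterator, the rescheduling hook and the failure consumer used by RemoteScrollableHitSource.

import java.util.Iterator;
import java.util.function.Consumer;

import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;

class RetryDecisionSketch {
    static void onFailure(Exception e, Iterator<TimeValue> retries,
                          Consumer<TimeValue> retry, Consumer<Exception> fail) {
        if (e instanceof ResponseException) {
            int status = ((ResponseException) e).getResponse().getStatusLine().getStatusCode();
            // Only "too many requests" (429) responses are treated as retryable.
            if (status == RestStatus.TOO_MANY_REQUESTS.getStatus() && retries.hasNext()) {
                retry.accept(retries.next()); // reschedule the request after the next backoff delay
                return;
            }
        }
        fail.accept(e); // anything else, or an exhausted backoff policy, is a hard failure
    }
}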
View File

@@ -19,31 +19,44 @@
package org.elasticsearch.index.reindex.remote;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.reindex.ScrollableHitSource.Response;
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource.ResponseListener;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -53,6 +66,9 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RemoteScrollableHitSourceTests extends ESTestCase {
private final String FAKE_SCROLL_ID = "DnF1ZXJ5VGhlbkZldGNoBQAAAfakescroll";
@@ -68,7 +84,7 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
threadPool = new TestThreadPool(getTestName()) {
@Override
public Executor executor(String name) {
return r -> r.run();
return Runnable::run;
}
@Override
@@ -307,6 +323,7 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
* Creates a hit source that doesn't make the remote request and instead returns data from some files. Also requests are always returned
* synchronously rather than asynchronously.
*/
@SuppressWarnings("unchecked")
private RemoteScrollableHitSource sourceWithMockedRemoteCall(boolean mockRemoteVersion, String... paths) throws Exception {
URL[] resources = new URL[paths.length];
for (int i = 0; i < paths.length; i++) {
@@ -315,35 +332,48 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
throw new IllegalArgumentException("Couldn't find [" + paths[i] + "]");
}
}
RemoteScrollableHitSource.AsyncClient client = new RemoteScrollableHitSource.AsyncClient() {
CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
int responseCount = 0;
@Override
public void performRequest(String method, String uri, Map<String, String> params, HttpEntity entity,
ResponseListener listener) {
try {
URL resource = resources[responseCount];
String path = paths[responseCount++];
InputStream stream = resource.openStream();
if (path.startsWith("fail:")) {
String body = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
if (path.equals("fail:rejection.json")) {
listener.onRetryableFailure(new RuntimeException(body));
} else {
listener.onFailure(new RuntimeException(body));
}
} else {
listener.onResponse(stream);
}
} catch (IOException e) {
listener.onFailure(e);
}
}
@Override
public void close() throws IOException {
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
FutureCallback<HttpResponse> futureCallback = (FutureCallback<HttpResponse>) invocationOnMock.getArguments()[2];
HttpEntityEnclosingRequest request = (HttpEntityEnclosingRequest)requestProducer.generateRequest();
URL resource = resources[responseCount];
String path = paths[responseCount++];
ProtocolVersion protocolVersion = new ProtocolVersion("http", 1, 1);
if (path.startsWith("fail:")) {
String body = Streams.copyToString(new InputStreamReader(request.getEntity().getContent(), StandardCharsets.UTF_8));
if (path.equals("fail:rejection.json")) {
StatusLine statusLine = new BasicStatusLine(protocolVersion, RestStatus.TOO_MANY_REQUESTS.getStatus(), "");
BasicHttpResponse httpResponse = new BasicHttpResponse(statusLine);
futureCallback.completed(httpResponse);
} else {
futureCallback.failed(new RuntimeException(body));
}
} else {
StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "");
HttpResponse httpResponse = new BasicHttpResponse(statusLine);
httpResponse.setEntity(new InputStreamEntity(resource.openStream()));
futureCallback.completed(httpResponse);
}
return null;
}
};
TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(client) {
});
HttpAsyncClientBuilder clientBuilder = mock(HttpAsyncClientBuilder.class);
when(clientBuilder.build()).thenReturn(httpClient);
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200))
.setHttpClientConfigCallback(httpClientBuilder -> clientBuilder).build();
TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(restClient) {
@Override
void lookupRemoteVersion(Consumer<Version> onVersion) {
if (mockRemoteVersion) {
@@ -372,7 +402,7 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
}
private class TestRemoteScrollableHitSource extends RemoteScrollableHitSource {
public TestRemoteScrollableHitSource(RemoteScrollableHitSource.AsyncClient client) {
TestRemoteScrollableHitSource(RestClient client) {
super(RemoteScrollableHitSourceTests.this.logger, backoff(), RemoteScrollableHitSourceTests.this.threadPool,
RemoteScrollableHitSourceTests.this::countRetry, RemoteScrollableHitSourceTests.this::failRequest, client,
new BytesArray("{}"), RemoteScrollableHitSourceTests.this.searchRequest);
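The test wiring above boils down to the following condensed sketch (illustrative, not the test itself): the Apache CloseableHttpAsyncClient is mocked so every request is answered in-process, and setHttpClientConfigCallback points the RestClient at a builder that simply returns the mock. Here the stub completes the callback with an empty 200 response as a placeholder; the real test answers with content read from resource files, or with a 429 status for the rejection case.

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.RestClient;

class MockedRestClientSketch {
    @SuppressWarnings("unchecked")
    static RestClient mockedRestClient() {
        CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
        when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class),
                any(HttpAsyncResponseConsumer.class), any(FutureCallback.class)))
            .thenAnswer(invocation -> {
                FutureCallback<HttpResponse> callback =
                        (FutureCallback<HttpResponse>) invocation.getArguments()[2];
                // Complete every request synchronously with a canned, empty 200 response.
                callback.completed(new BasicHttpResponse(
                        new BasicStatusLine(new ProtocolVersion("http", 1, 1), 200, "")));
                return null;
            });
        HttpAsyncClientBuilder clientBuilder = mock(HttpAsyncClientBuilder.class);
        when(clientBuilder.build()).thenReturn(httpClient);
        // The config callback hands RestClient the mocked builder instead of building a real client.
        return RestClient.builder(new HttpHost("localhost", 9200))
                .setHttpClientConfigCallback(httpClientBuilder -> clientBuilder).build();
    }
}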