Merge branch 'master' into feature/rank-eval
This commit is contained in:
commit
aa20ffcac7
|
@ -37,7 +37,7 @@ import java.util.Objects;
|
|||
*/
|
||||
public final class RestClientBuilder {
|
||||
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
|
||||
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 10000;
|
||||
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
|
||||
public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS;
|
||||
public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
|
||||
public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10;
|
||||
|
@ -185,7 +185,8 @@ public final class RestClientBuilder {
|
|||
|
||||
private CloseableHttpAsyncClient createHttpClient() {
|
||||
//default timeouts are all infinite
|
||||
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
|
||||
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
|
||||
.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
|
||||
.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS)
|
||||
.setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS);
|
||||
if (requestConfigCallback != null) {
|
||||
|
|
|
@ -134,18 +134,10 @@ public class GetResponse extends ActionResponse implements Iterable<GetField>, T
|
|||
return getResult.getSource();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link GetResponse#getSource()} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public Map<String, GetField> getFields() {
|
||||
return getResult.getFields();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link GetResponse#getSource()} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public GetField getField(String name) {
|
||||
return getResult.field(name);
|
||||
}
|
||||
|
|
|
@ -85,11 +85,6 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener, ActionFilterChain<?, Response> chain) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
|
||||
void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest) {
|
||||
|
||||
executionService.executeIndexRequest(indexRequest, t -> {
|
||||
|
|
|
@ -86,11 +86,6 @@ public final class IngestProxyActionFilter implements ActionFilter {
|
|||
transportService.sendRequest(randomIngestNode(), action.name(), request, new ActionListenerResponseHandler(listener, action::newResponse));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener, ActionFilterChain<?, Response> chain) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int order() {
|
||||
return Integer.MAX_VALUE;
|
||||
|
|
|
@ -42,14 +42,6 @@ public interface ActionFilter {
|
|||
*/
|
||||
<Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request,
|
||||
ActionListener<Response> listener, ActionFilterChain<Request, Response> chain);
|
||||
|
||||
/**
|
||||
* Enables filtering the execution of an action on the response side, either by sending a response through the
|
||||
* {@link ActionListener} or by continuing the execution through the given {@link ActionFilterChain chain}
|
||||
*/
|
||||
<Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain);
|
||||
|
||||
/**
|
||||
* A simple base class for injectable action filters that spares the implementation from handling the
|
||||
* filter chain. This base class should serve any action filter implementations that doesn't require
|
||||
|
@ -74,19 +66,5 @@ public interface ActionFilter {
|
|||
* if it should be aborted since the filter already handled the request and called the given listener.
|
||||
*/
|
||||
protected abstract boolean apply(String action, ActionRequest request, ActionListener<?> listener);
|
||||
|
||||
@Override
|
||||
public final <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain) {
|
||||
if (apply(action, response, listener)) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies this filter and returns {@code true} if the execution chain should proceed, or {@code false}
|
||||
* if it should be aborted since the filter already handled the response by calling the given listener.
|
||||
*/
|
||||
protected abstract boolean apply(String action, ActionResponse response, ActionListener<?> listener);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,10 +34,4 @@ public interface ActionFilterChain<Request extends ActionRequest, Response exten
|
|||
* the given {@link ActionListener listener}
|
||||
*/
|
||||
void proceed(Task task, final String action, final Request request, final ActionListener<Response> listener);
|
||||
|
||||
/**
|
||||
* Continue processing the response. Should only be called if a response has not been sent through
|
||||
* the given {@link ActionListener listener}
|
||||
*/
|
||||
void proceed(final String action, final Response response, final ActionListener<Response> listener);
|
||||
}
|
||||
|
|
|
@ -167,8 +167,7 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
|
|||
if (i < this.action.filters.length) {
|
||||
this.action.filters[i].apply(task, actionName, request, listener, this);
|
||||
} else if (i == this.action.filters.length) {
|
||||
this.action.doExecute(task, request, new FilteredActionListener<>(actionName, listener,
|
||||
new ResponseFilterChain<>(this.action.filters, logger)));
|
||||
this.action.doExecute(task, request, listener);
|
||||
} else {
|
||||
listener.onFailure(new IllegalStateException("proceed was called too many times"));
|
||||
}
|
||||
|
@ -178,69 +177,6 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void proceed(String action, Response response, ActionListener<Response> listener) {
|
||||
assert false : "request filter chain should never be called on the response side";
|
||||
}
|
||||
}
|
||||
|
||||
private static class ResponseFilterChain<Request extends ActionRequest, Response extends ActionResponse>
|
||||
implements ActionFilterChain<Request, Response> {
|
||||
|
||||
private final ActionFilter[] filters;
|
||||
private final AtomicInteger index;
|
||||
private final Logger logger;
|
||||
|
||||
private ResponseFilterChain(ActionFilter[] filters, Logger logger) {
|
||||
this.filters = filters;
|
||||
this.index = new AtomicInteger(filters.length);
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void proceed(Task task, String action, Request request, ActionListener<Response> listener) {
|
||||
assert false : "response filter chain should never be called on the request side";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void proceed(String action, Response response, ActionListener<Response> listener) {
|
||||
int i = index.decrementAndGet();
|
||||
try {
|
||||
if (i >= 0) {
|
||||
filters[i].apply(action, response, listener, this);
|
||||
} else if (i == -1) {
|
||||
listener.onResponse(response);
|
||||
} else {
|
||||
listener.onFailure(new IllegalStateException("proceed was called too many times"));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.trace("Error during transport action execution.", e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class FilteredActionListener<Response extends ActionResponse> implements ActionListener<Response> {
|
||||
|
||||
private final String actionName;
|
||||
private final ActionListener<Response> listener;
|
||||
private final ResponseFilterChain<?, Response> chain;
|
||||
|
||||
private FilteredActionListener(String actionName, ActionListener<Response> listener, ResponseFilterChain<?, Response> chain) {
|
||||
this.actionName = actionName;
|
||||
this.listener = listener;
|
||||
this.chain = chain;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
chain.proceed(actionName, response, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -47,6 +47,20 @@ public class ASCIIFoldingTokenFilterFactory extends AbstractTokenFilterFactory i
|
|||
|
||||
@Override
|
||||
public Object getMultiTermComponent() {
|
||||
return this;
|
||||
if (preserveOriginal == false) {
|
||||
return this;
|
||||
} else {
|
||||
// See https://issues.apache.org/jira/browse/LUCENE-7536 for the reasoning
|
||||
return new TokenFilterFactory() {
|
||||
@Override
|
||||
public String name() {
|
||||
return ASCIIFoldingTokenFilterFactory.this.name();
|
||||
}
|
||||
@Override
|
||||
public TokenStream create(TokenStream tokenStream) {
|
||||
return new ASCIIFoldingFilter(tokenStream, false);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@ import java.util.Map;
|
|||
/**
|
||||
* A processor implementation may modify the data belonging to a document.
|
||||
* Whether changes are made and what exactly is modified is up to the implementation.
|
||||
*
|
||||
* Processors may get called concurrently and thus need to be thread-safe.
|
||||
*/
|
||||
public interface Processor {
|
||||
|
||||
|
|
|
@ -137,86 +137,6 @@ public class TransportActionFilterChainTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testActionFiltersResponse() throws ExecutionException, InterruptedException {
|
||||
int numFilters = randomInt(10);
|
||||
Set<Integer> orders = new HashSet<>(numFilters);
|
||||
while (orders.size() < numFilters) {
|
||||
orders.add(randomInt(10));
|
||||
}
|
||||
|
||||
Set<ActionFilter> filters = new HashSet<>();
|
||||
for (Integer order : orders) {
|
||||
filters.add(new ResponseTestFilter(order, randomFrom(ResponseOperation.values())));
|
||||
}
|
||||
|
||||
String actionName = randomAsciiOfLength(randomInt(30));
|
||||
ActionFilters actionFilters = new ActionFilters(filters);
|
||||
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
|
||||
@Override
|
||||
protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
|
||||
listener.onResponse(new TestResponse());
|
||||
}
|
||||
};
|
||||
|
||||
ArrayList<ActionFilter> actionFiltersByOrder = new ArrayList<>(filters);
|
||||
Collections.sort(actionFiltersByOrder, new Comparator<ActionFilter>() {
|
||||
@Override
|
||||
public int compare(ActionFilter o1, ActionFilter o2) {
|
||||
return Integer.compare(o2.order(), o1.order());
|
||||
}
|
||||
});
|
||||
|
||||
List<ActionFilter> expectedActionFilters = new ArrayList<>();
|
||||
boolean errorExpected = false;
|
||||
for (ActionFilter filter : actionFiltersByOrder) {
|
||||
ResponseTestFilter testFilter = (ResponseTestFilter) filter;
|
||||
expectedActionFilters.add(testFilter);
|
||||
if (testFilter.callback == ResponseOperation.LISTENER_FAILURE) {
|
||||
errorExpected = true;
|
||||
}
|
||||
if (testFilter.callback != ResponseOperation.CONTINUE_PROCESSING) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(null);
|
||||
transportAction.execute(new TestRequest(), future);
|
||||
try {
|
||||
assertThat(future.get(), notNullValue());
|
||||
assertThat("shouldn't get here if an error is expected", errorExpected, equalTo(false));
|
||||
} catch(ExecutionException e) {
|
||||
assertThat("shouldn't get here if an error is not expected " + e.getMessage(), errorExpected, equalTo(true));
|
||||
}
|
||||
|
||||
List<ResponseTestFilter> testFiltersByLastExecution = new ArrayList<>();
|
||||
for (ActionFilter actionFilter : actionFilters.filters()) {
|
||||
testFiltersByLastExecution.add((ResponseTestFilter) actionFilter);
|
||||
}
|
||||
Collections.sort(testFiltersByLastExecution, new Comparator<ResponseTestFilter>() {
|
||||
@Override
|
||||
public int compare(ResponseTestFilter o1, ResponseTestFilter o2) {
|
||||
return Integer.compare(o1.executionToken, o2.executionToken);
|
||||
}
|
||||
});
|
||||
|
||||
ArrayList<ResponseTestFilter> finalTestFilters = new ArrayList<>();
|
||||
for (ActionFilter filter : testFiltersByLastExecution) {
|
||||
ResponseTestFilter testFilter = (ResponseTestFilter) filter;
|
||||
finalTestFilters.add(testFilter);
|
||||
if (testFilter.callback != ResponseOperation.CONTINUE_PROCESSING) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assertThat(finalTestFilters.size(), equalTo(expectedActionFilters.size()));
|
||||
for (int i = 0; i < finalTestFilters.size(); i++) {
|
||||
ResponseTestFilter testFilter = finalTestFilters.get(i);
|
||||
assertThat(testFilter, equalTo(expectedActionFilters.get(i)));
|
||||
assertThat(testFilter.runs.get(), equalTo(1));
|
||||
assertThat(testFilter.lastActionName, equalTo(actionName));
|
||||
}
|
||||
}
|
||||
|
||||
public void testTooManyContinueProcessingRequest() throws ExecutionException, InterruptedException {
|
||||
final int additionalContinueCount = randomInt(10);
|
||||
|
||||
|
@ -274,63 +194,6 @@ public class TransportActionFilterChainTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testTooManyContinueProcessingResponse() throws ExecutionException, InterruptedException {
|
||||
final int additionalContinueCount = randomInt(10);
|
||||
|
||||
ResponseTestFilter testFilter = new ResponseTestFilter(randomInt(), new ResponseCallback() {
|
||||
@Override
|
||||
public <Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain) {
|
||||
for (int i = 0; i <= additionalContinueCount; i++) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Set<ActionFilter> filters = new HashSet<>();
|
||||
filters.add(testFilter);
|
||||
|
||||
String actionName = randomAsciiOfLength(randomInt(30));
|
||||
ActionFilters actionFilters = new ActionFilters(filters);
|
||||
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
|
||||
@Override
|
||||
protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
|
||||
listener.onResponse(new TestResponse());
|
||||
}
|
||||
};
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(additionalContinueCount + 1);
|
||||
final AtomicInteger responses = new AtomicInteger();
|
||||
final List<Throwable> failures = new CopyOnWriteArrayList<>();
|
||||
|
||||
transportAction.execute(new TestRequest(), new ActionListener<TestResponse>() {
|
||||
@Override
|
||||
public void onResponse(TestResponse testResponse) {
|
||||
responses.incrementAndGet();
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failures.add(e);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
if (!latch.await(10, TimeUnit.SECONDS)) {
|
||||
fail("timeout waiting for the filter to notify the listener as many times as expected");
|
||||
}
|
||||
|
||||
assertThat(testFilter.runs.get(), equalTo(1));
|
||||
assertThat(testFilter.lastActionName, equalTo(actionName));
|
||||
|
||||
assertThat(responses.get(), equalTo(1));
|
||||
assertThat(failures.size(), equalTo(additionalContinueCount));
|
||||
for (Throwable failure : failures) {
|
||||
assertThat(failure, instanceOf(IllegalStateException.class));
|
||||
}
|
||||
}
|
||||
|
||||
private class RequestTestFilter implements ActionFilter {
|
||||
private final RequestCallback callback;
|
||||
private final int order;
|
||||
|
@ -356,45 +219,6 @@ public class TransportActionFilterChainTests extends ESTestCase {
|
|||
this.executionToken = counter.incrementAndGet();
|
||||
this.callback.execute(task, action, request, listener, chain);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
}
|
||||
|
||||
private class ResponseTestFilter implements ActionFilter {
|
||||
private final ResponseCallback callback;
|
||||
private final int order;
|
||||
AtomicInteger runs = new AtomicInteger();
|
||||
volatile String lastActionName;
|
||||
volatile int executionToken = Integer.MAX_VALUE; //the filters that don't run will go last in the sorted list
|
||||
|
||||
ResponseTestFilter(int order, ResponseCallback callback) {
|
||||
this.order = order;
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int order() {
|
||||
return order;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request,
|
||||
ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
|
||||
chain.proceed(task, action, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain) {
|
||||
this.runs.incrementAndGet();
|
||||
this.lastActionName = action;
|
||||
this.executionToken = counter.incrementAndGet();
|
||||
this.callback.execute(action, response, listener, chain);
|
||||
}
|
||||
}
|
||||
|
||||
private static enum RequestOperation implements RequestCallback {
|
||||
|
@ -422,41 +246,11 @@ public class TransportActionFilterChainTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private static enum ResponseOperation implements ResponseCallback {
|
||||
CONTINUE_PROCESSING {
|
||||
@Override
|
||||
public <Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
},
|
||||
LISTENER_RESPONSE {
|
||||
@Override
|
||||
@SuppressWarnings("unchecked") // Safe because its all we test with
|
||||
public <Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain) {
|
||||
((ActionListener<TestResponse>) listener).onResponse(new TestResponse());
|
||||
}
|
||||
},
|
||||
LISTENER_FAILURE {
|
||||
@Override
|
||||
public <Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain) {
|
||||
listener.onFailure(new ElasticsearchTimeoutException(""));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private interface RequestCallback {
|
||||
<Request extends ActionRequest, Response extends ActionResponse> void execute(Task task, String action, Request request,
|
||||
ActionListener<Response> listener, ActionFilterChain<Request, Response> actionFilterChain);
|
||||
}
|
||||
|
||||
private interface ResponseCallback {
|
||||
<Response extends ActionResponse> void execute(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain);
|
||||
}
|
||||
|
||||
public static class TestRequest extends ActionRequest {
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
|
|
|
@ -100,11 +100,6 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean apply(String action, ActionResponse response, ActionListener<?> listener) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int order() {
|
||||
return 0;
|
||||
|
|
|
@ -55,5 +55,12 @@ public class ASCIIFoldingTokenFilterFactoryTests extends ESTokenStreamTestCase {
|
|||
Tokenizer tokenizer = new WhitespaceTokenizer();
|
||||
tokenizer.setReader(new StringReader(source));
|
||||
assertTokenStreamContents(tokenFilter.create(tokenizer), expected);
|
||||
|
||||
// but the multi-term aware component still emits a single token
|
||||
tokenFilter = (TokenFilterFactory) ((MultiTermAwareComponent) tokenFilter).getMultiTermComponent();
|
||||
tokenizer = new WhitespaceTokenizer();
|
||||
tokenizer.setReader(new StringReader(source));
|
||||
expected = new String[]{"Anspruche"};
|
||||
assertTokenStreamContents(tokenFilter.create(tokenizer), expected);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,8 +16,8 @@ The interface has one method that receives an instance of
|
|||
https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/client/config/RequestConfig.Builder.html[`org.apache.http.client.config.RequestConfig.Builder`]
|
||||
as an argument and has the same return type. The request config builder can
|
||||
be modified and then returned. In the following example we increase the
|
||||
connect timeout (defaults to 1 second) and the socket timeout (defaults to 10
|
||||
seconds). Also we adjust the max retry timeout accordingly (defaults to 10
|
||||
connect timeout (defaults to 1 second) and the socket timeout (defaults to 30
|
||||
seconds). Also we adjust the max retry timeout accordingly (defaults to 30
|
||||
seconds too).
|
||||
|
||||
[source,java]
|
||||
|
@ -27,10 +27,10 @@ RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200))
|
|||
@Override
|
||||
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
|
||||
return requestConfigBuilder.setConnectTimeout(5000)
|
||||
.setSocketTimeout(30000);
|
||||
.setSocketTimeout(60000);
|
||||
}
|
||||
})
|
||||
.setMaxRetryTimeoutMillis(30000)
|
||||
.setMaxRetryTimeoutMillis(60000)
|
||||
.build();
|
||||
--------------------------------------------------
|
||||
|
||||
|
@ -110,4 +110,4 @@ RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200))
|
|||
=== Others
|
||||
|
||||
For any other required configuration needed, the Apache HttpAsyncClient docs
|
||||
should be consulted: https://hc.apache.org/httpcomponents-asyncclient-4.1.x/ .
|
||||
should be consulted: https://hc.apache.org/httpcomponents-asyncclient-4.1.x/ .
|
||||
|
|
|
@ -444,7 +444,7 @@ To enable queries sent to older versions of Elasticsearch the `query` parameter
|
|||
is sent directly to the remote host without validation or modification.
|
||||
|
||||
Reindexing from a remote server uses an on-heap buffer that defaults to a
|
||||
maximum size of 200mb. If the remote index includes very large documents you'll
|
||||
maximum size of 100mb. If the remote index includes very large documents you'll
|
||||
need to use a smaller batch size. The example below sets the batch size `10`
|
||||
which is very, very small.
|
||||
|
||||
|
@ -454,9 +454,7 @@ POST _reindex
|
|||
{
|
||||
"source": {
|
||||
"remote": {
|
||||
"host": "http://otherhost:9200",
|
||||
"username": "user",
|
||||
"password": "pass"
|
||||
"host": "http://otherhost:9200"
|
||||
},
|
||||
"index": "source",
|
||||
"size": 10,
|
||||
|
@ -474,10 +472,40 @@ POST _reindex
|
|||
// CONSOLE
|
||||
// TEST[setup:host]
|
||||
// TEST[s/^/PUT source\n/]
|
||||
// TEST[s/otherhost:9200",/\${host}"/]
|
||||
// TEST[s/"username": "user",//]
|
||||
// TEST[s/"password": "pass"//]
|
||||
// TEST[s/otherhost:9200/\${host}/]
|
||||
|
||||
It is also possible to set the socket read timeout on the remote connection
|
||||
with the `socket_timeout` field and the connection timeout with the
|
||||
`connect_timeout` field. Both default to thirty seconds. This example
|
||||
sets the socket read timeout to one minute and the connection timeout to ten
|
||||
seconds:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
POST _reindex
|
||||
{
|
||||
"source": {
|
||||
"remote": {
|
||||
"host": "http://otherhost:9200",
|
||||
"socket_timeout": "1m",
|
||||
"connect_timeout": "10s"
|
||||
},
|
||||
"index": "source",
|
||||
"query": {
|
||||
"match": {
|
||||
"test": "data"
|
||||
}
|
||||
}
|
||||
},
|
||||
"dest": {
|
||||
"index": "dest"
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:host]
|
||||
// TEST[s/^/PUT source\n/]
|
||||
// TEST[s/otherhost:9200/\${host}/]
|
||||
|
||||
[float]
|
||||
=== URL Parameters
|
||||
|
|
|
@ -45,7 +45,6 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
|
|||
* Task storing information about a currently running BulkByScroll request.
|
||||
*/
|
||||
public abstract class BulkByScrollTask extends CancellableTask {
|
||||
|
||||
public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) {
|
||||
super(id, type, action, description, parentTaskId);
|
||||
}
|
||||
|
|
|
@ -145,11 +145,13 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
|
|||
String host = hostMatcher.group("host");
|
||||
int port = Integer.parseInt(hostMatcher.group("port"));
|
||||
Map<String, String> headers = extractStringStringMap(remote, "headers");
|
||||
TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT);
|
||||
TimeValue connectTimeout = extractTimeValue(remote, "connect_timeout", RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
if (false == remote.isEmpty()) {
|
||||
throw new IllegalArgumentException(
|
||||
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
|
||||
}
|
||||
return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers);
|
||||
return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers, socketTimeout, connectTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -202,6 +204,11 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
|
|||
return safe;
|
||||
}
|
||||
|
||||
private static TimeValue extractTimeValue(Map<String, Object> source, String name, TimeValue defaultValue) {
|
||||
String string = extractString(source, name);
|
||||
return string == null ? defaultValue : parseTimeValue(string, name);
|
||||
}
|
||||
|
||||
private static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
|
||||
Object query = source.remove("query");
|
||||
|
|
|
@ -196,6 +196,11 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
|||
}
|
||||
return RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
|
||||
.setDefaultHeaders(clientHeaders)
|
||||
.setRequestConfigCallback(c -> {
|
||||
c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
|
||||
c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
|
||||
return c;
|
||||
})
|
||||
.setHttpClientConfigCallback(c -> {
|
||||
// Enable basic auth if it is configured
|
||||
if (remoteInfo.getUsername() != null) {
|
||||
|
|
|
@ -19,11 +19,13 @@
|
|||
|
||||
package org.elasticsearch.index.reindex.remote;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
@ -31,8 +33,18 @@ import java.util.Map;
|
|||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
|
||||
public class RemoteInfo implements Writeable {
|
||||
/**
|
||||
* Default {@link #socketTimeout} for requests that don't have one set.
|
||||
*/
|
||||
public static final TimeValue DEFAULT_SOCKET_TIMEOUT = timeValueSeconds(30);
|
||||
/**
|
||||
* Default {@link #connectTimeout} for requests that don't have one set.
|
||||
*/
|
||||
public static final TimeValue DEFAULT_CONNECT_TIMEOUT = timeValueSeconds(30);
|
||||
|
||||
private final String scheme;
|
||||
private final String host;
|
||||
private final int port;
|
||||
|
@ -40,9 +52,17 @@ public class RemoteInfo implements Writeable {
|
|||
private final String username;
|
||||
private final String password;
|
||||
private final Map<String, String> headers;
|
||||
/**
|
||||
* Time to wait for a response from each request.
|
||||
*/
|
||||
private final TimeValue socketTimeout;
|
||||
/**
|
||||
* Time to wait for a connecting to the remote cluster.
|
||||
*/
|
||||
private final TimeValue connectTimeout;
|
||||
|
||||
public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password,
|
||||
Map<String, String> headers) {
|
||||
Map<String, String> headers, TimeValue socketTimeout, TimeValue connectTimeout) {
|
||||
this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster");
|
||||
this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster");
|
||||
this.port = port;
|
||||
|
@ -50,6 +70,8 @@ public class RemoteInfo implements Writeable {
|
|||
this.username = username;
|
||||
this.password = password;
|
||||
this.headers = unmodifiableMap(requireNonNull(headers, "[headers] is required"));
|
||||
this.socketTimeout = requireNonNull(socketTimeout, "[socketTimeout] must be specified");
|
||||
this.connectTimeout = requireNonNull(connectTimeout, "[connectTimeout] must be specified");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -68,6 +90,13 @@ public class RemoteInfo implements Writeable {
|
|||
headers.put(in.readString(), in.readString());
|
||||
}
|
||||
this.headers = unmodifiableMap(headers);
|
||||
if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
|
||||
socketTimeout = new TimeValue(in);
|
||||
connectTimeout = new TimeValue(in);
|
||||
} else {
|
||||
socketTimeout = DEFAULT_SOCKET_TIMEOUT;
|
||||
connectTimeout = DEFAULT_CONNECT_TIMEOUT;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -83,6 +112,10 @@ public class RemoteInfo implements Writeable {
|
|||
out.writeString(header.getKey());
|
||||
out.writeString(header.getValue());
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
|
||||
socketTimeout.writeTo(out);
|
||||
connectTimeout.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
public String getScheme() {
|
||||
|
@ -115,6 +148,20 @@ public class RemoteInfo implements Writeable {
|
|||
return headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Time to wait for a response from each request.
|
||||
*/
|
||||
public TimeValue getSocketTimeout() {
|
||||
return socketTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Time to wait to connect to the external cluster.
|
||||
*/
|
||||
public TimeValue getConnectTimeout() {
|
||||
return connectTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder b = new StringBuilder();
|
||||
|
|
|
@ -33,7 +33,8 @@ import static org.hamcrest.Matchers.hasSize;
|
|||
|
||||
public class ReindexFromRemoteBuildRestClientTests extends ESTestCase {
|
||||
public void testBuildRestClient() throws Exception {
|
||||
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap());
|
||||
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
long taskId = randomLong();
|
||||
List<Thread> threads = synchronizedList(new ArrayList<>());
|
||||
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads);
|
||||
|
|
|
@ -46,47 +46,49 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
|
|||
checkRemoteWhitelist(buildRemoteWhitelist(randomWhitelist()), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
|
||||
*/
|
||||
private RemoteInfo newRemoteInfo(String host, int port) {
|
||||
return new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
}
|
||||
|
||||
public void testWhitelistedRemote() {
|
||||
List<String> whitelist = randomWhitelist();
|
||||
String[] inList = whitelist.iterator().next().split(":");
|
||||
String host = inList[0];
|
||||
int port = Integer.valueOf(inList[1]);
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(whitelist),
|
||||
new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap()));
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(whitelist), newRemoteInfo(host, port));
|
||||
}
|
||||
|
||||
public void testWhitelistedByPrefix() {
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
|
||||
new RemoteInfo(randomAsciiOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap()));
|
||||
new RemoteInfo(randomAsciiOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
|
||||
new RemoteInfo(randomAsciiOfLength(5), "6e134134a1.us-east-1.aws.example.com", 9200,
|
||||
new BytesArray("test"), null, null, emptyMap()));
|
||||
newRemoteInfo("6e134134a1.us-east-1.aws.example.com", 9200));
|
||||
}
|
||||
|
||||
public void testWhitelistedBySuffix() {
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es.example.com:*")),
|
||||
new RemoteInfo(randomAsciiOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap()));
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es.example.com:*")), newRemoteInfo("es.example.com", 9200));
|
||||
}
|
||||
|
||||
public void testWhitelistedByInfix() {
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es*.example.com:9200")),
|
||||
new RemoteInfo(randomAsciiOfLength(5), "es1.example.com", 9200, new BytesArray("test"), null, null, emptyMap()));
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es*.example.com:9200")), newRemoteInfo("es1.example.com", 9200));
|
||||
}
|
||||
|
||||
|
||||
public void testLoopbackInWhitelistRemote() throws UnknownHostException {
|
||||
List<String> whitelist = randomWhitelist();
|
||||
whitelist.add("127.0.0.1:*");
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(whitelist),
|
||||
new RemoteInfo(randomAsciiOfLength(5), "127.0.0.1", 9200, new BytesArray("test"), null, null, emptyMap()));
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(whitelist), newRemoteInfo("127.0.0.1", 9200));
|
||||
}
|
||||
|
||||
public void testUnwhitelistedRemote() {
|
||||
int port = between(1, Integer.MAX_VALUE);
|
||||
RemoteInfo remoteInfo = new RemoteInfo(randomAsciiOfLength(5), "not in list", port, new BytesArray("test"), null, null, emptyMap());
|
||||
List<String> whitelist = randomBoolean() ? randomWhitelist() : emptyList();
|
||||
Exception e = expectThrows(IllegalArgumentException.class,
|
||||
() -> checkRemoteWhitelist(buildRemoteWhitelist(whitelist), remoteInfo));
|
||||
() -> checkRemoteWhitelist(buildRemoteWhitelist(whitelist), newRemoteInfo("not in list", port)));
|
||||
assertEquals("[not in list:" + port + "] not whitelisted in reindex.remote.whitelist", e.getMessage());
|
||||
}
|
||||
|
||||
|
|
|
@ -19,8 +19,8 @@
|
|||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.ElasticsearchSecurityException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
|
@ -48,6 +48,7 @@ import org.junit.Before;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonList;
|
||||
|
@ -88,29 +89,31 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
|
|||
address = nodeInfo.getHttp().getAddress().publishAddress();
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
|
||||
*/
|
||||
private RemoteInfo newRemoteInfo(String username, String password, Map<String, String> headers) {
|
||||
return new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), username, password,
|
||||
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
}
|
||||
|
||||
public void testReindexFromRemoteWithAuthentication() throws Exception {
|
||||
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "Aladdin",
|
||||
"open sesame", emptyMap());
|
||||
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
|
||||
.setRemoteInfo(remote);
|
||||
.setRemoteInfo(newRemoteInfo("Aladdin", "open sesame", emptyMap()));
|
||||
assertThat(request.get(), matcher().created(1));
|
||||
}
|
||||
|
||||
public void testReindexSendsHeaders() throws Exception {
|
||||
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
|
||||
null, singletonMap(TestFilter.EXAMPLE_HEADER, "doesn't matter"));
|
||||
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
|
||||
.setRemoteInfo(remote);
|
||||
.setRemoteInfo(newRemoteInfo(null, null, singletonMap(TestFilter.EXAMPLE_HEADER, "doesn't matter")));
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
|
||||
assertEquals(RestStatus.BAD_REQUEST, e.status());
|
||||
assertThat(e.getMessage(), containsString("Hurray! Sent the header!"));
|
||||
}
|
||||
|
||||
public void testReindexWithoutAuthenticationWhenRequired() throws Exception {
|
||||
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
|
||||
null, emptyMap());
|
||||
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
|
||||
.setRemoteInfo(remote);
|
||||
.setRemoteInfo(newRemoteInfo(null, null, emptyMap()));
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
|
||||
assertEquals(RestStatus.UNAUTHORIZED, e.status());
|
||||
assertThat(e.getMessage(), containsString("\"reason\":\"Authentication required\""));
|
||||
|
@ -118,10 +121,8 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testReindexWithBadAuthentication() throws Exception {
|
||||
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "junk",
|
||||
"auth", emptyMap());
|
||||
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
|
||||
.setRemoteInfo(remote);
|
||||
.setRemoteInfo(newRemoteInfo("junk", "auth", emptyMap()));
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
|
||||
assertThat(e.getMessage(), containsString("\"reason\":\"Bad Authorization\""));
|
||||
}
|
||||
|
@ -186,11 +187,5 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
|
|||
}
|
||||
chain.proceed(task, action, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,
|
||||
ActionFilterChain<?, Response> chain) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.script.Script;
|
|||
import org.elasticsearch.search.slice.SliceBuilder;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
|
||||
/**
|
||||
|
@ -37,8 +38,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
|
|||
|
||||
public void testReindexFromRemoteDoesNotSupportSearchQuery() {
|
||||
ReindexRequest reindex = newRequest();
|
||||
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE),
|
||||
new BytesArray("real_query"), null, null, emptyMap()));
|
||||
reindex.setRemoteInfo(
|
||||
new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE), new BytesArray("real_query"),
|
||||
null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
|
||||
reindex.getSearchRequest().source().query(matchAllQuery()); // Unsupported place to put query
|
||||
ActionRequestValidationException e = reindex.validate();
|
||||
assertEquals("Validation Failed: 1: reindex from remote sources should use RemoteInfo's query instead of source's query;",
|
||||
|
@ -47,8 +49,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
|
|||
|
||||
public void testReindexFromRemoteDoesNotSupportWorkers() {
|
||||
ReindexRequest reindex = newRequest();
|
||||
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE),
|
||||
new BytesArray("real_query"), null, null, emptyMap()));
|
||||
reindex.setRemoteInfo(
|
||||
new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE), new BytesArray("real_query"),
|
||||
null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
|
||||
reindex.setSlices(between(2, Integer.MAX_VALUE));
|
||||
ActionRequestValidationException e = reindex.validate();
|
||||
assertEquals(
|
||||
|
@ -71,7 +74,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
|
|||
}
|
||||
if (randomBoolean()) {
|
||||
original.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, 10000),
|
||||
new BytesArray(randomAsciiOfLength(5)), null, null, emptyMap()));
|
||||
new BytesArray(randomAsciiOfLength(5)), null, null, emptyMap(),
|
||||
parseTimeValue(randomPositiveTimeValue(), "socket_timeout"),
|
||||
parseTimeValue(randomPositiveTimeValue(), "connect_timeout")));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,8 +36,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.index.reindex.remote.RemoteInfo;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
|
@ -91,10 +89,11 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
|
|||
|
||||
public void testRemoteInfoSkipsValidation() {
|
||||
// The index doesn't have to exist
|
||||
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap()), "does_not_exist",
|
||||
"target");
|
||||
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "does_not_exist", "target");
|
||||
// And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote.
|
||||
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap()), "target", "target");
|
||||
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "target", "target");
|
||||
}
|
||||
|
||||
private void fails(String target, String... sources) {
|
||||
|
|
|
@ -36,6 +36,8 @@ import java.io.IOException;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
|
||||
public class RestReindexActionTests extends ESTestCase {
|
||||
public void testBuildRemoteInfoNoRemote() throws IOException {
|
||||
assertNull(RestReindexAction.buildRemoteInfo(new HashMap<>()));
|
||||
|
@ -52,6 +54,8 @@ public class RestReindexActionTests extends ESTestCase {
|
|||
remote.put("username", "testuser");
|
||||
remote.put("password", "testpass");
|
||||
remote.put("headers", headers);
|
||||
remote.put("socket_timeout", "90s");
|
||||
remote.put("connect_timeout", "10s");
|
||||
|
||||
Map<String, Object> query = new HashMap<>();
|
||||
query.put("a", "b");
|
||||
|
@ -68,6 +72,8 @@ public class RestReindexActionTests extends ESTestCase {
|
|||
assertEquals("testuser", remoteInfo.getUsername());
|
||||
assertEquals("testpass", remoteInfo.getPassword());
|
||||
assertEquals(headers, remoteInfo.getHeaders());
|
||||
assertEquals(timeValueSeconds(90), remoteInfo.getSocketTimeout());
|
||||
assertEquals(timeValueSeconds(10), remoteInfo.getConnectTimeout());
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoWithoutAllParts() throws IOException {
|
||||
|
@ -76,16 +82,20 @@ public class RestReindexActionTests extends ESTestCase {
|
|||
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com"));
|
||||
}
|
||||
|
||||
public void testBuildRemoteInfoWithAllParts() throws IOException {
|
||||
public void testBuildRemoteInfoWithAllHostParts() throws IOException {
|
||||
RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200");
|
||||
assertEquals("http", info.getScheme());
|
||||
assertEquals("example.com", info.getHost());
|
||||
assertEquals(9200, info.getPort());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default
|
||||
|
||||
info = buildRemoteInfoHostTestCase("https://other.example.com:9201");
|
||||
assertEquals("https", info.getScheme());
|
||||
assertEquals("other.example.com", info.getHost());
|
||||
assertEquals(9201, info.getPort());
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
|
||||
}
|
||||
|
||||
public void testReindexFromRemoteRequestParsing() throws IOException {
|
||||
|
|
|
@ -118,7 +118,7 @@ public class RetryTests extends ESSingleNodeTestCase {
|
|||
NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0);
|
||||
TransportAddress address = nodeInfo.getHttp().getAddress().publishAddress();
|
||||
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
|
||||
null, emptyMap());
|
||||
null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
|
||||
.setRemoteInfo(remote);
|
||||
testCase(ReindexAction.NAME, request, matcher().created(DOC_COUNT));
|
||||
|
|
|
@ -73,7 +73,10 @@ public class RoundTripTests extends ESTestCase {
|
|||
while (headers.size() < headersCount) {
|
||||
headers.put(randomAsciiOfLength(5), randomAsciiOfLength(5));
|
||||
}
|
||||
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password, headers));
|
||||
TimeValue socketTimeout = parseTimeValue(randomPositiveTimeValue(), "socketTimeout");
|
||||
TimeValue connectTimeout = parseTimeValue(randomPositiveTimeValue(), "connectTimeout");
|
||||
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password, headers,
|
||||
socketTimeout, connectTimeout));
|
||||
}
|
||||
ReindexRequest tripped = new ReindexRequest();
|
||||
roundTrip(reindex, tripped);
|
||||
|
@ -89,7 +92,7 @@ public class RoundTripTests extends ESTestCase {
|
|||
tripped = new ReindexRequest();
|
||||
reindex.setSlices(1);
|
||||
roundTrip(Version.V_5_0_0_rc1, reindex, tripped);
|
||||
assertRequestEquals(reindex, tripped);
|
||||
assertRequestEquals(Version.V_5_0_0_rc1, reindex, tripped);
|
||||
}
|
||||
|
||||
public void testUpdateByQueryRequest() throws IOException {
|
||||
|
@ -154,7 +157,7 @@ public class RoundTripTests extends ESTestCase {
|
|||
request.setScript(random().nextBoolean() ? null : randomScript());
|
||||
}
|
||||
|
||||
private void assertRequestEquals(ReindexRequest request, ReindexRequest tripped) {
|
||||
private void assertRequestEquals(Version version, ReindexRequest request, ReindexRequest tripped) {
|
||||
assertRequestEquals((AbstractBulkIndexByScrollRequest<?>) request, (AbstractBulkIndexByScrollRequest<?>) tripped);
|
||||
assertEquals(request.getDestination().version(), tripped.getDestination().version());
|
||||
assertEquals(request.getDestination().index(), tripped.getDestination().index());
|
||||
|
@ -168,6 +171,13 @@ public class RoundTripTests extends ESTestCase {
|
|||
assertEquals(request.getRemoteInfo().getUsername(), tripped.getRemoteInfo().getUsername());
|
||||
assertEquals(request.getRemoteInfo().getPassword(), tripped.getRemoteInfo().getPassword());
|
||||
assertEquals(request.getRemoteInfo().getHeaders(), tripped.getRemoteInfo().getHeaders());
|
||||
if (version.onOrAfter(Version.V_5_2_0_UNRELEASED)) {
|
||||
assertEquals(request.getRemoteInfo().getSocketTimeout(), tripped.getRemoteInfo().getSocketTimeout());
|
||||
assertEquals(request.getRemoteInfo().getConnectTimeout(), tripped.getRemoteInfo().getConnectTimeout());
|
||||
} else {
|
||||
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, tripped.getRemoteInfo().getSocketTimeout());
|
||||
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, tripped.getRemoteInfo().getConnectTimeout());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,14 +25,17 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import static java.util.Collections.emptyMap;
|
||||
|
||||
public class RemoteInfoTests extends ESTestCase {
|
||||
private RemoteInfo newRemoteInfo(String scheme, String username, String password) {
|
||||
return new RemoteInfo(scheme, "testhost", 12344, new BytesArray("testquery"), username, password, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
}
|
||||
|
||||
public void testToString() {
|
||||
RemoteInfo info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), null, null, emptyMap());
|
||||
assertEquals("host=testhost port=12344 query=testquery", info.toString());
|
||||
info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", null, emptyMap());
|
||||
assertEquals("host=testhost port=12344 query=testquery username=testuser", info.toString());
|
||||
info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap());
|
||||
assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString());
|
||||
info = new RemoteInfo("https", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap());
|
||||
assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString());
|
||||
assertEquals("host=testhost port=12344 query=testquery", newRemoteInfo("http", null, null).toString());
|
||||
assertEquals("host=testhost port=12344 query=testquery username=testuser", newRemoteInfo("http", "testuser", null).toString());
|
||||
assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>",
|
||||
newRemoteInfo("http", "testuser", "testpass").toString());
|
||||
assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>",
|
||||
newRemoteInfo("https", "testuser", "testpass").toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -244,6 +244,34 @@
|
|||
dest:
|
||||
index: dest
|
||||
|
||||
---
|
||||
"broken socket timeout in remote fails":
|
||||
- do:
|
||||
catch: /number_format_exception/
|
||||
reindex:
|
||||
body:
|
||||
source:
|
||||
remote:
|
||||
host: http://okremote:9200
|
||||
socket_timeout: borked
|
||||
index: test
|
||||
dest:
|
||||
index: dest
|
||||
|
||||
---
|
||||
"broken connect timeout in remote fails":
|
||||
- do:
|
||||
catch: /number_format_exception/
|
||||
reindex:
|
||||
body:
|
||||
source:
|
||||
remote:
|
||||
host: http://okremote:9200
|
||||
connect_timeout: borked
|
||||
index: test
|
||||
dest:
|
||||
index: dest
|
||||
|
||||
---
|
||||
"junk in slices fails":
|
||||
- do:
|
||||
|
|
|
@ -205,3 +205,55 @@
|
|||
match:
|
||||
text: test
|
||||
- match: {hits.total: 1}
|
||||
|
||||
---
|
||||
"Reindex from remote with timeouts":
|
||||
# Validates that you can configure the socket_timeout and connect_timeout,
|
||||
# not that they do anything.
|
||||
- do:
|
||||
index:
|
||||
index: source
|
||||
type: foo
|
||||
id: 1
|
||||
body: { "text": "test" }
|
||||
refresh: true
|
||||
|
||||
# Fetch the http host. We use the host of the master because we know there will always be a master.
|
||||
- do:
|
||||
cluster.state: {}
|
||||
- set: { master_node: master }
|
||||
- do:
|
||||
nodes.info:
|
||||
metric: [ http ]
|
||||
- is_true: nodes.$master.http.publish_address
|
||||
- set: {nodes.$master.http.publish_address: host}
|
||||
- do:
|
||||
reindex:
|
||||
refresh: true
|
||||
body:
|
||||
source:
|
||||
remote:
|
||||
host: http://${host}
|
||||
socket_timeout: 1m
|
||||
connect_timeout: 1m
|
||||
index: source
|
||||
dest:
|
||||
index: dest
|
||||
- match: {created: 1}
|
||||
- match: {updated: 0}
|
||||
- match: {version_conflicts: 0}
|
||||
- match: {batches: 1}
|
||||
- match: {failures: []}
|
||||
- match: {throttled_millis: 0}
|
||||
- gte: { took: 0 }
|
||||
- is_false: task
|
||||
- is_false: deleted
|
||||
|
||||
- do:
|
||||
search:
|
||||
index: dest
|
||||
body:
|
||||
query:
|
||||
match:
|
||||
text: test
|
||||
- match: {hits.total: 1}
|
||||
|
|
|
@ -321,11 +321,6 @@ public class ContextAndHeaderTransportIT extends HttpSmokeTestCase {
|
|||
requests.add(new RequestAndHeaders(threadPool.getThreadContext().getHeaders(), request));
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean apply(String action, ActionResponse response, ActionListener listener) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static class RequestAndHeaders {
|
||||
|
|
|
@ -272,10 +272,8 @@ public abstract class ESRestTestCase extends ESTestCase {
|
|||
return "http";
|
||||
}
|
||||
|
||||
private static RestClient buildClient(Settings settings) throws IOException {
|
||||
RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()]))
|
||||
.setMaxRetryTimeoutMillis(30000)
|
||||
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(30000));
|
||||
private RestClient buildClient(Settings settings) throws IOException {
|
||||
RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()]));
|
||||
String keystorePath = settings.get(TRUSTSTORE_PATH);
|
||||
if (keystorePath != null) {
|
||||
final String keystorePass = settings.get(TRUSTSTORE_PASSWORD);
|
||||
|
|
Loading…
Reference in New Issue