Remove support for maxRetryTimeout from low-level REST client (#38085)
We have had various reports of problems caused by the maxRetryTimeout setting in the low-level REST client. Such setting was initially added in the attempts to not have requests go through retries if the request already took longer than the provided timeout. The implementation was problematic though as such timeout would also expire in the first request attempt (see #31834), would leave the request executing after expiration causing memory leaks (see #33342), and would not take into account the http client internal queuing (see #25951). Given all these issues, it seems that this custom timeout mechanism gives little benefits while causing a lot of harm. We should rather rely on connect and socket timeout exposed by the underlying http client and accept that a request can overall take longer than the configured timeout, which is the case even with a single retry anyways. This commit removes the `maxRetryTimeout` setting and all of its usages.
This commit is contained in:
parent
9492dcea42
commit
a7046e001c
|
@ -39,16 +39,6 @@ public final class ResponseException extends IOException {
|
|||
this.response = response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap a {@linkplain ResponseException} with another one with the current
|
||||
* stack trace. This is used during synchronous calls so that the caller
|
||||
* ends up in the stack trace of the exception thrown.
|
||||
*/
|
||||
ResponseException(ResponseException e) throws IOException {
|
||||
super(e.getMessage(), e);
|
||||
this.response = e.getResponse();
|
||||
}
|
||||
|
||||
static String buildMessage(Response response) throws IOException {
|
||||
String message = String.format(Locale.ROOT,
|
||||
"method [%s], host [%s], URI [%s], status line [%s]",
|
||||
|
|
|
@ -70,11 +70,8 @@ import java.util.Objects;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
|
@ -103,7 +100,6 @@ public class RestClient implements Closeable {
|
|||
// We don't rely on default headers supported by HttpAsyncClient as those cannot be replaced.
|
||||
// These are package private for tests.
|
||||
final List<Header> defaultHeaders;
|
||||
private final long maxRetryTimeoutMillis;
|
||||
private final String pathPrefix;
|
||||
private final AtomicInteger lastNodeIndex = new AtomicInteger(0);
|
||||
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
|
||||
|
@ -112,10 +108,9 @@ public class RestClient implements Closeable {
|
|||
private volatile NodeTuple<List<Node>> nodeTuple;
|
||||
private final WarningsHandler warningsHandler;
|
||||
|
||||
RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders, List<Node> nodes, String pathPrefix,
|
||||
RestClient(CloseableHttpAsyncClient client, Header[] defaultHeaders, List<Node> nodes, String pathPrefix,
|
||||
FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode) {
|
||||
this.client = client;
|
||||
this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
|
||||
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
|
||||
this.failureListener = failureListener;
|
||||
this.pathPrefix = pathPrefix;
|
||||
|
@ -213,9 +208,64 @@ public class RestClient implements Closeable {
|
|||
* @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
|
||||
*/
|
||||
public Response performRequest(Request request) throws IOException {
|
||||
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
|
||||
performRequestAsyncNoCatch(request, listener);
|
||||
return listener.get();
|
||||
InternalRequest internalRequest = new InternalRequest(request);
|
||||
return performRequest(nextNodes(), internalRequest, null);
|
||||
}
|
||||
|
||||
private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple,
|
||||
final InternalRequest request,
|
||||
Exception previousException) throws IOException {
|
||||
RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
|
||||
HttpResponse httpResponse;
|
||||
try {
|
||||
httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get();
|
||||
} catch(Exception e) {
|
||||
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e);
|
||||
onFailure(context.node);
|
||||
Exception cause = extractAndWrapCause(e);
|
||||
addSuppressedException(previousException, cause);
|
||||
if (nodeTuple.nodes.hasNext()) {
|
||||
return performRequest(nodeTuple, request, cause);
|
||||
}
|
||||
if (cause instanceof IOException) {
|
||||
throw (IOException) cause;
|
||||
}
|
||||
if (cause instanceof RuntimeException) {
|
||||
throw (RuntimeException) cause;
|
||||
}
|
||||
throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause);
|
||||
}
|
||||
ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
|
||||
if (responseOrResponseException.responseException == null) {
|
||||
return responseOrResponseException.response;
|
||||
}
|
||||
addSuppressedException(previousException, responseOrResponseException.responseException);
|
||||
if (nodeTuple.nodes.hasNext()) {
|
||||
return performRequest(nodeTuple, request, responseOrResponseException.responseException);
|
||||
}
|
||||
throw responseOrResponseException.responseException;
|
||||
}
|
||||
|
||||
private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException {
|
||||
RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
|
||||
int statusCode = httpResponse.getStatusLine().getStatusCode();
|
||||
Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse);
|
||||
if (isSuccessfulResponse(statusCode) || request.ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
|
||||
onResponse(node);
|
||||
if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) {
|
||||
throw new WarningFailureException(response);
|
||||
}
|
||||
return new ResponseOrResponseException(response);
|
||||
}
|
||||
ResponseException responseException = new ResponseException(response);
|
||||
if (isRetryStatus(statusCode)) {
|
||||
//mark host dead and retry against next one
|
||||
onFailure(node);
|
||||
return new ResponseOrResponseException(responseException);
|
||||
}
|
||||
//mark host alive and don't retry, as the error should be a request problem
|
||||
onResponse(node);
|
||||
throw responseException;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -236,85 +286,31 @@ public class RestClient implements Closeable {
|
|||
*/
|
||||
public void performRequestAsync(Request request, ResponseListener responseListener) {
|
||||
try {
|
||||
performRequestAsyncNoCatch(request, responseListener);
|
||||
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
|
||||
InternalRequest internalRequest = new InternalRequest(request);
|
||||
performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener);
|
||||
} catch (Exception e) {
|
||||
responseListener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
void performRequestAsyncNoCatch(Request request, ResponseListener listener) throws IOException {
|
||||
Map<String, String> requestParams = new HashMap<>(request.getParameters());
|
||||
//ignore is a special parameter supported by the clients, shouldn't be sent to es
|
||||
String ignoreString = requestParams.remove("ignore");
|
||||
Set<Integer> ignoreErrorCodes;
|
||||
if (ignoreString == null) {
|
||||
if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
|
||||
//404 never causes error if returned for a HEAD request
|
||||
ignoreErrorCodes = Collections.singleton(404);
|
||||
} else {
|
||||
ignoreErrorCodes = Collections.emptySet();
|
||||
}
|
||||
} else {
|
||||
String[] ignoresArray = ignoreString.split(",");
|
||||
ignoreErrorCodes = new HashSet<>();
|
||||
if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
|
||||
//404 never causes error if returned for a HEAD request
|
||||
ignoreErrorCodes.add(404);
|
||||
}
|
||||
for (String ignoreCode : ignoresArray) {
|
||||
try {
|
||||
ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
URI uri = buildUri(pathPrefix, request.getEndpoint(), requestParams);
|
||||
HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
|
||||
setHeaders(httpRequest, request.getOptions().getHeaders());
|
||||
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
|
||||
long startTime = System.nanoTime();
|
||||
performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
|
||||
request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(),
|
||||
request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
|
||||
}
|
||||
|
||||
private void performRequestAsync(final long startTime, final NodeTuple<Iterator<Node>> nodeTuple, final HttpRequestBase request,
|
||||
final Set<Integer> ignoreErrorCodes,
|
||||
final WarningsHandler thisWarningsHandler,
|
||||
final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
|
||||
private void performRequestAsync(final NodeTuple<Iterator<Node>> nodeTuple,
|
||||
final InternalRequest request,
|
||||
final FailureTrackingResponseListener listener) {
|
||||
final Node node = nodeTuple.nodes.next();
|
||||
//we stream the request body if the entity allows for it
|
||||
final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(node.getHost(), request);
|
||||
final HttpAsyncResponseConsumer<HttpResponse> asyncResponseConsumer =
|
||||
httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer();
|
||||
final HttpClientContext context = HttpClientContext.create();
|
||||
context.setAuthCache(nodeTuple.authCache);
|
||||
client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback<HttpResponse>() {
|
||||
final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
|
||||
client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() {
|
||||
@Override
|
||||
public void completed(HttpResponse httpResponse) {
|
||||
try {
|
||||
RequestLogger.logResponse(logger, request, node.getHost(), httpResponse);
|
||||
int statusCode = httpResponse.getStatusLine().getStatusCode();
|
||||
Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse);
|
||||
if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
|
||||
onResponse(node);
|
||||
if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) {
|
||||
listener.onDefinitiveFailure(new WarningFailureException(response));
|
||||
ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
|
||||
if (responseOrResponseException.responseException == null) {
|
||||
listener.onSuccess(responseOrResponseException.response);
|
||||
} else {
|
||||
listener.onSuccess(response);
|
||||
}
|
||||
if (nodeTuple.nodes.hasNext()) {
|
||||
listener.trackFailure(responseOrResponseException.responseException);
|
||||
performRequestAsync(nodeTuple, request, listener);
|
||||
} else {
|
||||
ResponseException responseException = new ResponseException(response);
|
||||
if (isRetryStatus(statusCode)) {
|
||||
//mark host dead and retry against next one
|
||||
onFailure(node);
|
||||
retryIfPossible(responseException);
|
||||
} else {
|
||||
//mark host alive and don't retry, as the error should be a request problem
|
||||
onResponse(node);
|
||||
listener.onDefinitiveFailure(responseException);
|
||||
listener.onDefinitiveFailure(responseOrResponseException.responseException);
|
||||
}
|
||||
}
|
||||
} catch(Exception e) {
|
||||
|
@ -325,34 +321,19 @@ public class RestClient implements Closeable {
|
|||
@Override
|
||||
public void failed(Exception failure) {
|
||||
try {
|
||||
RequestLogger.logFailedRequest(logger, request, node, failure);
|
||||
onFailure(node);
|
||||
retryIfPossible(failure);
|
||||
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure);
|
||||
onFailure(context.node);
|
||||
if (nodeTuple.nodes.hasNext()) {
|
||||
listener.trackFailure(failure);
|
||||
performRequestAsync(nodeTuple, request, listener);
|
||||
} else {
|
||||
listener.onDefinitiveFailure(failure);
|
||||
}
|
||||
} catch(Exception e) {
|
||||
listener.onDefinitiveFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void retryIfPossible(Exception exception) {
|
||||
if (nodeTuple.nodes.hasNext()) {
|
||||
//in case we are retrying, check whether maxRetryTimeout has been reached
|
||||
long timeElapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
|
||||
long timeout = maxRetryTimeoutMillis - timeElapsedMillis;
|
||||
if (timeout <= 0) {
|
||||
IOException retryTimeoutException = new IOException(
|
||||
"request retries exceeded max retry timeout [" + maxRetryTimeoutMillis + "]", exception);
|
||||
listener.onDefinitiveFailure(retryTimeoutException);
|
||||
} else {
|
||||
listener.trackFailure(exception);
|
||||
request.reset();
|
||||
performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes,
|
||||
thisWarningsHandler, httpAsyncResponseConsumerFactory, listener);
|
||||
}
|
||||
} else {
|
||||
listener.onDefinitiveFailure(exception);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelled() {
|
||||
listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null));
|
||||
|
@ -360,20 +341,6 @@ public class RestClient implements Closeable {
|
|||
});
|
||||
}
|
||||
|
||||
private void setHeaders(HttpRequest httpRequest, Collection<Header> requestHeaders) {
|
||||
// request headers override default headers, so we don't add default headers if they exist as request headers
|
||||
final Set<String> requestNames = new HashSet<>(requestHeaders.size());
|
||||
for (Header requestHeader : requestHeaders) {
|
||||
httpRequest.addHeader(requestHeader);
|
||||
requestNames.add(requestHeader.getName());
|
||||
}
|
||||
for (Header defaultHeader : defaultHeaders) {
|
||||
if (requestNames.contains(defaultHeader.getName()) == false) {
|
||||
httpRequest.addHeader(defaultHeader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a non-empty {@link Iterator} of nodes to be used for a request
|
||||
* that match the {@link NodeSelector}.
|
||||
|
@ -383,7 +350,7 @@ public class RestClient implements Closeable {
|
|||
* that is closest to being revived.
|
||||
* @throws IOException if no nodes are available
|
||||
*/
|
||||
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
|
||||
private NodeTuple<Iterator<Node>> nextNodes() throws IOException {
|
||||
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
|
||||
Iterable<Node> hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
|
||||
return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
|
||||
|
@ -517,11 +484,10 @@ public class RestClient implements Closeable {
|
|||
return false;
|
||||
}
|
||||
|
||||
private static Exception addSuppressedException(Exception suppressedException, Exception currentException) {
|
||||
private static void addSuppressedException(Exception suppressedException, Exception currentException) {
|
||||
if (suppressedException != null) {
|
||||
currentException.addSuppressed(suppressedException);
|
||||
}
|
||||
return currentException;
|
||||
}
|
||||
|
||||
private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
|
||||
|
@ -618,118 +584,8 @@ public class RestClient implements Closeable {
|
|||
* Tracks an exception, which caused a retry hence we should not return yet to the caller
|
||||
*/
|
||||
void trackFailure(Exception exception) {
|
||||
this.exception = addSuppressedException(this.exception, exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Listener used in any sync performRequest calls, it waits for a response or an exception back up to a timeout
|
||||
*/
|
||||
static class SyncResponseListener implements ResponseListener {
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
private final AtomicReference<Response> response = new AtomicReference<>();
|
||||
private final AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
||||
private final long timeout;
|
||||
|
||||
SyncResponseListener(long timeout) {
|
||||
assert timeout > 0;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
Objects.requireNonNull(response, "response must not be null");
|
||||
boolean wasResponseNull = this.response.compareAndSet(null, response);
|
||||
if (wasResponseNull == false) {
|
||||
throw new IllegalStateException("response is already set");
|
||||
}
|
||||
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exception) {
|
||||
Objects.requireNonNull(exception, "exception must not be null");
|
||||
boolean wasExceptionNull = this.exception.compareAndSet(null, exception);
|
||||
if (wasExceptionNull == false) {
|
||||
throw new IllegalStateException("exception is already set");
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits (up to a timeout) for some result of the request: either a response, or an exception.
|
||||
*/
|
||||
Response get() throws IOException {
|
||||
try {
|
||||
//providing timeout is just a safety measure to prevent everlasting waits
|
||||
//the different client timeouts should already do their jobs
|
||||
if (latch.await(timeout, TimeUnit.MILLISECONDS) == false) {
|
||||
throw new IOException("listener timeout after waiting for [" + timeout + "] ms");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("thread waiting for the response was interrupted", e);
|
||||
}
|
||||
|
||||
Exception exception = this.exception.get();
|
||||
Response response = this.response.get();
|
||||
if (exception != null) {
|
||||
if (response != null) {
|
||||
IllegalStateException e = new IllegalStateException("response and exception are unexpectedly set at the same time");
|
||||
e.addSuppressed(exception);
|
||||
throw e;
|
||||
}
|
||||
/*
|
||||
* Wrap and rethrow whatever exception we received, copying the type
|
||||
* where possible so the synchronous API looks as much as possible
|
||||
* like the asynchronous API. We wrap the exception so that the caller's
|
||||
* signature shows up in any exception we throw.
|
||||
*/
|
||||
if (exception instanceof WarningFailureException) {
|
||||
throw new WarningFailureException((WarningFailureException) exception);
|
||||
}
|
||||
if (exception instanceof ResponseException) {
|
||||
throw new ResponseException((ResponseException) exception);
|
||||
}
|
||||
if (exception instanceof ConnectTimeoutException) {
|
||||
ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
throw e;
|
||||
}
|
||||
if (exception instanceof SocketTimeoutException) {
|
||||
SocketTimeoutException e = new SocketTimeoutException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
throw e;
|
||||
}
|
||||
if (exception instanceof ConnectionClosedException) {
|
||||
ConnectionClosedException e = new ConnectionClosedException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
throw e;
|
||||
}
|
||||
if (exception instanceof SSLHandshakeException) {
|
||||
SSLHandshakeException e = new SSLHandshakeException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
throw e;
|
||||
}
|
||||
if (exception instanceof ConnectException) {
|
||||
ConnectException e = new ConnectException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
throw e;
|
||||
}
|
||||
if (exception instanceof IOException) {
|
||||
throw new IOException(exception.getMessage(), exception);
|
||||
}
|
||||
if (exception instanceof RuntimeException){
|
||||
throw new RuntimeException(exception.getMessage(), exception);
|
||||
}
|
||||
throw new RuntimeException("error while performing request", exception);
|
||||
}
|
||||
|
||||
if (response == null) {
|
||||
throw new IllegalStateException("response not set and no exception caught either");
|
||||
}
|
||||
return response;
|
||||
addSuppressedException(this.exception, exception);
|
||||
this.exception = exception;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -808,4 +664,153 @@ public class RestClient implements Closeable {
|
|||
itr.remove();
|
||||
}
|
||||
}
|
||||
|
||||
private class InternalRequest {
|
||||
private final Request request;
|
||||
private final Map<String, String> params;
|
||||
private final Set<Integer> ignoreErrorCodes;
|
||||
private final HttpRequestBase httpRequest;
|
||||
private final WarningsHandler warningsHandler;
|
||||
|
||||
InternalRequest(Request request) {
|
||||
this.request = request;
|
||||
this.params = new HashMap<>(request.getParameters());
|
||||
//ignore is a special parameter supported by the clients, shouldn't be sent to es
|
||||
String ignoreString = params.remove("ignore");
|
||||
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
|
||||
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
|
||||
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
|
||||
setHeaders(httpRequest, request.getOptions().getHeaders());
|
||||
this.warningsHandler = request.getOptions().getWarningsHandler() == null ?
|
||||
RestClient.this.warningsHandler : request.getOptions().getWarningsHandler();
|
||||
}
|
||||
|
||||
private void setHeaders(HttpRequest httpRequest, Collection<Header> requestHeaders) {
|
||||
// request headers override default headers, so we don't add default headers if they exist as request headers
|
||||
final Set<String> requestNames = new HashSet<>(requestHeaders.size());
|
||||
for (Header requestHeader : requestHeaders) {
|
||||
httpRequest.addHeader(requestHeader);
|
||||
requestNames.add(requestHeader.getName());
|
||||
}
|
||||
for (Header defaultHeader : defaultHeaders) {
|
||||
if (requestNames.contains(defaultHeader.getName()) == false) {
|
||||
httpRequest.addHeader(defaultHeader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RequestContext createContextForNextAttempt(Node node, AuthCache authCache) {
|
||||
this.httpRequest.reset();
|
||||
return new RequestContext(this, node, authCache);
|
||||
}
|
||||
}
|
||||
|
||||
private static class RequestContext {
|
||||
private final Node node;
|
||||
private final HttpAsyncRequestProducer requestProducer;
|
||||
private final HttpAsyncResponseConsumer<HttpResponse> asyncResponseConsumer;
|
||||
private final HttpClientContext context;
|
||||
|
||||
RequestContext(InternalRequest request, Node node, AuthCache authCache) {
|
||||
this.node = node;
|
||||
//we stream the request body if the entity allows for it
|
||||
this.requestProducer = HttpAsyncMethods.create(node.getHost(), request.httpRequest);
|
||||
this.asyncResponseConsumer =
|
||||
request.request.getOptions().getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer();
|
||||
this.context = HttpClientContext.create();
|
||||
context.setAuthCache(authCache);
|
||||
}
|
||||
}
|
||||
|
||||
private static Set<Integer> getIgnoreErrorCodes(String ignoreString, String requestMethod) {
|
||||
Set<Integer> ignoreErrorCodes;
|
||||
if (ignoreString == null) {
|
||||
if (HttpHead.METHOD_NAME.equals(requestMethod)) {
|
||||
//404 never causes error if returned for a HEAD request
|
||||
ignoreErrorCodes = Collections.singleton(404);
|
||||
} else {
|
||||
ignoreErrorCodes = Collections.emptySet();
|
||||
}
|
||||
} else {
|
||||
String[] ignoresArray = ignoreString.split(",");
|
||||
ignoreErrorCodes = new HashSet<>();
|
||||
if (HttpHead.METHOD_NAME.equals(requestMethod)) {
|
||||
//404 never causes error if returned for a HEAD request
|
||||
ignoreErrorCodes.add(404);
|
||||
}
|
||||
for (String ignoreCode : ignoresArray) {
|
||||
try {
|
||||
ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ignoreErrorCodes;
|
||||
}
|
||||
|
||||
private static class ResponseOrResponseException {
|
||||
private final Response response;
|
||||
private final ResponseException responseException;
|
||||
|
||||
ResponseOrResponseException(Response response) {
|
||||
this.response = Objects.requireNonNull(response);
|
||||
this.responseException = null;
|
||||
}
|
||||
|
||||
ResponseOrResponseException(ResponseException responseException) {
|
||||
this.responseException = Objects.requireNonNull(responseException);
|
||||
this.response = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap the exception so the caller's signature shows up in the stack trace, taking care to copy the original type and message
|
||||
* where possible so async and sync code don't have to check different exceptions.
|
||||
*/
|
||||
private static Exception extractAndWrapCause(Exception exception) {
|
||||
if (exception instanceof InterruptedException) {
|
||||
throw new RuntimeException("thread waiting for the response was interrupted", exception);
|
||||
}
|
||||
if (exception instanceof ExecutionException) {
|
||||
ExecutionException executionException = (ExecutionException)exception;
|
||||
Throwable t = executionException.getCause() == null ? executionException : executionException.getCause();
|
||||
if (t instanceof Error) {
|
||||
throw (Error)t;
|
||||
}
|
||||
exception = (Exception)t;
|
||||
}
|
||||
if (exception instanceof ConnectTimeoutException) {
|
||||
ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
return e;
|
||||
}
|
||||
if (exception instanceof SocketTimeoutException) {
|
||||
SocketTimeoutException e = new SocketTimeoutException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
return e;
|
||||
}
|
||||
if (exception instanceof ConnectionClosedException) {
|
||||
ConnectionClosedException e = new ConnectionClosedException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
return e;
|
||||
}
|
||||
if (exception instanceof SSLHandshakeException) {
|
||||
SSLHandshakeException e = new SSLHandshakeException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
return e;
|
||||
}
|
||||
if (exception instanceof ConnectException) {
|
||||
ConnectException e = new ConnectException(exception.getMessage());
|
||||
e.initCause(exception);
|
||||
return e;
|
||||
}
|
||||
if (exception instanceof IOException) {
|
||||
return new IOException(exception.getMessage(), exception);
|
||||
}
|
||||
if (exception instanceof RuntimeException){
|
||||
return new RuntimeException(exception.getMessage(), exception);
|
||||
}
|
||||
return new RuntimeException("error while performing request", exception);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,14 +42,12 @@ import java.util.Objects;
|
|||
public final class RestClientBuilder {
|
||||
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
|
||||
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
|
||||
public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS;
|
||||
public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10;
|
||||
public static final int DEFAULT_MAX_CONN_TOTAL = 30;
|
||||
|
||||
private static final Header[] EMPTY_HEADERS = new Header[0];
|
||||
|
||||
private final List<Node> nodes;
|
||||
private int maxRetryTimeout = DEFAULT_MAX_RETRY_TIMEOUT_MILLIS;
|
||||
private Header[] defaultHeaders = EMPTY_HEADERS;
|
||||
private RestClient.FailureListener failureListener;
|
||||
private HttpClientConfigCallback httpClientConfigCallback;
|
||||
|
@ -102,20 +100,6 @@ public final class RestClientBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request.
|
||||
* {@link #DEFAULT_MAX_RETRY_TIMEOUT_MILLIS} if not specified.
|
||||
*
|
||||
* @throws IllegalArgumentException if {@code maxRetryTimeoutMillis} is not greater than 0
|
||||
*/
|
||||
public RestClientBuilder setMaxRetryTimeoutMillis(int maxRetryTimeoutMillis) {
|
||||
if (maxRetryTimeoutMillis <= 0) {
|
||||
throw new IllegalArgumentException("maxRetryTimeoutMillis must be greater than 0");
|
||||
}
|
||||
this.maxRetryTimeout = maxRetryTimeoutMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link HttpClientConfigCallback} to be used to customize http client configuration
|
||||
*
|
||||
|
@ -208,7 +192,7 @@ public final class RestClientBuilder {
|
|||
return createHttpClient();
|
||||
}
|
||||
});
|
||||
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
|
||||
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
|
||||
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
|
||||
httpClient.start();
|
||||
return restClient;
|
||||
|
|
|
@ -82,14 +82,6 @@ public class RestClientBuilderTests extends RestClientTestCase {
|
|||
assertNotNull(restClient);
|
||||
}
|
||||
|
||||
try {
|
||||
RestClient.builder(new HttpHost("localhost", 9200))
|
||||
.setMaxRetryTimeoutMillis(randomIntBetween(Integer.MIN_VALUE, 0));
|
||||
fail("should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertEquals("maxRetryTimeoutMillis must be greater than 0", e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
RestClient.builder(new HttpHost("localhost", 9200)).setDefaultHeaders(null);
|
||||
fail("should have failed");
|
||||
|
@ -156,12 +148,9 @@ public class RestClientBuilderTests extends RestClientTestCase {
|
|||
builder.setDefaultHeaders(headers);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.setMaxRetryTimeoutMillis(randomIntBetween(1, Integer.MAX_VALUE));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiOfLengthBetween(2, 5);
|
||||
String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiLettersOfLengthBetween(2, 5);
|
||||
while (pathPrefix.length() < 20 && randomBoolean()) {
|
||||
pathPrefix += "/" + randomAsciiOfLengthBetween(3, 6);
|
||||
pathPrefix += "/" + randomAsciiLettersOfLengthBetween(3, 6);
|
||||
}
|
||||
builder.setPathPrefix(pathPrefix + (randomBoolean() ? "/" : ""));
|
||||
}
|
||||
|
|
|
@ -199,7 +199,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|||
* Test host selector against a real server <strong>and</strong>
|
||||
* test what happens after calling
|
||||
*/
|
||||
public void testNodeSelector() throws IOException {
|
||||
public void testNodeSelector() throws Exception {
|
||||
try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) {
|
||||
Request request = new Request("GET", "/200");
|
||||
int rounds = between(1, 10);
|
||||
|
@ -210,7 +210,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|||
*/
|
||||
if (stoppedFirstHost) {
|
||||
try {
|
||||
restClient.performRequest(request);
|
||||
RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
fail("expected to fail to connect");
|
||||
} catch (ConnectException e) {
|
||||
// Windows isn't consistent here. Sometimes the message is even null!
|
||||
|
@ -219,7 +219,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(httpHosts[0], response.getHost());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,25 +22,10 @@ package org.elasticsearch.client;
|
|||
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.ProtocolVersion;
|
||||
import org.apache.http.StatusLine;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
import org.apache.http.client.protocol.HttpClientContext;
|
||||
import org.apache.http.concurrent.FutureCallback;
|
||||
import org.apache.http.conn.ConnectTimeoutException;
|
||||
import org.apache.http.impl.auth.BasicScheme;
|
||||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||
import org.apache.http.message.BasicHttpResponse;
|
||||
import org.apache.http.message.BasicStatusLine;
|
||||
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
|
||||
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
|
||||
import org.junit.After;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
|
@ -49,7 +34,6 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.randomErrorRetryStatusCode;
|
||||
|
@ -61,9 +45,6 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Tests for {@link RestClient} behaviour against multiple hosts: fail-over, blacklisting etc.
|
||||
|
@ -75,39 +56,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
private List<Node> nodes;
|
||||
private HostsTrackingFailureListener failureListener;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public RestClient createRestClient(NodeSelector nodeSelector) {
|
||||
CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
|
||||
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
|
||||
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
|
||||
@Override
|
||||
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0];
|
||||
final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest();
|
||||
final HttpHost httpHost = requestProducer.getTarget();
|
||||
HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2];
|
||||
assertThat(context.getAuthCache().get(httpHost), instanceOf(BasicScheme.class));
|
||||
final FutureCallback<HttpResponse> futureCallback = (FutureCallback<HttpResponse>) invocationOnMock.getArguments()[3];
|
||||
//return the desired status code or exception depending on the path
|
||||
exec.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (request.getURI().getPath().equals("/soe")) {
|
||||
futureCallback.failed(new SocketTimeoutException(httpHost.toString()));
|
||||
} else if (request.getURI().getPath().equals("/coe")) {
|
||||
futureCallback.failed(new ConnectTimeoutException(httpHost.toString()));
|
||||
} else if (request.getURI().getPath().equals("/ioe")) {
|
||||
futureCallback.failed(new IOException(httpHost.toString()));
|
||||
} else {
|
||||
int statusCode = Integer.parseInt(request.getURI().getPath().substring(1));
|
||||
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, "");
|
||||
futureCallback.completed(new BasicHttpResponse(statusLine));
|
||||
}
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}
|
||||
});
|
||||
CloseableHttpAsyncClient httpClient = RestClientSingleHostTests.mockHttpClient(exec);
|
||||
int numNodes = RandomNumbers.randomIntBetween(getRandom(), 2, 5);
|
||||
nodes = new ArrayList<>(numNodes);
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
|
@ -115,7 +65,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
}
|
||||
nodes = Collections.unmodifiableList(nodes);
|
||||
failureListener = new HostsTrackingFailureListener();
|
||||
return new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener, nodeSelector, false);
|
||||
return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -126,14 +76,15 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
exec.shutdown();
|
||||
}
|
||||
|
||||
public void testRoundRobinOkStatusCodes() throws IOException {
|
||||
public void testRoundRobinOkStatusCodes() throws Exception {
|
||||
RestClient restClient = createRestClient(NodeSelector.ANY);
|
||||
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
|
||||
for (int i = 0; i < numIters; i++) {
|
||||
Set<HttpHost> hostsSet = hostsSet();
|
||||
for (int j = 0; j < nodes.size(); j++) {
|
||||
int statusCode = randomOkStatusCode(getRandom());
|
||||
Response response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
|
||||
new Request(randomHttpMethod(getRandom()), "/" + statusCode));
|
||||
assertEquals(statusCode, response.getStatusLine().getStatusCode());
|
||||
assertTrue("host not found: " + response.getHost(), hostsSet.remove(response.getHost()));
|
||||
}
|
||||
|
@ -142,7 +93,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
failureListener.assertNotCalled();
|
||||
}
|
||||
|
||||
public void testRoundRobinNoRetryErrors() throws IOException {
|
||||
public void testRoundRobinNoRetryErrors() throws Exception {
|
||||
RestClient restClient = createRestClient(NodeSelector.ANY);
|
||||
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
|
||||
for (int i = 0; i < numIters; i++) {
|
||||
|
@ -151,7 +102,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
String method = randomHttpMethod(getRandom());
|
||||
int statusCode = randomErrorNoRetryStatusCode(getRandom());
|
||||
try {
|
||||
Response response = restClient.performRequest(new Request(method, "/" + statusCode));
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
|
||||
new Request(method, "/" + statusCode));
|
||||
if (method.equals("HEAD") && statusCode == 404) {
|
||||
//no exception gets thrown although we got a 404
|
||||
assertEquals(404, response.getStatusLine().getStatusCode());
|
||||
|
@ -175,18 +127,13 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
failureListener.assertNotCalled();
|
||||
}
|
||||
|
||||
public void testRoundRobinRetryErrors() throws IOException {
|
||||
public void testRoundRobinRetryErrors() throws Exception {
|
||||
RestClient restClient = createRestClient(NodeSelector.ANY);
|
||||
String retryEndpoint = randomErrorRetryEndpoint();
|
||||
try {
|
||||
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
|
||||
RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request(randomHttpMethod(getRandom()), retryEndpoint));
|
||||
fail("request should have failed");
|
||||
} catch (ResponseException e) {
|
||||
/*
|
||||
* Unwrap the top level failure that was added so the stack trace contains
|
||||
* the caller. It wraps the exception that contains the failed hosts.
|
||||
*/
|
||||
e = (ResponseException) e.getCause();
|
||||
Set<HttpHost> hostsSet = hostsSet();
|
||||
//first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each
|
||||
failureListener.assertCalled(nodes);
|
||||
|
@ -206,11 +153,6 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
} while(e != null);
|
||||
assertEquals("every host should have been used but some weren't: " + hostsSet, 0, hostsSet.size());
|
||||
} catch (IOException e) {
|
||||
/*
|
||||
* Unwrap the top level failure that was added so the stack trace contains
|
||||
* the caller. It wraps the exception that contains the failed hosts.
|
||||
*/
|
||||
e = (IOException) e.getCause();
|
||||
Set<HttpHost> hostsSet = hostsSet();
|
||||
//first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each
|
||||
failureListener.assertCalled(nodes);
|
||||
|
@ -236,7 +178,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
for (int j = 0; j < nodes.size(); j++) {
|
||||
retryEndpoint = randomErrorRetryEndpoint();
|
||||
try {
|
||||
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
|
||||
RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
|
||||
new Request(randomHttpMethod(getRandom()), retryEndpoint));
|
||||
fail("request should have failed");
|
||||
} catch (ResponseException e) {
|
||||
Response response = e.getResponse();
|
||||
|
@ -247,11 +190,6 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
failureListener.assertCalled(response.getHost());
|
||||
assertEquals(0, e.getSuppressed().length);
|
||||
} catch (IOException e) {
|
||||
/*
|
||||
* Unwrap the top level failure that was added so the stack trace contains
|
||||
* the caller. It wraps the exception that contains the failed hosts.
|
||||
*/
|
||||
e = (IOException) e.getCause();
|
||||
HttpHost httpHost = HttpHost.create(e.getMessage());
|
||||
assertTrue("host [" + httpHost + "] not found, most likely used multiple times", hostsSet.remove(httpHost));
|
||||
//after the first request, all hosts are blacklisted, a single one gets resurrected each time
|
||||
|
@ -268,7 +206,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
int statusCode = randomErrorNoRetryStatusCode(getRandom());
|
||||
Response response;
|
||||
try {
|
||||
response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
|
||||
response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
|
||||
new Request(randomHttpMethod(getRandom()), "/" + statusCode));
|
||||
} catch (ResponseException e) {
|
||||
response = e.getResponse();
|
||||
}
|
||||
|
@ -285,7 +224,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
for (int y = 0; y < i + 1; y++) {
|
||||
retryEndpoint = randomErrorRetryEndpoint();
|
||||
try {
|
||||
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
|
||||
RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
|
||||
new Request(randomHttpMethod(getRandom()), retryEndpoint));
|
||||
fail("request should have failed");
|
||||
} catch (ResponseException e) {
|
||||
Response response = e.getResponse();
|
||||
|
@ -293,11 +233,6 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
assertThat(response.getHost(), equalTo(selectedHost));
|
||||
failureListener.assertCalled(selectedHost);
|
||||
} catch(IOException e) {
|
||||
/*
|
||||
* Unwrap the top level failure that was added so the stack trace contains
|
||||
* the caller. It wraps the exception that contains the failed hosts.
|
||||
*/
|
||||
e = (IOException) e.getCause();
|
||||
HttpHost httpHost = HttpHost.create(e.getMessage());
|
||||
assertThat(httpHost, equalTo(selectedHost));
|
||||
failureListener.assertCalled(selectedHost);
|
||||
|
@ -307,7 +242,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testNodeSelector() throws IOException {
|
||||
public void testNodeSelector() throws Exception {
|
||||
NodeSelector firstPositionOnly = new NodeSelector() {
|
||||
@Override
|
||||
public void select(Iterable<Node> restClientNodes) {
|
||||
|
@ -330,12 +265,12 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
* NodeSelector overrides the round robin behavior.
|
||||
*/
|
||||
Request request = new Request("GET", "/200");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(nodes.get(0).getHost(), response.getHost());
|
||||
}
|
||||
}
|
||||
|
||||
public void testSetNodes() throws IOException {
|
||||
public void testSetNodes() throws Exception {
|
||||
RestClient restClient = createRestClient(NodeSelector.SKIP_DEDICATED_MASTERS);
|
||||
List<Node> newNodes = new ArrayList<>(nodes.size());
|
||||
for (int i = 0; i < nodes.size(); i++) {
|
||||
|
@ -350,7 +285,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
|||
* NodeSelector overrides the round robin behavior.
|
||||
*/
|
||||
Request request = new Request("GET", "/200");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(newNodes.get(0).getHost(), response.getHost());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -206,7 +206,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
* to set/add headers to the {@link org.apache.http.client.HttpClient}.
|
||||
* Exercises the test http server ability to send back whatever headers it received.
|
||||
*/
|
||||
public void testHeaders() throws IOException {
|
||||
public void testHeaders() throws Exception {
|
||||
for (String method : getHttpMethods()) {
|
||||
final Set<String> standardHeaders = new HashSet<>(Arrays.asList("Connection", "Host", "User-agent", "Date"));
|
||||
if (method.equals("HEAD") == false) {
|
||||
|
@ -222,7 +222,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
request.setOptions(options);
|
||||
Response esResponse;
|
||||
try {
|
||||
esResponse = restClient.performRequest(request);
|
||||
esResponse = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
} catch (ResponseException e) {
|
||||
esResponse = e.getResponse();
|
||||
}
|
||||
|
@ -246,7 +246,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
* out of the box by {@link org.apache.http.client.HttpClient}.
|
||||
* Exercises the test http server ability to send back whatever body it received.
|
||||
*/
|
||||
public void testDeleteWithBody() throws IOException {
|
||||
public void testDeleteWithBody() throws Exception {
|
||||
bodyTest("DELETE");
|
||||
}
|
||||
|
||||
|
@ -255,57 +255,57 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
* out of the box by {@link org.apache.http.client.HttpClient}.
|
||||
* Exercises the test http server ability to send back whatever body it received.
|
||||
*/
|
||||
public void testGetWithBody() throws IOException {
|
||||
public void testGetWithBody() throws Exception {
|
||||
bodyTest("GET");
|
||||
}
|
||||
|
||||
public void testEncodeParams() throws IOException {
|
||||
public void testEncodeParams() throws Exception {
|
||||
{
|
||||
Request request = new Request("PUT", "/200");
|
||||
request.addParameter("routing", "this/is/the/routing");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(pathPrefix + "/200?routing=this%2Fis%2Fthe%2Frouting", response.getRequestLine().getUri());
|
||||
}
|
||||
{
|
||||
Request request = new Request("PUT", "/200");
|
||||
request.addParameter("routing", "this|is|the|routing");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(pathPrefix + "/200?routing=this%7Cis%7Cthe%7Crouting", response.getRequestLine().getUri());
|
||||
}
|
||||
{
|
||||
Request request = new Request("PUT", "/200");
|
||||
request.addParameter("routing", "routing#1");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(pathPrefix + "/200?routing=routing%231", response.getRequestLine().getUri());
|
||||
}
|
||||
{
|
||||
Request request = new Request("PUT", "/200");
|
||||
request.addParameter("routing", "中文");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(pathPrefix + "/200?routing=%E4%B8%AD%E6%96%87", response.getRequestLine().getUri());
|
||||
}
|
||||
{
|
||||
Request request = new Request("PUT", "/200");
|
||||
request.addParameter("routing", "foo bar");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(pathPrefix + "/200?routing=foo+bar", response.getRequestLine().getUri());
|
||||
}
|
||||
{
|
||||
Request request = new Request("PUT", "/200");
|
||||
request.addParameter("routing", "foo+bar");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(pathPrefix + "/200?routing=foo%2Bbar", response.getRequestLine().getUri());
|
||||
}
|
||||
{
|
||||
Request request = new Request("PUT", "/200");
|
||||
request.addParameter("routing", "foo/bar");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(pathPrefix + "/200?routing=foo%2Fbar", response.getRequestLine().getUri());
|
||||
}
|
||||
{
|
||||
Request request = new Request("PUT", "/200");
|
||||
request.addParameter("routing", "foo^bar");
|
||||
Response response = restClient.performRequest(request);
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
assertEquals(pathPrefix + "/200?routing=foo%5Ebar", response.getRequestLine().getUri());
|
||||
}
|
||||
}
|
||||
|
@ -313,7 +313,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
/**
|
||||
* Verify that credentials are sent on the first request with preemptive auth enabled (default when provided with credentials).
|
||||
*/
|
||||
public void testPreemptiveAuthEnabled() throws IOException {
|
||||
public void testPreemptiveAuthEnabled() throws Exception {
|
||||
final String[] methods = {"POST", "PUT", "GET", "DELETE"};
|
||||
|
||||
try (RestClient restClient = createRestClient(true, true)) {
|
||||
|
@ -328,7 +328,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
/**
|
||||
* Verify that credentials are <em>not</em> sent on the first request with preemptive auth disabled.
|
||||
*/
|
||||
public void testPreemptiveAuthDisabled() throws IOException {
|
||||
public void testPreemptiveAuthDisabled() throws Exception {
|
||||
final String[] methods = {"POST", "PUT", "GET", "DELETE"};
|
||||
|
||||
try (RestClient restClient = createRestClient(true, false)) {
|
||||
|
@ -343,7 +343,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
/**
|
||||
* Verify that credentials continue to be sent even if a 401 (Unauthorized) response is received
|
||||
*/
|
||||
public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws IOException {
|
||||
public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws Exception {
|
||||
final String[] methods = {"POST", "PUT", "GET", "DELETE"};
|
||||
|
||||
try (RestClient restClient = createRestClient(true, true)) {
|
||||
|
@ -362,14 +362,14 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
public void testUrlWithoutLeadingSlash() throws Exception {
|
||||
if (pathPrefix.length() == 0) {
|
||||
try {
|
||||
restClient.performRequest(new Request("GET", "200"));
|
||||
RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200"));
|
||||
fail("request should have failed");
|
||||
} catch (ResponseException e) {
|
||||
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());
|
||||
}
|
||||
} else {
|
||||
{
|
||||
Response response = restClient.performRequest(new Request("GET", "200"));
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200"));
|
||||
//a trailing slash gets automatically added if a pathPrefix is configured
|
||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||
}
|
||||
|
@ -378,7 +378,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
try (RestClient restClient = RestClient.builder(
|
||||
new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort()))
|
||||
.setPathPrefix(pathPrefix.substring(1)).build()) {
|
||||
Response response = restClient.performRequest(new Request("GET", "200"));
|
||||
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200"));
|
||||
//a trailing slash gets automatically added if a pathPrefix is configured
|
||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||
}
|
||||
|
@ -386,16 +386,16 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private Response bodyTest(final String method) throws IOException {
|
||||
private Response bodyTest(final String method) throws Exception {
|
||||
return bodyTest(restClient, method);
|
||||
}
|
||||
|
||||
private Response bodyTest(final RestClient restClient, final String method) throws IOException {
|
||||
private Response bodyTest(final RestClient restClient, final String method) throws Exception {
|
||||
int statusCode = randomStatusCode(getRandom());
|
||||
return bodyTest(restClient, method, statusCode, new Header[0]);
|
||||
}
|
||||
|
||||
private Response bodyTest(RestClient restClient, String method, int statusCode, Header[] headers) throws IOException {
|
||||
private Response bodyTest(RestClient restClient, String method, int statusCode, Header[] headers) throws Exception {
|
||||
String requestBody = "{ \"field\": \"value\" }";
|
||||
Request request = new Request(method, "/" + statusCode);
|
||||
request.setJsonEntity(requestBody);
|
||||
|
@ -406,7 +406,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|||
request.setOptions(options);
|
||||
Response esResponse;
|
||||
try {
|
||||
esResponse = restClient.performRequest(request);
|
||||
esResponse = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
|
||||
} catch(ResponseException e) {
|
||||
esResponse = e.getResponse();
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.client;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.http.ConnectionClosedException;
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpEntityEnclosingRequest;
|
||||
|
@ -42,7 +43,6 @@ import org.apache.http.concurrent.FutureCallback;
|
|||
import org.apache.http.conn.ConnectTimeoutException;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.auth.BasicScheme;
|
||||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||
import org.apache.http.message.BasicHttpResponse;
|
||||
import org.apache.http.message.BasicStatusLine;
|
||||
|
@ -55,24 +55,30 @@ import org.mockito.ArgumentCaptor;
|
|||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.getAllErrorStatusCodes;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.getOkStatusCodes;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.randomStatusCode;
|
||||
import static org.elasticsearch.client.SyncResponseListenerTests.assertExceptionStackContainsCallingMethod;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
|
@ -105,25 +111,67 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
private boolean strictDeprecationMode;
|
||||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
public void createRestClient() {
|
||||
httpClient = mock(CloseableHttpAsyncClient.class);
|
||||
httpClient = mockHttpClient(exec);
|
||||
defaultHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header-default");
|
||||
node = new Node(new HttpHost("localhost", 9200));
|
||||
failureListener = new HostsTrackingFailureListener();
|
||||
strictDeprecationMode = randomBoolean();
|
||||
restClient = new RestClient(this.httpClient, defaultHeaders,
|
||||
singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static CloseableHttpAsyncClient mockHttpClient(final ExecutorService exec) {
|
||||
CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
|
||||
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
|
||||
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
|
||||
@Override
|
||||
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0];
|
||||
HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2];
|
||||
assertThat(context.getAuthCache().get(node.getHost()), instanceOf(BasicScheme.class));
|
||||
final HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0];
|
||||
final FutureCallback<HttpResponse> futureCallback =
|
||||
(FutureCallback<HttpResponse>) invocationOnMock.getArguments()[3];
|
||||
HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest();
|
||||
// Call the callback asynchronous to better simulate how async http client works
|
||||
return exec.submit(new Callable<HttpResponse>() {
|
||||
@Override
|
||||
public HttpResponse call() throws Exception {
|
||||
if (futureCallback != null) {
|
||||
try {
|
||||
HttpResponse httpResponse = responseOrException(requestProducer);
|
||||
futureCallback.completed(httpResponse);
|
||||
} catch(Exception e) {
|
||||
futureCallback.failed(e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return responseOrException(requestProducer);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
private static HttpResponse responseOrException(HttpAsyncRequestProducer requestProducer) throws Exception {
|
||||
final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest();
|
||||
final HttpHost httpHost = requestProducer.getTarget();
|
||||
//return the desired status code or exception depending on the path
|
||||
if (request.getURI().getPath().equals("/soe")) {
|
||||
futureCallback.failed(new SocketTimeoutException());
|
||||
} else if (request.getURI().getPath().equals("/coe")) {
|
||||
futureCallback.failed(new ConnectTimeoutException());
|
||||
} else {
|
||||
switch (request.getURI().getPath()) {
|
||||
case "/soe":
|
||||
throw new SocketTimeoutException(httpHost.toString());
|
||||
case "/coe":
|
||||
throw new ConnectTimeoutException(httpHost.toString());
|
||||
case "/ioe":
|
||||
throw new IOException(httpHost.toString());
|
||||
case "/closed":
|
||||
throw new ConnectionClosedException();
|
||||
case "/handshake":
|
||||
throw new SSLHandshakeException("");
|
||||
case "/uri":
|
||||
throw new URISyntaxException("", "");
|
||||
case "/runtime":
|
||||
throw new RuntimeException();
|
||||
default:
|
||||
int statusCode = Integer.parseInt(request.getURI().getPath().substring(1));
|
||||
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, "");
|
||||
|
||||
|
@ -139,24 +187,8 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
}
|
||||
//return the same headers that were sent
|
||||
httpResponse.setHeaders(request.getAllHeaders());
|
||||
// Call the callback asynchronous to better simulate how async http client works
|
||||
exec.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
futureCallback.completed(httpResponse);
|
||||
return httpResponse;
|
||||
}
|
||||
});
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
defaultHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header-default");
|
||||
node = new Node(new HttpHost("localhost", 9200));
|
||||
failureListener = new HostsTrackingFailureListener();
|
||||
strictDeprecationMode = randomBoolean();
|
||||
restClient = new RestClient(httpClient, 10000, defaultHeaders,
|
||||
singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -195,10 +227,10 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
/**
|
||||
* End to end test for ok status codes
|
||||
*/
|
||||
public void testOkStatusCodes() throws IOException {
|
||||
public void testOkStatusCodes() throws Exception {
|
||||
for (String method : getHttpMethods()) {
|
||||
for (int okStatusCode : getOkStatusCodes()) {
|
||||
Response response = restClient.performRequest(new Request(method, "/" + okStatusCode));
|
||||
Response response = performRequestSyncOrAsync(restClient, new Request(method, "/" + okStatusCode));
|
||||
assertThat(response.getStatusLine().getStatusCode(), equalTo(okStatusCode));
|
||||
}
|
||||
}
|
||||
|
@ -208,7 +240,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
/**
|
||||
* End to end test for error status codes: they should cause an exception to be thrown, apart from 404 with HEAD requests
|
||||
*/
|
||||
public void testErrorStatusCodes() throws IOException {
|
||||
public void testErrorStatusCodes() throws Exception {
|
||||
for (String method : getHttpMethods()) {
|
||||
Set<Integer> expectedIgnores = new HashSet<>();
|
||||
String ignoreParam = "";
|
||||
|
@ -256,21 +288,74 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testIOExceptions() {
|
||||
public void testPerformRequestIOExceptions() throws Exception {
|
||||
for (String method : getHttpMethods()) {
|
||||
//IOExceptions should be let bubble up
|
||||
try {
|
||||
restClient.performRequest(new Request(method, "/coe"));
|
||||
restClient.performRequest(new Request(method, "/ioe"));
|
||||
fail("request should have failed");
|
||||
} catch(IOException e) {
|
||||
assertThat(e, instanceOf(ConnectTimeoutException.class));
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
failureListener.assertCalled(singletonList(node));
|
||||
try {
|
||||
restClient.performRequest(new Request(method, "/coe"));
|
||||
fail("request should have failed");
|
||||
} catch(ConnectTimeoutException e) {
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
failureListener.assertCalled(singletonList(node));
|
||||
try {
|
||||
restClient.performRequest(new Request(method, "/soe"));
|
||||
fail("request should have failed");
|
||||
} catch(IOException e) {
|
||||
assertThat(e, instanceOf(SocketTimeoutException.class));
|
||||
} catch(SocketTimeoutException e) {
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
failureListener.assertCalled(singletonList(node));
|
||||
try {
|
||||
restClient.performRequest(new Request(method, "/closed"));
|
||||
fail("request should have failed");
|
||||
} catch(ConnectionClosedException e) {
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
failureListener.assertCalled(singletonList(node));
|
||||
try {
|
||||
restClient.performRequest(new Request(method, "/handshake"));
|
||||
fail("request should have failed");
|
||||
} catch(SSLHandshakeException e) {
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
failureListener.assertCalled(singletonList(node));
|
||||
}
|
||||
}
|
||||
|
||||
public void testPerformRequestRuntimeExceptions() throws Exception {
|
||||
for (String method : getHttpMethods()) {
|
||||
try {
|
||||
restClient.performRequest(new Request(method, "/runtime"));
|
||||
fail("request should have failed");
|
||||
} catch (RuntimeException e) {
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
failureListener.assertCalled(singletonList(node));
|
||||
}
|
||||
}
|
||||
|
||||
public void testPerformRequestExceptions() throws Exception {
|
||||
for (String method : getHttpMethods()) {
|
||||
try {
|
||||
restClient.performRequest(new Request(method, "/uri"));
|
||||
fail("request should have failed");
|
||||
} catch (RuntimeException e) {
|
||||
assertThat(e.getCause(), instanceOf(URISyntaxException.class));
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
failureListener.assertCalled(singletonList(node));
|
||||
}
|
||||
|
@ -280,7 +365,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
* End to end test for request and response body. Exercises the mock http client ability to send back
|
||||
* whatever body it has received.
|
||||
*/
|
||||
public void testBody() throws IOException {
|
||||
public void testBody() throws Exception {
|
||||
String body = "{ \"field\": \"value\" }";
|
||||
StringEntity entity = new StringEntity(body, ContentType.APPLICATION_JSON);
|
||||
for (String method : Arrays.asList("DELETE", "GET", "PATCH", "POST", "PUT")) {
|
||||
|
@ -309,7 +394,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
Request request = new Request(method, "/" + randomStatusCode(getRandom()));
|
||||
request.setEntity(entity);
|
||||
try {
|
||||
restClient.performRequest(request);
|
||||
performRequestSyncOrAsync(restClient, request);
|
||||
fail("request should have failed");
|
||||
} catch(UnsupportedOperationException e) {
|
||||
assertThat(e.getMessage(), equalTo(method + " with body is not supported"));
|
||||
|
@ -321,7 +406,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
* End to end test for request and response headers. Exercises the mock http client ability to send back
|
||||
* whatever headers it has received.
|
||||
*/
|
||||
public void testHeaders() throws IOException {
|
||||
public void testHeaders() throws Exception {
|
||||
for (String method : getHttpMethods()) {
|
||||
final Header[] requestHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header");
|
||||
final int statusCode = randomStatusCode(getRandom());
|
||||
|
@ -333,7 +418,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
request.setOptions(options);
|
||||
Response esResponse;
|
||||
try {
|
||||
esResponse = restClient.performRequest(request);
|
||||
esResponse = performRequestSyncOrAsync(restClient, request);
|
||||
} catch(ResponseException e) {
|
||||
esResponse = e.getResponse();
|
||||
}
|
||||
|
@ -343,7 +428,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testDeprecationWarnings() throws IOException {
|
||||
public void testDeprecationWarnings() throws Exception {
|
||||
String chars = randomAsciiAlphanumOfLength(5);
|
||||
assertDeprecationWarnings(singletonList("poorly formatted " + chars), singletonList("poorly formatted " + chars));
|
||||
assertDeprecationWarnings(singletonList(formatWarning(chars)), singletonList(chars));
|
||||
|
@ -397,7 +482,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
protected abstract WarningsHandler warningsHandler();
|
||||
}
|
||||
|
||||
private void assertDeprecationWarnings(List<String> warningHeaderTexts, List<String> warningBodyTexts) throws IOException {
|
||||
private void assertDeprecationWarnings(List<String> warningHeaderTexts, List<String> warningBodyTexts) throws Exception {
|
||||
String method = randomFrom(getHttpMethods());
|
||||
Request request = new Request(method, "/200");
|
||||
RequestOptions.Builder options = request.getOptions().toBuilder();
|
||||
|
@ -420,7 +505,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
Response response;
|
||||
if (expectFailure) {
|
||||
try {
|
||||
restClient.performRequest(request);
|
||||
performRequestSyncOrAsync(restClient, request);
|
||||
fail("expected WarningFailureException from warnings");
|
||||
return;
|
||||
} catch (WarningFailureException e) {
|
||||
|
@ -430,7 +515,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
response = e.getResponse();
|
||||
}
|
||||
} else {
|
||||
response = restClient.performRequest(request);
|
||||
response = performRequestSyncOrAsync(restClient, request);
|
||||
}
|
||||
assertEquals(false == warningBodyTexts.isEmpty(), response.hasWarnings());
|
||||
assertEquals(warningBodyTexts, response.getWarnings());
|
||||
|
@ -461,7 +546,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
//randomly add some ignore parameter, which doesn't get sent as part of the request
|
||||
String ignore = Integer.toString(randomFrom(RestClientTestUtil.getAllErrorStatusCodes()));
|
||||
if (randomBoolean()) {
|
||||
ignore += "," + Integer.toString(randomFrom(RestClientTestUtil.getAllErrorStatusCodes()));
|
||||
ignore += "," + randomFrom(RestClientTestUtil.getAllErrorStatusCodes());
|
||||
}
|
||||
request.addParameter("ignore", ignore);
|
||||
}
|
||||
|
@ -520,12 +605,66 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
|||
expectedRequest.addHeader(defaultHeader);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
restClient.performRequest(request);
|
||||
} catch(ResponseException e) {
|
||||
performRequestSyncOrAsync(restClient, request);
|
||||
} catch(Exception e) {
|
||||
//all good
|
||||
}
|
||||
return expectedRequest;
|
||||
}
|
||||
|
||||
static Response performRequestSyncOrAsync(RestClient restClient, Request request) throws Exception {
|
||||
//randomize between sync and async methods
|
||||
if (randomBoolean()) {
|
||||
return restClient.performRequest(request);
|
||||
} else {
|
||||
final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
|
||||
final AtomicReference<Response> responseRef = new AtomicReference<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
restClient.performRequestAsync(request, new ResponseListener() {
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
responseRef.set(response);
|
||||
latch.countDown();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exception) {
|
||||
exceptionRef.set(exception);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
latch.await();
|
||||
if (exceptionRef.get() != null) {
|
||||
throw exceptionRef.get();
|
||||
}
|
||||
return responseRef.get();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that the provided {@linkplain Exception} contains the method
|
||||
* that called this <strong>somewhere</strong> on its stack. This is
|
||||
* normally the case for synchronous calls but {@link RestClient} performs
|
||||
* synchronous calls by performing asynchronous calls and blocking the
|
||||
* current thread until the call returns so it has to take special care
|
||||
* to make sure that the caller shows up in the exception. We use this
|
||||
* assertion to make sure that we don't break that "special care".
|
||||
*/
|
||||
private static void assertExceptionStackContainsCallingMethod(Throwable t) {
|
||||
// 0 is getStackTrace
|
||||
// 1 is this method
|
||||
// 2 is the caller, what we want
|
||||
StackTraceElement myMethod = Thread.currentThread().getStackTrace()[2];
|
||||
for (StackTraceElement se : t.getStackTrace()) {
|
||||
if (se.getClassName().equals(myMethod.getClassName())
|
||||
&& se.getMethodName().equals(myMethod.getMethodName())) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
StringWriter stack = new StringWriter();
|
||||
t.printStackTrace(new PrintWriter(stack));
|
||||
fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ public class RestClientTests extends RestClientTestCase {
|
|||
public void testCloseIsIdempotent() throws IOException {
|
||||
List<Node> nodes = singletonList(new Node(new HttpHost("localhost", 9200)));
|
||||
CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class);
|
||||
RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null, null, false);
|
||||
RestClient restClient = new RestClient(closeableHttpAsyncClient, new Header[0], nodes, null, null, null, false);
|
||||
restClient.close();
|
||||
verify(closeableHttpAsyncClient, times(1)).close();
|
||||
restClient.close();
|
||||
|
@ -353,8 +353,7 @@ public class RestClientTests extends RestClientTestCase {
|
|||
|
||||
private static RestClient createRestClient() {
|
||||
List<Node> nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200)));
|
||||
return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000),
|
||||
new Header[] {}, nodes, null, null, null, false);
|
||||
return new RestClient(mock(CloseableHttpAsyncClient.class), new Header[] {}, nodes, null, null, null, false);
|
||||
}
|
||||
|
||||
public void testRoundRobin() throws IOException {
|
||||
|
|
|
@ -1,273 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client;
|
||||
|
||||
import org.apache.http.ConnectionClosedException;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.ProtocolVersion;
|
||||
import org.apache.http.RequestLine;
|
||||
import org.apache.http.StatusLine;
|
||||
import org.apache.http.conn.ConnectTimeoutException;
|
||||
import org.apache.http.message.BasicHttpResponse;
|
||||
import org.apache.http.message.BasicRequestLine;
|
||||
import org.apache.http.message.BasicStatusLine;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URISyntaxException;
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class SyncResponseListenerTests extends RestClientTestCase {
|
||||
/**
|
||||
* Asserts that the provided {@linkplain Exception} contains the method
|
||||
* that called this <strong>somewhere</strong> on its stack. This is
|
||||
* normally the case for synchronous calls but {@link RestClient} performs
|
||||
* synchronous calls by performing asynchronous calls and blocking the
|
||||
* current thread until the call returns so it has to take special care
|
||||
* to make sure that the caller shows up in the exception. We use this
|
||||
* assertion to make sure that we don't break that "special care".
|
||||
*/
|
||||
static void assertExceptionStackContainsCallingMethod(Exception e) {
|
||||
// 0 is getStackTrace
|
||||
// 1 is this method
|
||||
// 2 is the caller, what we want
|
||||
StackTraceElement myMethod = Thread.currentThread().getStackTrace()[2];
|
||||
for (StackTraceElement se : e.getStackTrace()) {
|
||||
if (se.getClassName().equals(myMethod.getClassName())
|
||||
&& se.getMethodName().equals(myMethod.getMethodName())) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
StringWriter stack = new StringWriter();
|
||||
e.printStackTrace(new PrintWriter(stack));
|
||||
fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack);
|
||||
}
|
||||
|
||||
public void testOnSuccessNullResponse() {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
try {
|
||||
syncResponseListener.onSuccess(null);
|
||||
fail("onSuccess should have failed");
|
||||
} catch (NullPointerException e) {
|
||||
assertEquals("response must not be null", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testOnFailureNullException() {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
try {
|
||||
syncResponseListener.onFailure(null);
|
||||
fail("onFailure should have failed");
|
||||
} catch (NullPointerException e) {
|
||||
assertEquals("exception must not be null", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testOnSuccess() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
Response mockResponse = mockResponse();
|
||||
syncResponseListener.onSuccess(mockResponse);
|
||||
Response response = syncResponseListener.get();
|
||||
assertSame(response, mockResponse);
|
||||
|
||||
try {
|
||||
syncResponseListener.onSuccess(mockResponse);
|
||||
fail("get should have failed");
|
||||
} catch (IllegalStateException e) {
|
||||
assertEquals(e.getMessage(), "response is already set");
|
||||
}
|
||||
response = syncResponseListener.get();
|
||||
assertSame(response, mockResponse);
|
||||
}
|
||||
|
||||
public void testOnFailure() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
RuntimeException firstException = new RuntimeException("first-test");
|
||||
syncResponseListener.onFailure(firstException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (RuntimeException e) {
|
||||
assertEquals(firstException.getMessage(), e.getMessage());
|
||||
assertSame(firstException, e.getCause());
|
||||
}
|
||||
|
||||
RuntimeException secondException = new RuntimeException("second-test");
|
||||
try {
|
||||
syncResponseListener.onFailure(secondException);
|
||||
} catch(IllegalStateException e) {
|
||||
assertEquals(e.getMessage(), "exception is already set");
|
||||
}
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (RuntimeException e) {
|
||||
assertEquals(firstException.getMessage(), e.getMessage());
|
||||
assertSame(firstException, e.getCause());
|
||||
}
|
||||
|
||||
Response response = mockResponse();
|
||||
syncResponseListener.onSuccess(response);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (IllegalStateException e) {
|
||||
assertEquals("response and exception are unexpectedly set at the same time", e.getMessage());
|
||||
assertNotNull(e.getSuppressed());
|
||||
assertEquals(1, e.getSuppressed().length);
|
||||
assertSame(firstException, e.getSuppressed()[0]);
|
||||
}
|
||||
}
|
||||
|
||||
public void testRuntimeIsBuiltCorrectly() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
RuntimeException runtimeException = new RuntimeException();
|
||||
syncResponseListener.onFailure(runtimeException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (RuntimeException e) {
|
||||
// We preserve the original exception in the cause
|
||||
assertSame(runtimeException, e.getCause());
|
||||
// We copy the message
|
||||
assertEquals(runtimeException.getMessage(), e.getMessage());
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testConnectTimeoutExceptionIsBuiltCorrectly() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
ConnectTimeoutException timeoutException = new ConnectTimeoutException();
|
||||
syncResponseListener.onFailure(timeoutException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (IOException e) {
|
||||
// We preserve the original exception in the cause
|
||||
assertSame(timeoutException, e.getCause());
|
||||
// We copy the message
|
||||
assertEquals(timeoutException.getMessage(), e.getMessage());
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testSocketTimeoutExceptionIsBuiltCorrectly() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
SocketTimeoutException timeoutException = new SocketTimeoutException();
|
||||
syncResponseListener.onFailure(timeoutException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (IOException e) {
|
||||
// We preserve the original exception in the cause
|
||||
assertSame(timeoutException, e.getCause());
|
||||
// We copy the message
|
||||
assertEquals(timeoutException.getMessage(), e.getMessage());
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testConnectionClosedExceptionIsWrapped() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
ConnectionClosedException closedException = new ConnectionClosedException(randomAsciiAlphanumOfLength(5));
|
||||
syncResponseListener.onFailure(closedException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (ConnectionClosedException e) {
|
||||
// We preserve the original exception in the cause
|
||||
assertSame(closedException, e.getCause());
|
||||
// We copy the message
|
||||
assertEquals(closedException.getMessage(), e.getMessage());
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testSSLHandshakeExceptionIsWrapped() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
SSLHandshakeException exception = new SSLHandshakeException(randomAsciiAlphanumOfLength(5));
|
||||
syncResponseListener.onFailure(exception);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (SSLHandshakeException e) {
|
||||
// We preserve the original exception in the cause
|
||||
assertSame(exception, e.getCause());
|
||||
// We copy the message
|
||||
assertEquals(exception.getMessage(), e.getMessage());
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testIOExceptionIsBuiltCorrectly() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
IOException ioException = new IOException();
|
||||
syncResponseListener.onFailure(ioException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (IOException e) {
|
||||
// We preserve the original exception in the cause
|
||||
assertSame(ioException, e.getCause());
|
||||
// We copy the message
|
||||
assertEquals(ioException.getMessage(), e.getMessage());
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testExceptionIsWrapped() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
//we just need any checked exception
|
||||
URISyntaxException exception = new URISyntaxException("test", "test");
|
||||
syncResponseListener.onFailure(exception);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch (RuntimeException e) {
|
||||
assertEquals("error while performing request", e.getMessage());
|
||||
// We preserve the original exception in the cause
|
||||
assertSame(exception, e.getCause());
|
||||
// And we do all that so the thrown exception has our method in the stacktrace
|
||||
assertExceptionStackContainsCallingMethod(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static Response mockResponse() {
|
||||
ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1);
|
||||
RequestLine requestLine = new BasicRequestLine("GET", "/", protocolVersion);
|
||||
StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "OK");
|
||||
HttpResponse httpResponse = new BasicHttpResponse(statusLine);
|
||||
return new Response(requestLine, new HttpHost("localhost", 9200), httpResponse);
|
||||
}
|
||||
}
|
|
@ -111,13 +111,6 @@ public class RestClientDocumentation {
|
|||
builder.setDefaultHeaders(defaultHeaders); // <1>
|
||||
//end::rest-client-init-default-headers
|
||||
}
|
||||
{
|
||||
//tag::rest-client-init-max-retry-timeout
|
||||
RestClientBuilder builder = RestClient.builder(
|
||||
new HttpHost("localhost", 9200, "http"));
|
||||
builder.setMaxRetryTimeoutMillis(10000); // <1>
|
||||
//end::rest-client-init-max-retry-timeout
|
||||
}
|
||||
{
|
||||
//tag::rest-client-init-node-selector
|
||||
RestClientBuilder builder = RestClient.builder(
|
||||
|
@ -305,8 +298,7 @@ public class RestClientDocumentation {
|
|||
.setConnectTimeout(5000)
|
||||
.setSocketTimeout(60000);
|
||||
}
|
||||
})
|
||||
.setMaxRetryTimeoutMillis(60000);
|
||||
});
|
||||
//end::rest-client-config-timeouts
|
||||
}
|
||||
{
|
||||
|
|
|
@ -18,8 +18,7 @@ https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/htt
|
|||
as an argument and has the same return type. The request config builder can
|
||||
be modified and then returned. In the following example we increase the
|
||||
connect timeout (defaults to 1 second) and the socket timeout (defaults to 30
|
||||
seconds). Also we adjust the max retry timeout accordingly (defaults to 30
|
||||
seconds too).
|
||||
seconds).
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
|
|
|
@ -180,15 +180,6 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-defaul
|
|||
<1> Set the default headers that need to be sent with each request, to
|
||||
prevent having to specify them with each single request
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-max-retry-timeout]
|
||||
--------------------------------------------------
|
||||
<1> Set the timeout that should be honoured in case multiple attempts are made
|
||||
for the same request. The default value is 30 seconds, same as the default
|
||||
socket timeout. In case the socket timeout is customized, the maximum retry
|
||||
timeout should be adjusted accordingly
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-failure-listener]
|
||||
|
|
|
@ -21,3 +21,12 @@ The Cluster Health API used to default to `shards` level to ease migration
|
|||
from transport client that doesn't support the `level` parameter and always
|
||||
returns information including indices and shards details. The level default
|
||||
value has been aligned with the Elasticsearch default level: `cluster`.
|
||||
|
||||
=== Low-level REST client changes
|
||||
|
||||
[float]
|
||||
==== Support for `maxRetryTimeout` removed from RestClient
|
||||
|
||||
`RestClient` and `RestClientBuilder` no longer support the `maxRetryTimeout`
|
||||
setting. The setting was removed as its counting mechanism was not accurate
|
||||
and caused issues while adding little value.
|
|
@ -152,7 +152,7 @@ public class Zen2RestApiIT extends ESNetty4IntegTestCase {
|
|||
} catch (ResponseException e) {
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(400));
|
||||
assertThat(
|
||||
e.getCause().getMessage(),
|
||||
e.getMessage(),
|
||||
Matchers.containsString("add voting config exclusions request for [invalid] matched no master-eligible nodes")
|
||||
);
|
||||
}
|
||||
|
|
|
@ -41,7 +41,6 @@ public class AzureStorageRepositoryClientYamlTestSuiteIT extends ESClientYamlSui
|
|||
protected Settings restClientSettings() {
|
||||
// Give more time to repository-azure to complete the snapshot operations
|
||||
return Settings.builder().put(super.restClientSettings())
|
||||
.put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "60s")
|
||||
.put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "60s")
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -59,7 +59,6 @@ public abstract class AbstractRollingTestCase extends ESRestTestCase {
|
|||
// increase the timeout here to 90 seconds to handle long waits for a green
|
||||
// cluster health. the waits for green need to be longer than a minute to
|
||||
// account for delayed shards
|
||||
.put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s")
|
||||
.put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s")
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -55,7 +55,6 @@ public class UpgradeClusterClientYamlTestSuiteIT extends ESClientYamlSuiteTestCa
|
|||
// increase the timeout here to 90 seconds to handle long waits for a green
|
||||
// cluster health. the waits for green need to be longer than a minute to
|
||||
// account for delayed shards
|
||||
.put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s")
|
||||
.put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s")
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -92,7 +92,6 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
public abstract class ESRestTestCase extends ESTestCase {
|
||||
public static final String TRUSTSTORE_PATH = "truststore.path";
|
||||
public static final String TRUSTSTORE_PASSWORD = "truststore.password";
|
||||
public static final String CLIENT_RETRY_TIMEOUT = "client.retry.timeout";
|
||||
public static final String CLIENT_SOCKET_TIMEOUT = "client.socket.timeout";
|
||||
public static final String CLIENT_PATH_PREFIX = "client.path.prefix";
|
||||
|
||||
|
@ -750,11 +749,6 @@ public abstract class ESRestTestCase extends ESTestCase {
|
|||
}
|
||||
builder.setDefaultHeaders(defaultHeaders);
|
||||
}
|
||||
final String requestTimeoutString = settings.get(CLIENT_RETRY_TIMEOUT);
|
||||
if (requestTimeoutString != null) {
|
||||
final TimeValue maxRetryTimeout = TimeValue.parseTimeValue(requestTimeoutString, CLIENT_RETRY_TIMEOUT);
|
||||
builder.setMaxRetryTimeoutMillis(Math.toIntExact(maxRetryTimeout.getMillis()));
|
||||
}
|
||||
final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT);
|
||||
if (socketTimeoutString != null) {
|
||||
final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT);
|
||||
|
|
|
@ -45,7 +45,6 @@ import org.junit.Before;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -670,16 +669,16 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
public void testInvalidPolicyNames() throws UnsupportedEncodingException {
|
||||
public void testInvalidPolicyNames() {
|
||||
ResponseException ex;
|
||||
|
||||
policy = randomAlphaOfLengthBetween(0,10) + "," + randomAlphaOfLengthBetween(0,10);
|
||||
ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction()));
|
||||
assertThat(ex.getCause().getMessage(), containsString("invalid policy name"));
|
||||
assertThat(ex.getMessage(), containsString("invalid policy name"));
|
||||
|
||||
policy = randomAlphaOfLengthBetween(0,10) + "%20" + randomAlphaOfLengthBetween(0,10);
|
||||
ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction()));
|
||||
assertThat(ex.getCause().getMessage(), containsString("invalid policy name"));
|
||||
assertThat(ex.getMessage(), containsString("invalid policy name"));
|
||||
|
||||
policy = "_" + randomAlphaOfLengthBetween(1, 20);
|
||||
ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction()));
|
||||
|
@ -716,7 +715,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
|
||||
Request deleteRequest = new Request("DELETE", "_ilm/policy/" + originalPolicy);
|
||||
ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(deleteRequest));
|
||||
assertThat(ex.getCause().getMessage(),
|
||||
assertThat(ex.getMessage(),
|
||||
Matchers.allOf(
|
||||
containsString("Cannot delete policy [" + originalPolicy + "]. It is in use by one or more indices: ["),
|
||||
containsString(managedIndex1),
|
||||
|
|
|
@ -68,7 +68,6 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
|
|||
// we increase the timeout here to 90 seconds to handle long waits for a green
|
||||
// cluster health. the waits for green need to be longer than a minute to
|
||||
// account for delayed shards
|
||||
.put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s")
|
||||
.put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s")
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import javax.security.auth.login.LoginContext;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
@ -33,8 +34,6 @@ import java.security.PrivilegedExceptionAction;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.security.auth.login.LoginContext;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap;
|
||||
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
|
@ -148,13 +147,7 @@ public class KerberosAuthenticationIT extends ESRestTestCase {
|
|||
return restClientBuilder.build();
|
||||
}
|
||||
|
||||
private static void configureRestClientBuilder(final RestClientBuilder restClientBuilder, final Settings settings)
|
||||
throws IOException {
|
||||
final String requestTimeoutString = settings.get(CLIENT_RETRY_TIMEOUT);
|
||||
if (requestTimeoutString != null) {
|
||||
final TimeValue maxRetryTimeout = TimeValue.parseTimeValue(requestTimeoutString, CLIENT_RETRY_TIMEOUT);
|
||||
restClientBuilder.setMaxRetryTimeoutMillis(Math.toIntExact(maxRetryTimeout.getMillis()));
|
||||
}
|
||||
private static void configureRestClientBuilder(final RestClientBuilder restClientBuilder, final Settings settings) {
|
||||
final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT);
|
||||
if (socketTimeoutString != null) {
|
||||
final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT);
|
||||
|
|
|
@ -68,7 +68,6 @@ public class UpgradeClusterClientYamlTestSuiteIT extends ESClientYamlSuiteTestCa
|
|||
// we increase the timeout here to 90 seconds to handle long waits for a green
|
||||
// cluster health. the waits for green need to be longer than a minute to
|
||||
// account for delayed shards
|
||||
.put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s")
|
||||
.put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s")
|
||||
.build();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue