diff --git a/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java b/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java index 4d57f12742e..e133776affb 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java +++ b/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java @@ -39,16 +39,6 @@ public final class ResponseException extends IOException { this.response = response; } - /** - * Wrap a {@linkplain ResponseException} with another one with the current - * stack trace. This is used during synchronous calls so that the caller - * ends up in the stack trace of the exception thrown. - */ - ResponseException(ResponseException e) throws IOException { - super(e.getMessage(), e); - this.response = e.getResponse(); - } - static String buildMessage(Response response) throws IOException { String message = String.format(Locale.ROOT, "method [%s], host [%s], URI [%s], status line [%s]", diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 175d524f02a..f0478742ecc 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -70,11 +70,8 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonList; @@ -103,7 +100,6 @@ public class RestClient implements Closeable { // We don't rely on default headers supported by HttpAsyncClient as those cannot be replaced. // These are package private for tests. final List
defaultHeaders; - private final long maxRetryTimeoutMillis; private final String pathPrefix; private final AtomicInteger lastNodeIndex = new AtomicInteger(0); private final ConcurrentMap blacklist = new ConcurrentHashMap<>(); @@ -112,10 +108,9 @@ public class RestClient implements Closeable { private volatile NodeTuple> nodeTuple; private final WarningsHandler warningsHandler; - RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders, List nodes, String pathPrefix, + RestClient(CloseableHttpAsyncClient client, Header[] defaultHeaders, List nodes, String pathPrefix, FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode) { this.client = client; - this.maxRetryTimeoutMillis = maxRetryTimeoutMillis; this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); this.failureListener = failureListener; this.pathPrefix = pathPrefix; @@ -213,9 +208,64 @@ public class RestClient implements Closeable { * @throws ResponseException in case Elasticsearch responded with a status code that indicated an error */ public Response performRequest(Request request) throws IOException { - SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis); - performRequestAsyncNoCatch(request, listener); - return listener.get(); + InternalRequest internalRequest = new InternalRequest(request); + return performRequest(nextNodes(), internalRequest, null); + } + + private Response performRequest(final NodeTuple> nodeTuple, + final InternalRequest request, + Exception previousException) throws IOException { + RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); + HttpResponse httpResponse; + try { + httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get(); + } catch(Exception e) { + RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); + onFailure(context.node); + Exception cause = extractAndWrapCause(e); + addSuppressedException(previousException, cause); + if (nodeTuple.nodes.hasNext()) { + return performRequest(nodeTuple, request, cause); + } + if (cause instanceof IOException) { + throw (IOException) cause; + } + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause); + } + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); + if (responseOrResponseException.responseException == null) { + return responseOrResponseException.response; + } + addSuppressedException(previousException, responseOrResponseException.responseException); + if (nodeTuple.nodes.hasNext()) { + return performRequest(nodeTuple, request, responseOrResponseException.responseException); + } + throw responseOrResponseException.responseException; + } + + private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException { + RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse); + int statusCode = httpResponse.getStatusLine().getStatusCode(); + Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse); + if (isSuccessfulResponse(statusCode) || request.ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { + onResponse(node); + if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) { + throw new WarningFailureException(response); + } + return new ResponseOrResponseException(response); + } + ResponseException responseException = new ResponseException(response); + if (isRetryStatus(statusCode)) { + //mark host dead and retry against next one + onFailure(node); + return new ResponseOrResponseException(responseException); + } + //mark host alive and don't retry, as the error should be a request problem + onResponse(node); + throw responseException; } /** @@ -236,85 +286,31 @@ public class RestClient implements Closeable { */ public void performRequestAsync(Request request, ResponseListener responseListener) { try { - performRequestAsyncNoCatch(request, responseListener); + FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener); + InternalRequest internalRequest = new InternalRequest(request); + performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener); } catch (Exception e) { responseListener.onFailure(e); } } - void performRequestAsyncNoCatch(Request request, ResponseListener listener) throws IOException { - Map requestParams = new HashMap<>(request.getParameters()); - //ignore is a special parameter supported by the clients, shouldn't be sent to es - String ignoreString = requestParams.remove("ignore"); - Set ignoreErrorCodes; - if (ignoreString == null) { - if (HttpHead.METHOD_NAME.equals(request.getMethod())) { - //404 never causes error if returned for a HEAD request - ignoreErrorCodes = Collections.singleton(404); - } else { - ignoreErrorCodes = Collections.emptySet(); - } - } else { - String[] ignoresArray = ignoreString.split(","); - ignoreErrorCodes = new HashSet<>(); - if (HttpHead.METHOD_NAME.equals(request.getMethod())) { - //404 never causes error if returned for a HEAD request - ignoreErrorCodes.add(404); - } - for (String ignoreCode : ignoresArray) { - try { - ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); - } - } - } - URI uri = buildUri(pathPrefix, request.getEndpoint(), requestParams); - HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity()); - setHeaders(httpRequest, request.getOptions().getHeaders()); - FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener); - long startTime = System.nanoTime(); - performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes, - request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(), - request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener); - } - - private void performRequestAsync(final long startTime, final NodeTuple> nodeTuple, final HttpRequestBase request, - final Set ignoreErrorCodes, - final WarningsHandler thisWarningsHandler, - final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, + private void performRequestAsync(final NodeTuple> nodeTuple, + final InternalRequest request, final FailureTrackingResponseListener listener) { - final Node node = nodeTuple.nodes.next(); - //we stream the request body if the entity allows for it - final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(node.getHost(), request); - final HttpAsyncResponseConsumer asyncResponseConsumer = - httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer(); - final HttpClientContext context = HttpClientContext.create(); - context.setAuthCache(nodeTuple.authCache); - client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback() { + final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); + client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback() { @Override public void completed(HttpResponse httpResponse) { try { - RequestLogger.logResponse(logger, request, node.getHost(), httpResponse); - int statusCode = httpResponse.getStatusLine().getStatusCode(); - Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse); - if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { - onResponse(node); - if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) { - listener.onDefinitiveFailure(new WarningFailureException(response)); - } else { - listener.onSuccess(response); - } + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); + if (responseOrResponseException.responseException == null) { + listener.onSuccess(responseOrResponseException.response); } else { - ResponseException responseException = new ResponseException(response); - if (isRetryStatus(statusCode)) { - //mark host dead and retry against next one - onFailure(node); - retryIfPossible(responseException); + if (nodeTuple.nodes.hasNext()) { + listener.trackFailure(responseOrResponseException.responseException); + performRequestAsync(nodeTuple, request, listener); } else { - //mark host alive and don't retry, as the error should be a request problem - onResponse(node); - listener.onDefinitiveFailure(responseException); + listener.onDefinitiveFailure(responseOrResponseException.responseException); } } } catch(Exception e) { @@ -325,34 +321,19 @@ public class RestClient implements Closeable { @Override public void failed(Exception failure) { try { - RequestLogger.logFailedRequest(logger, request, node, failure); - onFailure(node); - retryIfPossible(failure); + RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure); + onFailure(context.node); + if (nodeTuple.nodes.hasNext()) { + listener.trackFailure(failure); + performRequestAsync(nodeTuple, request, listener); + } else { + listener.onDefinitiveFailure(failure); + } } catch(Exception e) { listener.onDefinitiveFailure(e); } } - private void retryIfPossible(Exception exception) { - if (nodeTuple.nodes.hasNext()) { - //in case we are retrying, check whether maxRetryTimeout has been reached - long timeElapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); - long timeout = maxRetryTimeoutMillis - timeElapsedMillis; - if (timeout <= 0) { - IOException retryTimeoutException = new IOException( - "request retries exceeded max retry timeout [" + maxRetryTimeoutMillis + "]", exception); - listener.onDefinitiveFailure(retryTimeoutException); - } else { - listener.trackFailure(exception); - request.reset(); - performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes, - thisWarningsHandler, httpAsyncResponseConsumerFactory, listener); - } - } else { - listener.onDefinitiveFailure(exception); - } - } - @Override public void cancelled() { listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null)); @@ -360,20 +341,6 @@ public class RestClient implements Closeable { }); } - private void setHeaders(HttpRequest httpRequest, Collection
requestHeaders) { - // request headers override default headers, so we don't add default headers if they exist as request headers - final Set requestNames = new HashSet<>(requestHeaders.size()); - for (Header requestHeader : requestHeaders) { - httpRequest.addHeader(requestHeader); - requestNames.add(requestHeader.getName()); - } - for (Header defaultHeader : defaultHeaders) { - if (requestNames.contains(defaultHeader.getName()) == false) { - httpRequest.addHeader(defaultHeader); - } - } - } - /** * Returns a non-empty {@link Iterator} of nodes to be used for a request * that match the {@link NodeSelector}. @@ -383,7 +350,7 @@ public class RestClient implements Closeable { * that is closest to being revived. * @throws IOException if no nodes are available */ - private NodeTuple> nextNode() throws IOException { + private NodeTuple> nextNodes() throws IOException { NodeTuple> nodeTuple = this.nodeTuple; Iterable hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector); return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache); @@ -517,11 +484,10 @@ public class RestClient implements Closeable { return false; } - private static Exception addSuppressedException(Exception suppressedException, Exception currentException) { + private static void addSuppressedException(Exception suppressedException, Exception currentException) { if (suppressedException != null) { currentException.addSuppressed(suppressedException); } - return currentException; } private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) { @@ -618,118 +584,8 @@ public class RestClient implements Closeable { * Tracks an exception, which caused a retry hence we should not return yet to the caller */ void trackFailure(Exception exception) { - this.exception = addSuppressedException(this.exception, exception); - } - } - - /** - * Listener used in any sync performRequest calls, it waits for a response or an exception back up to a timeout - */ - static class SyncResponseListener implements ResponseListener { - private final CountDownLatch latch = new CountDownLatch(1); - private final AtomicReference response = new AtomicReference<>(); - private final AtomicReference exception = new AtomicReference<>(); - - private final long timeout; - - SyncResponseListener(long timeout) { - assert timeout > 0; - this.timeout = timeout; - } - - @Override - public void onSuccess(Response response) { - Objects.requireNonNull(response, "response must not be null"); - boolean wasResponseNull = this.response.compareAndSet(null, response); - if (wasResponseNull == false) { - throw new IllegalStateException("response is already set"); - } - - latch.countDown(); - } - - @Override - public void onFailure(Exception exception) { - Objects.requireNonNull(exception, "exception must not be null"); - boolean wasExceptionNull = this.exception.compareAndSet(null, exception); - if (wasExceptionNull == false) { - throw new IllegalStateException("exception is already set"); - } - latch.countDown(); - } - - /** - * Waits (up to a timeout) for some result of the request: either a response, or an exception. - */ - Response get() throws IOException { - try { - //providing timeout is just a safety measure to prevent everlasting waits - //the different client timeouts should already do their jobs - if (latch.await(timeout, TimeUnit.MILLISECONDS) == false) { - throw new IOException("listener timeout after waiting for [" + timeout + "] ms"); - } - } catch (InterruptedException e) { - throw new RuntimeException("thread waiting for the response was interrupted", e); - } - - Exception exception = this.exception.get(); - Response response = this.response.get(); - if (exception != null) { - if (response != null) { - IllegalStateException e = new IllegalStateException("response and exception are unexpectedly set at the same time"); - e.addSuppressed(exception); - throw e; - } - /* - * Wrap and rethrow whatever exception we received, copying the type - * where possible so the synchronous API looks as much as possible - * like the asynchronous API. We wrap the exception so that the caller's - * signature shows up in any exception we throw. - */ - if (exception instanceof WarningFailureException) { - throw new WarningFailureException((WarningFailureException) exception); - } - if (exception instanceof ResponseException) { - throw new ResponseException((ResponseException) exception); - } - if (exception instanceof ConnectTimeoutException) { - ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof SocketTimeoutException) { - SocketTimeoutException e = new SocketTimeoutException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof ConnectionClosedException) { - ConnectionClosedException e = new ConnectionClosedException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof SSLHandshakeException) { - SSLHandshakeException e = new SSLHandshakeException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof ConnectException) { - ConnectException e = new ConnectException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof IOException) { - throw new IOException(exception.getMessage(), exception); - } - if (exception instanceof RuntimeException){ - throw new RuntimeException(exception.getMessage(), exception); - } - throw new RuntimeException("error while performing request", exception); - } - - if (response == null) { - throw new IllegalStateException("response not set and no exception caught either"); - } - return response; + addSuppressedException(this.exception, exception); + this.exception = exception; } } @@ -808,4 +664,153 @@ public class RestClient implements Closeable { itr.remove(); } } + + private class InternalRequest { + private final Request request; + private final Map params; + private final Set ignoreErrorCodes; + private final HttpRequestBase httpRequest; + private final WarningsHandler warningsHandler; + + InternalRequest(Request request) { + this.request = request; + this.params = new HashMap<>(request.getParameters()); + //ignore is a special parameter supported by the clients, shouldn't be sent to es + String ignoreString = params.remove("ignore"); + this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod()); + URI uri = buildUri(pathPrefix, request.getEndpoint(), params); + this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity()); + setHeaders(httpRequest, request.getOptions().getHeaders()); + this.warningsHandler = request.getOptions().getWarningsHandler() == null ? + RestClient.this.warningsHandler : request.getOptions().getWarningsHandler(); + } + + private void setHeaders(HttpRequest httpRequest, Collection
requestHeaders) { + // request headers override default headers, so we don't add default headers if they exist as request headers + final Set requestNames = new HashSet<>(requestHeaders.size()); + for (Header requestHeader : requestHeaders) { + httpRequest.addHeader(requestHeader); + requestNames.add(requestHeader.getName()); + } + for (Header defaultHeader : defaultHeaders) { + if (requestNames.contains(defaultHeader.getName()) == false) { + httpRequest.addHeader(defaultHeader); + } + } + } + + RequestContext createContextForNextAttempt(Node node, AuthCache authCache) { + this.httpRequest.reset(); + return new RequestContext(this, node, authCache); + } + } + + private static class RequestContext { + private final Node node; + private final HttpAsyncRequestProducer requestProducer; + private final HttpAsyncResponseConsumer asyncResponseConsumer; + private final HttpClientContext context; + + RequestContext(InternalRequest request, Node node, AuthCache authCache) { + this.node = node; + //we stream the request body if the entity allows for it + this.requestProducer = HttpAsyncMethods.create(node.getHost(), request.httpRequest); + this.asyncResponseConsumer = + request.request.getOptions().getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer(); + this.context = HttpClientContext.create(); + context.setAuthCache(authCache); + } + } + + private static Set getIgnoreErrorCodes(String ignoreString, String requestMethod) { + Set ignoreErrorCodes; + if (ignoreString == null) { + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + //404 never causes error if returned for a HEAD request + ignoreErrorCodes = Collections.singleton(404); + } else { + ignoreErrorCodes = Collections.emptySet(); + } + } else { + String[] ignoresArray = ignoreString.split(","); + ignoreErrorCodes = new HashSet<>(); + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + //404 never causes error if returned for a HEAD request + ignoreErrorCodes.add(404); + } + for (String ignoreCode : ignoresArray) { + try { + ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); + } + } + } + return ignoreErrorCodes; + } + + private static class ResponseOrResponseException { + private final Response response; + private final ResponseException responseException; + + ResponseOrResponseException(Response response) { + this.response = Objects.requireNonNull(response); + this.responseException = null; + } + + ResponseOrResponseException(ResponseException responseException) { + this.responseException = Objects.requireNonNull(responseException); + this.response = null; + } + } + + /** + * Wrap the exception so the caller's signature shows up in the stack trace, taking care to copy the original type and message + * where possible so async and sync code don't have to check different exceptions. + */ + private static Exception extractAndWrapCause(Exception exception) { + if (exception instanceof InterruptedException) { + throw new RuntimeException("thread waiting for the response was interrupted", exception); + } + if (exception instanceof ExecutionException) { + ExecutionException executionException = (ExecutionException)exception; + Throwable t = executionException.getCause() == null ? executionException : executionException.getCause(); + if (t instanceof Error) { + throw (Error)t; + } + exception = (Exception)t; + } + if (exception instanceof ConnectTimeoutException) { + ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof SocketTimeoutException) { + SocketTimeoutException e = new SocketTimeoutException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof ConnectionClosedException) { + ConnectionClosedException e = new ConnectionClosedException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof SSLHandshakeException) { + SSLHandshakeException e = new SSLHandshakeException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof ConnectException) { + ConnectException e = new ConnectException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof IOException) { + return new IOException(exception.getMessage(), exception); + } + if (exception instanceof RuntimeException){ + return new RuntimeException(exception.getMessage(), exception); + } + return new RuntimeException("error while performing request", exception); + } } diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java index 84cc3ee1667..2337cbf1fd0 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java @@ -42,14 +42,12 @@ import java.util.Objects; public final class RestClientBuilder { public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000; public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000; - public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS; public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10; public static final int DEFAULT_MAX_CONN_TOTAL = 30; private static final Header[] EMPTY_HEADERS = new Header[0]; private final List nodes; - private int maxRetryTimeout = DEFAULT_MAX_RETRY_TIMEOUT_MILLIS; private Header[] defaultHeaders = EMPTY_HEADERS; private RestClient.FailureListener failureListener; private HttpClientConfigCallback httpClientConfigCallback; @@ -102,20 +100,6 @@ public final class RestClientBuilder { return this; } - /** - * Sets the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request. - * {@link #DEFAULT_MAX_RETRY_TIMEOUT_MILLIS} if not specified. - * - * @throws IllegalArgumentException if {@code maxRetryTimeoutMillis} is not greater than 0 - */ - public RestClientBuilder setMaxRetryTimeoutMillis(int maxRetryTimeoutMillis) { - if (maxRetryTimeoutMillis <= 0) { - throw new IllegalArgumentException("maxRetryTimeoutMillis must be greater than 0"); - } - this.maxRetryTimeout = maxRetryTimeoutMillis; - return this; - } - /** * Sets the {@link HttpClientConfigCallback} to be used to customize http client configuration * @@ -208,7 +192,7 @@ public final class RestClientBuilder { return createHttpClient(); } }); - RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes, + RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes, pathPrefix, failureListener, nodeSelector, strictDeprecationMode); httpClient.start(); return restClient; diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java index 834748d65de..1e16d94076a 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java @@ -82,14 +82,6 @@ public class RestClientBuilderTests extends RestClientTestCase { assertNotNull(restClient); } - try { - RestClient.builder(new HttpHost("localhost", 9200)) - .setMaxRetryTimeoutMillis(randomIntBetween(Integer.MIN_VALUE, 0)); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals("maxRetryTimeoutMillis must be greater than 0", e.getMessage()); - } - try { RestClient.builder(new HttpHost("localhost", 9200)).setDefaultHeaders(null); fail("should have failed"); @@ -156,12 +148,9 @@ public class RestClientBuilderTests extends RestClientTestCase { builder.setDefaultHeaders(headers); } if (randomBoolean()) { - builder.setMaxRetryTimeoutMillis(randomIntBetween(1, Integer.MAX_VALUE)); - } - if (randomBoolean()) { - String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiOfLengthBetween(2, 5); + String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiLettersOfLengthBetween(2, 5); while (pathPrefix.length() < 20 && randomBoolean()) { - pathPrefix += "/" + randomAsciiOfLengthBetween(3, 6); + pathPrefix += "/" + randomAsciiLettersOfLengthBetween(3, 6); } builder.setPathPrefix(pathPrefix + (randomBoolean() ? "/" : "")); } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java index 272859e8441..4cc16c45bab 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java @@ -199,7 +199,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase { * Test host selector against a real server and * test what happens after calling */ - public void testNodeSelector() throws IOException { + public void testNodeSelector() throws Exception { try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) { Request request = new Request("GET", "/200"); int rounds = between(1, 10); @@ -210,7 +210,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase { */ if (stoppedFirstHost) { try { - restClient.performRequest(request); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); fail("expected to fail to connect"); } catch (ConnectException e) { // Windows isn't consistent here. Sometimes the message is even null! @@ -219,7 +219,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase { } } } else { - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(httpHosts[0], response.getHost()); } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java index 7dd1c4d842b..f3df9bf3bfd 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java @@ -22,25 +22,10 @@ package org.elasticsearch.client; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.http.Header; import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; -import org.apache.http.ProtocolVersion; -import org.apache.http.StatusLine; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.client.protocol.HttpClientContext; -import org.apache.http.concurrent.FutureCallback; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.impl.auth.BasicScheme; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.apache.http.message.BasicHttpResponse; -import org.apache.http.message.BasicStatusLine; -import org.apache.http.nio.protocol.HttpAsyncRequestProducer; -import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; import org.junit.After; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.IOException; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -49,7 +34,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode; import static org.elasticsearch.client.RestClientTestUtil.randomErrorRetryStatusCode; @@ -61,9 +45,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Tests for {@link RestClient} behaviour against multiple hosts: fail-over, blacklisting etc. @@ -75,39 +56,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { private List nodes; private HostsTrackingFailureListener failureListener; - @SuppressWarnings("unchecked") public RestClient createRestClient(NodeSelector nodeSelector) { - CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class); - when(httpClient.execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class), - any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { - @Override - public Future answer(InvocationOnMock invocationOnMock) throws Throwable { - HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; - final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); - final HttpHost httpHost = requestProducer.getTarget(); - HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2]; - assertThat(context.getAuthCache().get(httpHost), instanceOf(BasicScheme.class)); - final FutureCallback futureCallback = (FutureCallback) invocationOnMock.getArguments()[3]; - //return the desired status code or exception depending on the path - exec.execute(new Runnable() { - @Override - public void run() { - if (request.getURI().getPath().equals("/soe")) { - futureCallback.failed(new SocketTimeoutException(httpHost.toString())); - } else if (request.getURI().getPath().equals("/coe")) { - futureCallback.failed(new ConnectTimeoutException(httpHost.toString())); - } else if (request.getURI().getPath().equals("/ioe")) { - futureCallback.failed(new IOException(httpHost.toString())); - } else { - int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); - StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); - futureCallback.completed(new BasicHttpResponse(statusLine)); - } - } - }); - return null; - } - }); + CloseableHttpAsyncClient httpClient = RestClientSingleHostTests.mockHttpClient(exec); int numNodes = RandomNumbers.randomIntBetween(getRandom(), 2, 5); nodes = new ArrayList<>(numNodes); for (int i = 0; i < numNodes; i++) { @@ -115,7 +65,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { } nodes = Collections.unmodifiableList(nodes); failureListener = new HostsTrackingFailureListener(); - return new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener, nodeSelector, false); + return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false); } /** @@ -126,14 +76,15 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { exec.shutdown(); } - public void testRoundRobinOkStatusCodes() throws IOException { + public void testRoundRobinOkStatusCodes() throws Exception { RestClient restClient = createRestClient(NodeSelector.ANY); int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5); for (int i = 0; i < numIters; i++) { Set hostsSet = hostsSet(); for (int j = 0; j < nodes.size(); j++) { int statusCode = randomOkStatusCode(getRandom()); - Response response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode)); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), "/" + statusCode)); assertEquals(statusCode, response.getStatusLine().getStatusCode()); assertTrue("host not found: " + response.getHost(), hostsSet.remove(response.getHost())); } @@ -142,7 +93,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { failureListener.assertNotCalled(); } - public void testRoundRobinNoRetryErrors() throws IOException { + public void testRoundRobinNoRetryErrors() throws Exception { RestClient restClient = createRestClient(NodeSelector.ANY); int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5); for (int i = 0; i < numIters; i++) { @@ -151,7 +102,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { String method = randomHttpMethod(getRandom()); int statusCode = randomErrorNoRetryStatusCode(getRandom()); try { - Response response = restClient.performRequest(new Request(method, "/" + statusCode)); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(method, "/" + statusCode)); if (method.equals("HEAD") && statusCode == 404) { //no exception gets thrown although we got a 404 assertEquals(404, response.getStatusLine().getStatusCode()); @@ -175,18 +127,13 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { failureListener.assertNotCalled(); } - public void testRoundRobinRetryErrors() throws IOException { + public void testRoundRobinRetryErrors() throws Exception { RestClient restClient = createRestClient(NodeSelector.ANY); String retryEndpoint = randomErrorRetryEndpoint(); try { - restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (ResponseException) e.getCause(); Set hostsSet = hostsSet(); //first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each failureListener.assertCalled(nodes); @@ -206,11 +153,6 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { } while(e != null); assertEquals("every host should have been used but some weren't: " + hostsSet, 0, hostsSet.size()); } catch (IOException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (IOException) e.getCause(); Set hostsSet = hostsSet(); //first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each failureListener.assertCalled(nodes); @@ -236,7 +178,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { for (int j = 0; j < nodes.size(); j++) { retryEndpoint = randomErrorRetryEndpoint(); try { - restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { Response response = e.getResponse(); @@ -247,11 +190,6 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { failureListener.assertCalled(response.getHost()); assertEquals(0, e.getSuppressed().length); } catch (IOException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (IOException) e.getCause(); HttpHost httpHost = HttpHost.create(e.getMessage()); assertTrue("host [" + httpHost + "] not found, most likely used multiple times", hostsSet.remove(httpHost)); //after the first request, all hosts are blacklisted, a single one gets resurrected each time @@ -268,7 +206,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { int statusCode = randomErrorNoRetryStatusCode(getRandom()); Response response; try { - response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode)); + response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), "/" + statusCode)); } catch (ResponseException e) { response = e.getResponse(); } @@ -285,7 +224,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { for (int y = 0; y < i + 1; y++) { retryEndpoint = randomErrorRetryEndpoint(); try { - restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { Response response = e.getResponse(); @@ -293,11 +233,6 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { assertThat(response.getHost(), equalTo(selectedHost)); failureListener.assertCalled(selectedHost); } catch(IOException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (IOException) e.getCause(); HttpHost httpHost = HttpHost.create(e.getMessage()); assertThat(httpHost, equalTo(selectedHost)); failureListener.assertCalled(selectedHost); @@ -307,7 +242,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { } } - public void testNodeSelector() throws IOException { + public void testNodeSelector() throws Exception { NodeSelector firstPositionOnly = new NodeSelector() { @Override public void select(Iterable restClientNodes) { @@ -330,12 +265,12 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { * NodeSelector overrides the round robin behavior. */ Request request = new Request("GET", "/200"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(nodes.get(0).getHost(), response.getHost()); } } - public void testSetNodes() throws IOException { + public void testSetNodes() throws Exception { RestClient restClient = createRestClient(NodeSelector.SKIP_DEDICATED_MASTERS); List newNodes = new ArrayList<>(nodes.size()); for (int i = 0; i < nodes.size(); i++) { @@ -350,7 +285,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { * NodeSelector overrides the round robin behavior. */ Request request = new Request("GET", "/200"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(newNodes.get(0).getHost(), response.getHost()); } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java index fb58f18d42a..e3fd3c31137 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java @@ -206,7 +206,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { * to set/add headers to the {@link org.apache.http.client.HttpClient}. * Exercises the test http server ability to send back whatever headers it received. */ - public void testHeaders() throws IOException { + public void testHeaders() throws Exception { for (String method : getHttpMethods()) { final Set standardHeaders = new HashSet<>(Arrays.asList("Connection", "Host", "User-agent", "Date")); if (method.equals("HEAD") == false) { @@ -222,7 +222,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { request.setOptions(options); Response esResponse; try { - esResponse = restClient.performRequest(request); + esResponse = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); } catch (ResponseException e) { esResponse = e.getResponse(); } @@ -246,7 +246,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { * out of the box by {@link org.apache.http.client.HttpClient}. * Exercises the test http server ability to send back whatever body it received. */ - public void testDeleteWithBody() throws IOException { + public void testDeleteWithBody() throws Exception { bodyTest("DELETE"); } @@ -255,57 +255,57 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { * out of the box by {@link org.apache.http.client.HttpClient}. * Exercises the test http server ability to send back whatever body it received. */ - public void testGetWithBody() throws IOException { + public void testGetWithBody() throws Exception { bodyTest("GET"); } - public void testEncodeParams() throws IOException { + public void testEncodeParams() throws Exception { { Request request = new Request("PUT", "/200"); request.addParameter("routing", "this/is/the/routing"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=this%2Fis%2Fthe%2Frouting", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "this|is|the|routing"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=this%7Cis%7Cthe%7Crouting", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "routing#1"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=routing%231", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "中文"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=%E4%B8%AD%E6%96%87", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo+bar", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo+bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo%2Bbar", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo/bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo%2Fbar", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo^bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo%5Ebar", response.getRequestLine().getUri()); } } @@ -313,7 +313,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { /** * Verify that credentials are sent on the first request with preemptive auth enabled (default when provided with credentials). */ - public void testPreemptiveAuthEnabled() throws IOException { + public void testPreemptiveAuthEnabled() throws Exception { final String[] methods = {"POST", "PUT", "GET", "DELETE"}; try (RestClient restClient = createRestClient(true, true)) { @@ -328,7 +328,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { /** * Verify that credentials are not sent on the first request with preemptive auth disabled. */ - public void testPreemptiveAuthDisabled() throws IOException { + public void testPreemptiveAuthDisabled() throws Exception { final String[] methods = {"POST", "PUT", "GET", "DELETE"}; try (RestClient restClient = createRestClient(true, false)) { @@ -343,7 +343,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { /** * Verify that credentials continue to be sent even if a 401 (Unauthorized) response is received */ - public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws IOException { + public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws Exception { final String[] methods = {"POST", "PUT", "GET", "DELETE"}; try (RestClient restClient = createRestClient(true, true)) { @@ -362,14 +362,14 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { public void testUrlWithoutLeadingSlash() throws Exception { if (pathPrefix.length() == 0) { try { - restClient.performRequest(new Request("GET", "200")); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200")); fail("request should have failed"); } catch (ResponseException e) { assertEquals(404, e.getResponse().getStatusLine().getStatusCode()); } } else { { - Response response = restClient.performRequest(new Request("GET", "200")); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200")); //a trailing slash gets automatically added if a pathPrefix is configured assertEquals(200, response.getStatusLine().getStatusCode()); } @@ -378,7 +378,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { try (RestClient restClient = RestClient.builder( new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort())) .setPathPrefix(pathPrefix.substring(1)).build()) { - Response response = restClient.performRequest(new Request("GET", "200")); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200")); //a trailing slash gets automatically added if a pathPrefix is configured assertEquals(200, response.getStatusLine().getStatusCode()); } @@ -386,16 +386,16 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { } } - private Response bodyTest(final String method) throws IOException { + private Response bodyTest(final String method) throws Exception { return bodyTest(restClient, method); } - private Response bodyTest(final RestClient restClient, final String method) throws IOException { + private Response bodyTest(final RestClient restClient, final String method) throws Exception { int statusCode = randomStatusCode(getRandom()); return bodyTest(restClient, method, statusCode, new Header[0]); } - private Response bodyTest(RestClient restClient, String method, int statusCode, Header[] headers) throws IOException { + private Response bodyTest(RestClient restClient, String method, int statusCode, Header[] headers) throws Exception { String requestBody = "{ \"field\": \"value\" }"; Request request = new Request(method, "/" + statusCode); request.setJsonEntity(requestBody); @@ -406,7 +406,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { request.setOptions(options); Response esResponse; try { - esResponse = restClient.performRequest(request); + esResponse = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); } catch(ResponseException e) { esResponse = e.getResponse(); } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index aaef5404f28..625a4612f33 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.client; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.http.ConnectionClosedException; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpEntityEnclosingRequest; @@ -42,7 +43,6 @@ import org.apache.http.concurrent.FutureCallback; import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.http.impl.auth.BasicScheme; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.message.BasicHttpResponse; import org.apache.http.message.BasicStatusLine; @@ -55,24 +55,30 @@ import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.net.ssl.SSLHandshakeException; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.net.SocketTimeoutException; import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonList; import static org.elasticsearch.client.RestClientTestUtil.getAllErrorStatusCodes; import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods; import static org.elasticsearch.client.RestClientTestUtil.getOkStatusCodes; import static org.elasticsearch.client.RestClientTestUtil.randomStatusCode; -import static org.elasticsearch.client.SyncResponseListenerTests.assertExceptionStackContainsCallingMethod; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -105,60 +111,86 @@ public class RestClientSingleHostTests extends RestClientTestCase { private boolean strictDeprecationMode; @Before - @SuppressWarnings("unchecked") public void createRestClient() { - httpClient = mock(CloseableHttpAsyncClient.class); - when(httpClient.execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class), - any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { - @Override - public Future answer(InvocationOnMock invocationOnMock) throws Throwable { - HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; - HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2]; - assertThat(context.getAuthCache().get(node.getHost()), instanceOf(BasicScheme.class)); - final FutureCallback futureCallback = - (FutureCallback) invocationOnMock.getArguments()[3]; - HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); - //return the desired status code or exception depending on the path - if (request.getURI().getPath().equals("/soe")) { - futureCallback.failed(new SocketTimeoutException()); - } else if (request.getURI().getPath().equals("/coe")) { - futureCallback.failed(new ConnectTimeoutException()); - } else { - int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); - StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); - - final HttpResponse httpResponse = new BasicHttpResponse(statusLine); - //return the same body that was sent - if (request instanceof HttpEntityEnclosingRequest) { - HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); - if (entity != null) { - assertTrue("the entity is not repeatable, cannot set it to the response directly", - entity.isRepeatable()); - httpResponse.setEntity(entity); - } - } - //return the same headers that were sent - httpResponse.setHeaders(request.getAllHeaders()); - // Call the callback asynchronous to better simulate how async http client works - exec.execute(new Runnable() { - @Override - public void run() { - futureCallback.completed(httpResponse); - } - }); - } - return null; - } - }); - + httpClient = mockHttpClient(exec); defaultHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header-default"); node = new Node(new HttpHost("localhost", 9200)); failureListener = new HostsTrackingFailureListener(); strictDeprecationMode = randomBoolean(); - restClient = new RestClient(httpClient, 10000, defaultHeaders, + restClient = new RestClient(this.httpClient, defaultHeaders, singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode); } + @SuppressWarnings("unchecked") + static CloseableHttpAsyncClient mockHttpClient(final ExecutorService exec) { + CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class); + when(httpClient.execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class), + any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { + @Override + public Future answer(InvocationOnMock invocationOnMock) throws Throwable { + final HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; + final FutureCallback futureCallback = + (FutureCallback) invocationOnMock.getArguments()[3]; + // Call the callback asynchronous to better simulate how async http client works + return exec.submit(new Callable() { + @Override + public HttpResponse call() throws Exception { + if (futureCallback != null) { + try { + HttpResponse httpResponse = responseOrException(requestProducer); + futureCallback.completed(httpResponse); + } catch(Exception e) { + futureCallback.failed(e); + } + return null; + } + return responseOrException(requestProducer); + } + }); + } + }); + return httpClient; + } + + private static HttpResponse responseOrException(HttpAsyncRequestProducer requestProducer) throws Exception { + final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); + final HttpHost httpHost = requestProducer.getTarget(); + //return the desired status code or exception depending on the path + switch (request.getURI().getPath()) { + case "/soe": + throw new SocketTimeoutException(httpHost.toString()); + case "/coe": + throw new ConnectTimeoutException(httpHost.toString()); + case "/ioe": + throw new IOException(httpHost.toString()); + case "/closed": + throw new ConnectionClosedException(); + case "/handshake": + throw new SSLHandshakeException(""); + case "/uri": + throw new URISyntaxException("", ""); + case "/runtime": + throw new RuntimeException(); + default: + int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); + StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); + + final HttpResponse httpResponse = new BasicHttpResponse(statusLine); + //return the same body that was sent + if (request instanceof HttpEntityEnclosingRequest) { + HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); + if (entity != null) { + assertTrue("the entity is not repeatable, cannot set it to the response directly", + entity.isRepeatable()); + httpResponse.setEntity(entity); + } + } + //return the same headers that were sent + httpResponse.setHeaders(request.getAllHeaders()); + return httpResponse; + } + } + /** * Shutdown the executor so we don't leak threads into other test runs. */ @@ -195,10 +227,10 @@ public class RestClientSingleHostTests extends RestClientTestCase { /** * End to end test for ok status codes */ - public void testOkStatusCodes() throws IOException { + public void testOkStatusCodes() throws Exception { for (String method : getHttpMethods()) { for (int okStatusCode : getOkStatusCodes()) { - Response response = restClient.performRequest(new Request(method, "/" + okStatusCode)); + Response response = performRequestSyncOrAsync(restClient, new Request(method, "/" + okStatusCode)); assertThat(response.getStatusLine().getStatusCode(), equalTo(okStatusCode)); } } @@ -208,7 +240,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { /** * End to end test for error status codes: they should cause an exception to be thrown, apart from 404 with HEAD requests */ - public void testErrorStatusCodes() throws IOException { + public void testErrorStatusCodes() throws Exception { for (String method : getHttpMethods()) { Set expectedIgnores = new HashSet<>(); String ignoreParam = ""; @@ -256,21 +288,74 @@ public class RestClientSingleHostTests extends RestClientTestCase { } } - public void testIOExceptions() { + public void testPerformRequestIOExceptions() throws Exception { for (String method : getHttpMethods()) { //IOExceptions should be let bubble up try { - restClient.performRequest(new Request(method, "/coe")); + restClient.performRequest(new Request(method, "/ioe")); fail("request should have failed"); } catch(IOException e) { - assertThat(e, instanceOf(ConnectTimeoutException.class)); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/coe")); + fail("request should have failed"); + } catch(ConnectTimeoutException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); } failureListener.assertCalled(singletonList(node)); try { restClient.performRequest(new Request(method, "/soe")); fail("request should have failed"); - } catch(IOException e) { - assertThat(e, instanceOf(SocketTimeoutException.class)); + } catch(SocketTimeoutException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/closed")); + fail("request should have failed"); + } catch(ConnectionClosedException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/handshake")); + fail("request should have failed"); + } catch(SSLHandshakeException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + } + } + + public void testPerformRequestRuntimeExceptions() throws Exception { + for (String method : getHttpMethods()) { + try { + restClient.performRequest(new Request(method, "/runtime")); + fail("request should have failed"); + } catch (RuntimeException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + } + } + + public void testPerformRequestExceptions() throws Exception { + for (String method : getHttpMethods()) { + try { + restClient.performRequest(new Request(method, "/uri")); + fail("request should have failed"); + } catch (RuntimeException e) { + assertThat(e.getCause(), instanceOf(URISyntaxException.class)); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); } failureListener.assertCalled(singletonList(node)); } @@ -280,7 +365,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { * End to end test for request and response body. Exercises the mock http client ability to send back * whatever body it has received. */ - public void testBody() throws IOException { + public void testBody() throws Exception { String body = "{ \"field\": \"value\" }"; StringEntity entity = new StringEntity(body, ContentType.APPLICATION_JSON); for (String method : Arrays.asList("DELETE", "GET", "PATCH", "POST", "PUT")) { @@ -309,7 +394,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { Request request = new Request(method, "/" + randomStatusCode(getRandom())); request.setEntity(entity); try { - restClient.performRequest(request); + performRequestSyncOrAsync(restClient, request); fail("request should have failed"); } catch(UnsupportedOperationException e) { assertThat(e.getMessage(), equalTo(method + " with body is not supported")); @@ -321,7 +406,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { * End to end test for request and response headers. Exercises the mock http client ability to send back * whatever headers it has received. */ - public void testHeaders() throws IOException { + public void testHeaders() throws Exception { for (String method : getHttpMethods()) { final Header[] requestHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header"); final int statusCode = randomStatusCode(getRandom()); @@ -333,7 +418,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { request.setOptions(options); Response esResponse; try { - esResponse = restClient.performRequest(request); + esResponse = performRequestSyncOrAsync(restClient, request); } catch(ResponseException e) { esResponse = e.getResponse(); } @@ -343,7 +428,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { } } - public void testDeprecationWarnings() throws IOException { + public void testDeprecationWarnings() throws Exception { String chars = randomAsciiAlphanumOfLength(5); assertDeprecationWarnings(singletonList("poorly formatted " + chars), singletonList("poorly formatted " + chars)); assertDeprecationWarnings(singletonList(formatWarning(chars)), singletonList(chars)); @@ -397,7 +482,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { protected abstract WarningsHandler warningsHandler(); } - private void assertDeprecationWarnings(List warningHeaderTexts, List warningBodyTexts) throws IOException { + private void assertDeprecationWarnings(List warningHeaderTexts, List warningBodyTexts) throws Exception { String method = randomFrom(getHttpMethods()); Request request = new Request(method, "/200"); RequestOptions.Builder options = request.getOptions().toBuilder(); @@ -420,7 +505,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { Response response; if (expectFailure) { try { - restClient.performRequest(request); + performRequestSyncOrAsync(restClient, request); fail("expected WarningFailureException from warnings"); return; } catch (WarningFailureException e) { @@ -430,7 +515,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { response = e.getResponse(); } } else { - response = restClient.performRequest(request); + response = performRequestSyncOrAsync(restClient, request); } assertEquals(false == warningBodyTexts.isEmpty(), response.hasWarnings()); assertEquals(warningBodyTexts, response.getWarnings()); @@ -461,7 +546,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { //randomly add some ignore parameter, which doesn't get sent as part of the request String ignore = Integer.toString(randomFrom(RestClientTestUtil.getAllErrorStatusCodes())); if (randomBoolean()) { - ignore += "," + Integer.toString(randomFrom(RestClientTestUtil.getAllErrorStatusCodes())); + ignore += "," + randomFrom(RestClientTestUtil.getAllErrorStatusCodes()); } request.addParameter("ignore", ignore); } @@ -520,12 +605,66 @@ public class RestClientSingleHostTests extends RestClientTestCase { expectedRequest.addHeader(defaultHeader); } } - try { - restClient.performRequest(request); - } catch(ResponseException e) { + performRequestSyncOrAsync(restClient, request); + } catch(Exception e) { //all good } return expectedRequest; } + + static Response performRequestSyncOrAsync(RestClient restClient, Request request) throws Exception { + //randomize between sync and async methods + if (randomBoolean()) { + return restClient.performRequest(request); + } else { + final AtomicReference exceptionRef = new AtomicReference<>(); + final AtomicReference responseRef = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + restClient.performRequestAsync(request, new ResponseListener() { + @Override + public void onSuccess(Response response) { + responseRef.set(response); + latch.countDown(); + + } + + @Override + public void onFailure(Exception exception) { + exceptionRef.set(exception); + latch.countDown(); + } + }); + latch.await(); + if (exceptionRef.get() != null) { + throw exceptionRef.get(); + } + return responseRef.get(); + } + } + + /** + * Asserts that the provided {@linkplain Exception} contains the method + * that called this somewhere on its stack. This is + * normally the case for synchronous calls but {@link RestClient} performs + * synchronous calls by performing asynchronous calls and blocking the + * current thread until the call returns so it has to take special care + * to make sure that the caller shows up in the exception. We use this + * assertion to make sure that we don't break that "special care". + */ + private static void assertExceptionStackContainsCallingMethod(Throwable t) { + // 0 is getStackTrace + // 1 is this method + // 2 is the caller, what we want + StackTraceElement myMethod = Thread.currentThread().getStackTrace()[2]; + for (StackTraceElement se : t.getStackTrace()) { + if (se.getClassName().equals(myMethod.getClassName()) + && se.getMethodName().equals(myMethod.getMethodName())) { + return; + } + } + StringWriter stack = new StringWriter(); + t.printStackTrace(new PrintWriter(stack)); + fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack); + } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java index f3f0f0e58b9..6b5f8bf907e 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -57,7 +57,7 @@ public class RestClientTests extends RestClientTestCase { public void testCloseIsIdempotent() throws IOException { List nodes = singletonList(new Node(new HttpHost("localhost", 9200))); CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class); - RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null, null, false); + RestClient restClient = new RestClient(closeableHttpAsyncClient, new Header[0], nodes, null, null, null, false); restClient.close(); verify(closeableHttpAsyncClient, times(1)).close(); restClient.close(); @@ -353,8 +353,7 @@ public class RestClientTests extends RestClientTestCase { private static RestClient createRestClient() { List nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200))); - return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000), - new Header[] {}, nodes, null, null, null, false); + return new RestClient(mock(CloseableHttpAsyncClient.class), new Header[] {}, nodes, null, null, null, false); } public void testRoundRobin() throws IOException { diff --git a/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java b/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java deleted file mode 100644 index 683b23a596a..00000000000 --- a/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import org.apache.http.ConnectionClosedException; -import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; -import org.apache.http.ProtocolVersion; -import org.apache.http.RequestLine; -import org.apache.http.StatusLine; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.message.BasicHttpResponse; -import org.apache.http.message.BasicRequestLine; -import org.apache.http.message.BasicStatusLine; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.SocketTimeoutException; -import java.net.URISyntaxException; -import javax.net.ssl.SSLHandshakeException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.fail; - -public class SyncResponseListenerTests extends RestClientTestCase { - /** - * Asserts that the provided {@linkplain Exception} contains the method - * that called this somewhere on its stack. This is - * normally the case for synchronous calls but {@link RestClient} performs - * synchronous calls by performing asynchronous calls and blocking the - * current thread until the call returns so it has to take special care - * to make sure that the caller shows up in the exception. We use this - * assertion to make sure that we don't break that "special care". - */ - static void assertExceptionStackContainsCallingMethod(Exception e) { - // 0 is getStackTrace - // 1 is this method - // 2 is the caller, what we want - StackTraceElement myMethod = Thread.currentThread().getStackTrace()[2]; - for (StackTraceElement se : e.getStackTrace()) { - if (se.getClassName().equals(myMethod.getClassName()) - && se.getMethodName().equals(myMethod.getMethodName())) { - return; - } - } - StringWriter stack = new StringWriter(); - e.printStackTrace(new PrintWriter(stack)); - fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack); - } - - public void testOnSuccessNullResponse() { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - try { - syncResponseListener.onSuccess(null); - fail("onSuccess should have failed"); - } catch (NullPointerException e) { - assertEquals("response must not be null", e.getMessage()); - } - } - - public void testOnFailureNullException() { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - try { - syncResponseListener.onFailure(null); - fail("onFailure should have failed"); - } catch (NullPointerException e) { - assertEquals("exception must not be null", e.getMessage()); - } - } - - public void testOnSuccess() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - Response mockResponse = mockResponse(); - syncResponseListener.onSuccess(mockResponse); - Response response = syncResponseListener.get(); - assertSame(response, mockResponse); - - try { - syncResponseListener.onSuccess(mockResponse); - fail("get should have failed"); - } catch (IllegalStateException e) { - assertEquals(e.getMessage(), "response is already set"); - } - response = syncResponseListener.get(); - assertSame(response, mockResponse); - } - - public void testOnFailure() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - RuntimeException firstException = new RuntimeException("first-test"); - syncResponseListener.onFailure(firstException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - assertEquals(firstException.getMessage(), e.getMessage()); - assertSame(firstException, e.getCause()); - } - - RuntimeException secondException = new RuntimeException("second-test"); - try { - syncResponseListener.onFailure(secondException); - } catch(IllegalStateException e) { - assertEquals(e.getMessage(), "exception is already set"); - } - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - assertEquals(firstException.getMessage(), e.getMessage()); - assertSame(firstException, e.getCause()); - } - - Response response = mockResponse(); - syncResponseListener.onSuccess(response); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IllegalStateException e) { - assertEquals("response and exception are unexpectedly set at the same time", e.getMessage()); - assertNotNull(e.getSuppressed()); - assertEquals(1, e.getSuppressed().length); - assertSame(firstException, e.getSuppressed()[0]); - } - } - - public void testRuntimeIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - RuntimeException runtimeException = new RuntimeException(); - syncResponseListener.onFailure(runtimeException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - // We preserve the original exception in the cause - assertSame(runtimeException, e.getCause()); - // We copy the message - assertEquals(runtimeException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testConnectTimeoutExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - ConnectTimeoutException timeoutException = new ConnectTimeoutException(); - syncResponseListener.onFailure(timeoutException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IOException e) { - // We preserve the original exception in the cause - assertSame(timeoutException, e.getCause()); - // We copy the message - assertEquals(timeoutException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testSocketTimeoutExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - SocketTimeoutException timeoutException = new SocketTimeoutException(); - syncResponseListener.onFailure(timeoutException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IOException e) { - // We preserve the original exception in the cause - assertSame(timeoutException, e.getCause()); - // We copy the message - assertEquals(timeoutException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testConnectionClosedExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - ConnectionClosedException closedException = new ConnectionClosedException(randomAsciiAlphanumOfLength(5)); - syncResponseListener.onFailure(closedException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (ConnectionClosedException e) { - // We preserve the original exception in the cause - assertSame(closedException, e.getCause()); - // We copy the message - assertEquals(closedException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testSSLHandshakeExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - SSLHandshakeException exception = new SSLHandshakeException(randomAsciiAlphanumOfLength(5)); - syncResponseListener.onFailure(exception); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (SSLHandshakeException e) { - // We preserve the original exception in the cause - assertSame(exception, e.getCause()); - // We copy the message - assertEquals(exception.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testIOExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - IOException ioException = new IOException(); - syncResponseListener.onFailure(ioException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IOException e) { - // We preserve the original exception in the cause - assertSame(ioException, e.getCause()); - // We copy the message - assertEquals(ioException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - //we just need any checked exception - URISyntaxException exception = new URISyntaxException("test", "test"); - syncResponseListener.onFailure(exception); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - assertEquals("error while performing request", e.getMessage()); - // We preserve the original exception in the cause - assertSame(exception, e.getCause()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - private static Response mockResponse() { - ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1); - RequestLine requestLine = new BasicRequestLine("GET", "/", protocolVersion); - StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "OK"); - HttpResponse httpResponse = new BasicHttpResponse(statusLine); - return new Response(requestLine, new HttpHost("localhost", 9200), httpResponse); - } -} diff --git a/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java b/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java index 7eae17d83cf..8653db4226f 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java +++ b/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java @@ -111,13 +111,6 @@ public class RestClientDocumentation { builder.setDefaultHeaders(defaultHeaders); // <1> //end::rest-client-init-default-headers } - { - //tag::rest-client-init-max-retry-timeout - RestClientBuilder builder = RestClient.builder( - new HttpHost("localhost", 9200, "http")); - builder.setMaxRetryTimeoutMillis(10000); // <1> - //end::rest-client-init-max-retry-timeout - } { //tag::rest-client-init-node-selector RestClientBuilder builder = RestClient.builder( @@ -305,8 +298,7 @@ public class RestClientDocumentation { .setConnectTimeout(5000) .setSocketTimeout(60000); } - }) - .setMaxRetryTimeoutMillis(60000); + }); //end::rest-client-config-timeouts } { diff --git a/docs/java-rest/low-level/configuration.asciidoc b/docs/java-rest/low-level/configuration.asciidoc index b7da2b5ebcc..e284b52c67a 100644 --- a/docs/java-rest/low-level/configuration.asciidoc +++ b/docs/java-rest/low-level/configuration.asciidoc @@ -18,8 +18,7 @@ https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/htt as an argument and has the same return type. The request config builder can be modified and then returned. In the following example we increase the connect timeout (defaults to 1 second) and the socket timeout (defaults to 30 -seconds). Also we adjust the max retry timeout accordingly (defaults to 30 -seconds too). +seconds). ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- diff --git a/docs/java-rest/low-level/usage.asciidoc b/docs/java-rest/low-level/usage.asciidoc index 3747314b6ec..ee1555019db 100644 --- a/docs/java-rest/low-level/usage.asciidoc +++ b/docs/java-rest/low-level/usage.asciidoc @@ -180,15 +180,6 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-defaul <1> Set the default headers that need to be sent with each request, to prevent having to specify them with each single request -["source","java",subs="attributes,callouts,macros"] --------------------------------------------------- -include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-max-retry-timeout] --------------------------------------------------- -<1> Set the timeout that should be honoured in case multiple attempts are made -for the same request. The default value is 30 seconds, same as the default -socket timeout. In case the socket timeout is customized, the maximum retry -timeout should be adjusted accordingly - ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-failure-listener] diff --git a/docs/reference/migration/migrate_7_0/restclient.asciidoc b/docs/reference/migration/migrate_7_0/restclient.asciidoc index 3c0237db6e7..39d19c345cd 100644 --- a/docs/reference/migration/migrate_7_0/restclient.asciidoc +++ b/docs/reference/migration/migrate_7_0/restclient.asciidoc @@ -20,4 +20,13 @@ e.g. `client.index(indexRequest, new Header("name" "value"))` becomes The Cluster Health API used to default to `shards` level to ease migration from transport client that doesn't support the `level` parameter and always returns information including indices and shards details. The level default -value has been aligned with the Elasticsearch default level: `cluster`. \ No newline at end of file +value has been aligned with the Elasticsearch default level: `cluster`. + +=== Low-level REST client changes + +[float] +==== Support for `maxRetryTimeout` removed from RestClient + +`RestClient` and `RestClientBuilder` no longer support the `maxRetryTimeout` +setting. The setting was removed as its counting mechanism was not accurate +and caused issues while adding little value. \ No newline at end of file diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java index 5cf96419f55..0a9e5f8dfc1 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java @@ -152,7 +152,7 @@ public class Zen2RestApiIT extends ESNetty4IntegTestCase { } catch (ResponseException e) { assertThat(e.getResponse().getStatusLine().getStatusCode(), is(400)); assertThat( - e.getCause().getMessage(), + e.getMessage(), Matchers.containsString("add voting config exclusions request for [invalid] matched no master-eligible nodes") ); } diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java index 720f152265a..9822da98fde 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java +++ b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java @@ -41,7 +41,6 @@ public class AzureStorageRepositoryClientYamlTestSuiteIT extends ESClientYamlSui protected Settings restClientSettings() { // Give more time to repository-azure to complete the snapshot operations return Settings.builder().put(super.restClientSettings()) - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "60s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "60s") .build(); } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java index eb5517b7acb..1c57be7abba 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java @@ -59,7 +59,6 @@ public abstract class AbstractRollingTestCase extends ESRestTestCase { // increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java index 7932328c8c2..e7c111aad16 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java @@ -55,7 +55,6 @@ public class UpgradeClusterClientYamlTestSuiteIT extends ESClientYamlSuiteTestCa // increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 56f8881a5f5..177cdaad941 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -92,7 +92,6 @@ import static org.hamcrest.Matchers.equalTo; public abstract class ESRestTestCase extends ESTestCase { public static final String TRUSTSTORE_PATH = "truststore.path"; public static final String TRUSTSTORE_PASSWORD = "truststore.password"; - public static final String CLIENT_RETRY_TIMEOUT = "client.retry.timeout"; public static final String CLIENT_SOCKET_TIMEOUT = "client.socket.timeout"; public static final String CLIENT_PATH_PREFIX = "client.path.prefix"; @@ -750,11 +749,6 @@ public abstract class ESRestTestCase extends ESTestCase { } builder.setDefaultHeaders(defaultHeaders); } - final String requestTimeoutString = settings.get(CLIENT_RETRY_TIMEOUT); - if (requestTimeoutString != null) { - final TimeValue maxRetryTimeout = TimeValue.parseTimeValue(requestTimeoutString, CLIENT_RETRY_TIMEOUT); - builder.setMaxRetryTimeoutMillis(Math.toIntExact(maxRetryTimeout.getMillis())); - } final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); if (socketTimeoutString != null) { final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT); diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java index 24c1ab1c1cb..8f07b532769 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java @@ -45,7 +45,6 @@ import org.junit.Before; import java.io.IOException; import java.io.InputStream; -import java.io.UnsupportedEncodingException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -670,16 +669,16 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { }); } - public void testInvalidPolicyNames() throws UnsupportedEncodingException { + public void testInvalidPolicyNames() { ResponseException ex; policy = randomAlphaOfLengthBetween(0,10) + "," + randomAlphaOfLengthBetween(0,10); ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); - assertThat(ex.getCause().getMessage(), containsString("invalid policy name")); + assertThat(ex.getMessage(), containsString("invalid policy name")); policy = randomAlphaOfLengthBetween(0,10) + "%20" + randomAlphaOfLengthBetween(0,10); ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); - assertThat(ex.getCause().getMessage(), containsString("invalid policy name")); + assertThat(ex.getMessage(), containsString("invalid policy name")); policy = "_" + randomAlphaOfLengthBetween(1, 20); ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); @@ -716,7 +715,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { Request deleteRequest = new Request("DELETE", "_ilm/policy/" + originalPolicy); ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(deleteRequest)); - assertThat(ex.getCause().getMessage(), + assertThat(ex.getMessage(), Matchers.allOf( containsString("Cannot delete policy [" + originalPolicy + "]. It is in use by one or more indices: ["), containsString(managedIndex1), diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index 1e40b31a24c..e4d96645b87 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -68,7 +68,6 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase { // we increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } diff --git a/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java b/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java index ff5c24b15ed..0281f38933b 100644 --- a/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java +++ b/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.rest.ESRestTestCase; import org.junit.Before; +import javax.security.auth.login.LoginContext; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; @@ -33,8 +34,6 @@ import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Map; -import javax.security.auth.login.LoginContext; - import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.contains; @@ -148,13 +147,7 @@ public class KerberosAuthenticationIT extends ESRestTestCase { return restClientBuilder.build(); } - private static void configureRestClientBuilder(final RestClientBuilder restClientBuilder, final Settings settings) - throws IOException { - final String requestTimeoutString = settings.get(CLIENT_RETRY_TIMEOUT); - if (requestTimeoutString != null) { - final TimeValue maxRetryTimeout = TimeValue.parseTimeValue(requestTimeoutString, CLIENT_RETRY_TIMEOUT); - restClientBuilder.setMaxRetryTimeoutMillis(Math.toIntExact(maxRetryTimeout.getMillis())); - } + private static void configureRestClientBuilder(final RestClientBuilder restClientBuilder, final Settings settings) { final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); if (socketTimeoutString != null) { final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT); diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java index 3c7a9cee455..9374346449c 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java @@ -68,7 +68,6 @@ public class UpgradeClusterClientYamlTestSuiteIT extends ESClientYamlSuiteTestCa // we increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); }