diff --git a/client/src/main/java/org/elasticsearch/client/RestClient.java b/client/src/main/java/org/elasticsearch/client/RestClient.java index e4186edc126..3e59cc297e4 100644 --- a/client/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/src/main/java/org/elasticsearch/client/RestClient.java @@ -18,28 +18,191 @@ */ package org.elasticsearch.client; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.http.HttpEntity; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; import java.io.Closeable; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Iterator; +import java.util.Locale; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; public final class RestClient implements Closeable { - private final Transport transport; + private static final Log logger = LogFactory.getLog(RestClient.class); + + private final CloseableHttpClient client; + private final ConnectionPool connectionPool; + private final long maxRetryTimeout; public RestClient(CloseableHttpClient client, ConnectionPool connectionPool, long maxRetryTimeout) { - this.transport = new Transport(client, connectionPool, maxRetryTimeout); + Objects.requireNonNull(client, "client cannot be null"); + Objects.requireNonNull(connectionPool, "connectionPool cannot be null"); + if (maxRetryTimeout <= 0) { + throw new IllegalArgumentException("maxRetryTimeout must be greater than 0"); + } + this.client = client; + this.connectionPool = connectionPool; + this.maxRetryTimeout = maxRetryTimeout; } public ElasticsearchResponse performRequest(String method, String endpoint, Map params, HttpEntity entity) throws IOException { - return transport.performRequest(method, endpoint, params, entity); + URI uri = buildUri(endpoint, params); + HttpRequestBase request = createHttpRequest(method, uri, entity); + Iterator connectionIterator = connectionPool.nextConnection().iterator(); + if (connectionIterator.hasNext() == false) { + Connection connection = connectionPool.lastResortConnection(); + logger.info("no healthy nodes available, trying " + connection.getHost()); + return performRequest(request, Stream.of(connection).iterator()); + } + return performRequest(request, connectionIterator); + } + + private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator connectionIterator) throws IOException { + //we apply a soft margin so that e.g. if a request took 59 seconds and timeout is set to 60 we don't do another attempt + long retryTimeout = Math.round(this.maxRetryTimeout / (float)100 * 98); + IOException lastSeenException = null; + long startTime = System.nanoTime(); + + while (connectionIterator.hasNext()) { + Connection connection = connectionIterator.next(); + + if (lastSeenException != null) { + long timeElapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + long timeout = retryTimeout - timeElapsed; + if (timeout <= 0) { + IOException retryTimeoutException = new IOException( + "request retries exceeded max retry timeout [" + retryTimeout + "]"); + retryTimeoutException.addSuppressed(lastSeenException); + throw retryTimeoutException; + } + } + + try { + connectionPool.beforeAttempt(connection); + } catch(IOException e) { + lastSeenException = addSuppressedException(lastSeenException, e); + continue; + } + + try { + ElasticsearchResponse response = performRequest(request, connection); + connectionPool.onSuccess(connection); + return response; + } catch(ElasticsearchResponseException e) { + if (e.isRecoverable()) { + connectionPool.onFailure(connection); + lastSeenException = addSuppressedException(lastSeenException, e); + } else { + //don't retry and call onSuccess as the error should be a request problem + connectionPool.onSuccess(connection); + throw e; + } + } catch(IOException e) { + connectionPool.onFailure(connection); + lastSeenException = addSuppressedException(lastSeenException, e); + } + } + assert lastSeenException != null; + throw lastSeenException; + } + + private ElasticsearchResponse performRequest(HttpRequestBase request, Connection connection) throws IOException { + CloseableHttpResponse response; + try { + response = client.execute(connection.getHost(), request); + } catch(IOException e) { + RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), e); + throw e; + } finally { + request.reset(); + } + StatusLine statusLine = response.getStatusLine(); + //TODO make ignore status code configurable. rest-spec and tests support that parameter. + if (statusLine.getStatusCode() < 300 || + request.getMethod().equals(HttpHead.METHOD_NAME) && statusLine.getStatusCode() == 404) { + RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getHost(), response.getStatusLine()); + return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response); + } else { + EntityUtils.consume(response.getEntity()); + RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), response.getStatusLine()); + throw new ElasticsearchResponseException(request.getRequestLine(), connection.getHost(), statusLine); + } + } + + private static IOException addSuppressedException(IOException suppressedException, IOException currentException) { + if (suppressedException != null) { + currentException.addSuppressed(suppressedException); + } + return currentException; + } + + private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) { + switch(method.toUpperCase(Locale.ROOT)) { + case HttpDeleteWithEntity.METHOD_NAME: + HttpDeleteWithEntity httpDeleteWithEntity = new HttpDeleteWithEntity(uri); + addRequestBody(httpDeleteWithEntity, entity); + return httpDeleteWithEntity; + case HttpGetWithEntity.METHOD_NAME: + HttpGetWithEntity httpGetWithEntity = new HttpGetWithEntity(uri); + addRequestBody(httpGetWithEntity, entity); + return httpGetWithEntity; + case HttpHead.METHOD_NAME: + if (entity != null) { + throw new UnsupportedOperationException("HEAD with body is not supported"); + } + return new HttpHead(uri); + case HttpPost.METHOD_NAME: + HttpPost httpPost = new HttpPost(uri); + addRequestBody(httpPost, entity); + return httpPost; + case HttpPut.METHOD_NAME: + HttpPut httpPut = new HttpPut(uri); + addRequestBody(httpPut, entity); + return httpPut; + default: + throw new UnsupportedOperationException("http method not supported: " + method); + } + } + + private static void addRequestBody(HttpEntityEnclosingRequestBase httpRequest, HttpEntity entity) { + if (entity != null) { + httpRequest.setEntity(entity); + } + } + + private static URI buildUri(String path, Map params) { + try { + URIBuilder uriBuilder = new URIBuilder(path); + for (Map.Entry param : params.entrySet()) { + uriBuilder.addParameter(param.getKey(), param.getValue().toString()); + } + return uriBuilder.build(); + } catch(URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage(), e); + } } @Override public void close() throws IOException { - transport.close(); + connectionPool.close(); + client.close(); } } diff --git a/client/src/main/java/org/elasticsearch/client/Transport.java b/client/src/main/java/org/elasticsearch/client/Transport.java deleted file mode 100644 index b0f843ec19b..00000000000 --- a/client/src/main/java/org/elasticsearch/client/Transport.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.http.HttpEntity; -import org.apache.http.StatusLine; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; -import org.apache.http.client.methods.HttpHead; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpPut; -import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.util.EntityUtils; - -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Iterator; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -final class Transport implements Closeable { - - private static final Log logger = LogFactory.getLog(Transport.class); - - private final CloseableHttpClient client; - private final ConnectionPool connectionPool; - private final long maxRetryTimeout; - - Transport(CloseableHttpClient client, ConnectionPool connectionPool, long maxRetryTimeout) { - Objects.requireNonNull(client, "client cannot be null"); - Objects.requireNonNull(connectionPool, "connectionPool cannot be null"); - if (maxRetryTimeout <= 0) { - throw new IllegalArgumentException("maxRetryTimeout must be greater than 0"); - } - this.client = client; - this.connectionPool = connectionPool; - this.maxRetryTimeout = maxRetryTimeout; - } - - ElasticsearchResponse performRequest(String method, String endpoint, Map params, HttpEntity entity) throws IOException { - URI uri = buildUri(endpoint, params); - HttpRequestBase request = createHttpRequest(method, uri, entity); - Iterator connectionIterator = connectionPool.nextConnection().iterator(); - if (connectionIterator.hasNext() == false) { - Connection connection = connectionPool.lastResortConnection(); - logger.info("no healthy nodes available, trying " + connection.getHost()); - return performRequest(request, Stream.of(connection).iterator()); - } - return performRequest(request, connectionIterator); - } - - private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator connectionIterator) throws IOException { - //we apply a soft margin so that e.g. if a request took 59 seconds and timeout is set to 60 we don't do another attempt - long retryTimeout = Math.round(this.maxRetryTimeout / (float)100 * 98); - IOException lastSeenException = null; - long startTime = System.nanoTime(); - - while (connectionIterator.hasNext()) { - Connection connection = connectionIterator.next(); - - if (lastSeenException != null) { - long timeElapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); - long timeout = retryTimeout - timeElapsed; - if (timeout <= 0) { - IOException retryTimeoutException = new IOException( - "request retries exceeded max retry timeout [" + retryTimeout + "]"); - retryTimeoutException.addSuppressed(lastSeenException); - throw retryTimeoutException; - } - } - - try { - connectionPool.beforeAttempt(connection); - } catch(IOException e) { - lastSeenException = addSuppressedException(lastSeenException, e); - continue; - } - - try { - ElasticsearchResponse response = performRequest(request, connection); - connectionPool.onSuccess(connection); - return response; - } catch(ElasticsearchResponseException e) { - if (e.isRecoverable()) { - connectionPool.onFailure(connection); - lastSeenException = addSuppressedException(lastSeenException, e); - } else { - //don't retry and call onSuccess as the error should be a request problem - connectionPool.onSuccess(connection); - throw e; - } - } catch(IOException e) { - connectionPool.onFailure(connection); - lastSeenException = addSuppressedException(lastSeenException, e); - } - } - assert lastSeenException != null; - throw lastSeenException; - } - - private ElasticsearchResponse performRequest(HttpRequestBase request, Connection connection) throws IOException { - CloseableHttpResponse response; - try { - response = client.execute(connection.getHost(), request); - } catch(IOException e) { - RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), e); - throw e; - } finally { - request.reset(); - } - StatusLine statusLine = response.getStatusLine(); - //TODO make ignore status code configurable. rest-spec and tests support that parameter. - if (statusLine.getStatusCode() < 300 || - request.getMethod().equals(HttpHead.METHOD_NAME) && statusLine.getStatusCode() == 404) { - RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getHost(), response.getStatusLine()); - return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response); - } else { - EntityUtils.consume(response.getEntity()); - RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), response.getStatusLine()); - throw new ElasticsearchResponseException(request.getRequestLine(), connection.getHost(), statusLine); - } - } - - private static IOException addSuppressedException(IOException suppressedException, IOException currentException) { - if (suppressedException != null) { - currentException.addSuppressed(suppressedException); - } - return currentException; - } - - private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) { - switch(method.toUpperCase(Locale.ROOT)) { - case HttpDeleteWithEntity.METHOD_NAME: - HttpDeleteWithEntity httpDeleteWithEntity = new HttpDeleteWithEntity(uri); - addRequestBody(httpDeleteWithEntity, entity); - return httpDeleteWithEntity; - case HttpGetWithEntity.METHOD_NAME: - HttpGetWithEntity httpGetWithEntity = new HttpGetWithEntity(uri); - addRequestBody(httpGetWithEntity, entity); - return httpGetWithEntity; - case HttpHead.METHOD_NAME: - if (entity != null) { - throw new UnsupportedOperationException("HEAD with body is not supported"); - } - return new HttpHead(uri); - case HttpPost.METHOD_NAME: - HttpPost httpPost = new HttpPost(uri); - addRequestBody(httpPost, entity); - return httpPost; - case HttpPut.METHOD_NAME: - HttpPut httpPut = new HttpPut(uri); - addRequestBody(httpPut, entity); - return httpPut; - default: - throw new UnsupportedOperationException("http method not supported: " + method); - } - } - - private static void addRequestBody(HttpEntityEnclosingRequestBase httpRequest, HttpEntity entity) { - if (entity != null) { - httpRequest.setEntity(entity); - } - } - - private static URI buildUri(String path, Map params) { - try { - URIBuilder uriBuilder = new URIBuilder(path); - for (Map.Entry param : params.entrySet()) { - uriBuilder.addParameter(param.getKey(), param.getValue().toString()); - } - return uriBuilder.build(); - } catch(URISyntaxException e) { - throw new IllegalArgumentException(e.getMessage(), e); - } - } - - @Override - public void close() throws IOException { - connectionPool.close(); - client.close(); - } -} diff --git a/client/src/test/java/org/elasticsearch/client/TransportTests.java b/client/src/test/java/org/elasticsearch/client/RestClientTests.java similarity index 80% rename from client/src/test/java/org/elasticsearch/client/TransportTests.java rename to client/src/test/java/org/elasticsearch/client/RestClientTests.java index 1efafd438b0..52e18831591 100644 --- a/client/src/test/java/org/elasticsearch/client/TransportTests.java +++ b/client/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -28,13 +28,13 @@ import java.io.IOException; import java.util.logging.LogManager; import java.util.stream.Stream; -public class TransportTests extends LuceneTestCase { +public class RestClientTests extends LuceneTestCase { static { LogManager.getLogManager().reset(); } - public void testConstructor() { + public void testConstructor() throws IOException { CloseableHttpClient httpClient = HttpClientBuilder.create().build(); ConnectionPool connectionPool = new ConnectionPool() { @Override @@ -69,27 +69,28 @@ public class TransportTests extends LuceneTestCase { }; try { - new Transport(null, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); + new RestClient(null, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); fail("transport creation should have failed"); } catch(NullPointerException e) { assertEquals(e.getMessage(), "client cannot be null"); } try { - new Transport(httpClient, null, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); + new RestClient(httpClient, null, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); fail("transport creation should have failed"); } catch(NullPointerException e) { assertEquals(e.getMessage(), "connectionPool cannot be null"); } try { - new Transport(httpClient, connectionPool, RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); + new RestClient(httpClient, connectionPool, RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); fail("transport creation should have failed"); } catch(IllegalArgumentException e) { assertEquals(e.getMessage(), "maxRetryTimeout must be greater than 0"); } - Transport transport = new Transport(httpClient, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); - assertNotNull(transport); + try(RestClient client = new RestClient(httpClient, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE))) { + assertNotNull(client); + } } }