Remove Transport class, move all of it within RestClient class

This commit is contained in:
javanna 2016-05-06 10:12:43 +02:00 committed by Luca Cavanna
parent 062a21678c
commit bd29dc1572
3 changed files with 175 additions and 219 deletions

View File

@ -18,28 +18,191 @@
*/ */
package org.elasticsearch.client; package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
public final class RestClient implements Closeable { public final class RestClient implements Closeable {
private final Transport transport; private static final Log logger = LogFactory.getLog(RestClient.class);
private final CloseableHttpClient client;
private final ConnectionPool connectionPool;
private final long maxRetryTimeout;
public RestClient(CloseableHttpClient client, ConnectionPool connectionPool, long maxRetryTimeout) { public RestClient(CloseableHttpClient client, ConnectionPool connectionPool, long maxRetryTimeout) {
this.transport = new Transport(client, connectionPool, maxRetryTimeout); Objects.requireNonNull(client, "client cannot be null");
Objects.requireNonNull(connectionPool, "connectionPool cannot be null");
if (maxRetryTimeout <= 0) {
throw new IllegalArgumentException("maxRetryTimeout must be greater than 0");
}
this.client = client;
this.connectionPool = connectionPool;
this.maxRetryTimeout = maxRetryTimeout;
} }
public ElasticsearchResponse performRequest(String method, String endpoint, Map<String, Object> params, HttpEntity entity) public ElasticsearchResponse performRequest(String method, String endpoint, Map<String, Object> params, HttpEntity entity)
throws IOException { throws IOException {
return transport.performRequest(method, endpoint, params, entity); URI uri = buildUri(endpoint, params);
HttpRequestBase request = createHttpRequest(method, uri, entity);
Iterator<Connection> connectionIterator = connectionPool.nextConnection().iterator();
if (connectionIterator.hasNext() == false) {
Connection connection = connectionPool.lastResortConnection();
logger.info("no healthy nodes available, trying " + connection.getHost());
return performRequest(request, Stream.of(connection).iterator());
}
return performRequest(request, connectionIterator);
}
private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator<Connection> connectionIterator) throws IOException {
//we apply a soft margin so that e.g. if a request took 59 seconds and timeout is set to 60 we don't do another attempt
long retryTimeout = Math.round(this.maxRetryTimeout / (float)100 * 98);
IOException lastSeenException = null;
long startTime = System.nanoTime();
while (connectionIterator.hasNext()) {
Connection connection = connectionIterator.next();
if (lastSeenException != null) {
long timeElapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
long timeout = retryTimeout - timeElapsed;
if (timeout <= 0) {
IOException retryTimeoutException = new IOException(
"request retries exceeded max retry timeout [" + retryTimeout + "]");
retryTimeoutException.addSuppressed(lastSeenException);
throw retryTimeoutException;
}
}
try {
connectionPool.beforeAttempt(connection);
} catch(IOException e) {
lastSeenException = addSuppressedException(lastSeenException, e);
continue;
}
try {
ElasticsearchResponse response = performRequest(request, connection);
connectionPool.onSuccess(connection);
return response;
} catch(ElasticsearchResponseException e) {
if (e.isRecoverable()) {
connectionPool.onFailure(connection);
lastSeenException = addSuppressedException(lastSeenException, e);
} else {
//don't retry and call onSuccess as the error should be a request problem
connectionPool.onSuccess(connection);
throw e;
}
} catch(IOException e) {
connectionPool.onFailure(connection);
lastSeenException = addSuppressedException(lastSeenException, e);
}
}
assert lastSeenException != null;
throw lastSeenException;
}
private ElasticsearchResponse performRequest(HttpRequestBase request, Connection connection) throws IOException {
CloseableHttpResponse response;
try {
response = client.execute(connection.getHost(), request);
} catch(IOException e) {
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), e);
throw e;
} finally {
request.reset();
}
StatusLine statusLine = response.getStatusLine();
//TODO make ignore status code configurable. rest-spec and tests support that parameter.
if (statusLine.getStatusCode() < 300 ||
request.getMethod().equals(HttpHead.METHOD_NAME) && statusLine.getStatusCode() == 404) {
RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getHost(), response.getStatusLine());
return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response);
} else {
EntityUtils.consume(response.getEntity());
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), response.getStatusLine());
throw new ElasticsearchResponseException(request.getRequestLine(), connection.getHost(), statusLine);
}
}
private static IOException addSuppressedException(IOException suppressedException, IOException currentException) {
if (suppressedException != null) {
currentException.addSuppressed(suppressedException);
}
return currentException;
}
private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
switch(method.toUpperCase(Locale.ROOT)) {
case HttpDeleteWithEntity.METHOD_NAME:
HttpDeleteWithEntity httpDeleteWithEntity = new HttpDeleteWithEntity(uri);
addRequestBody(httpDeleteWithEntity, entity);
return httpDeleteWithEntity;
case HttpGetWithEntity.METHOD_NAME:
HttpGetWithEntity httpGetWithEntity = new HttpGetWithEntity(uri);
addRequestBody(httpGetWithEntity, entity);
return httpGetWithEntity;
case HttpHead.METHOD_NAME:
if (entity != null) {
throw new UnsupportedOperationException("HEAD with body is not supported");
}
return new HttpHead(uri);
case HttpPost.METHOD_NAME:
HttpPost httpPost = new HttpPost(uri);
addRequestBody(httpPost, entity);
return httpPost;
case HttpPut.METHOD_NAME:
HttpPut httpPut = new HttpPut(uri);
addRequestBody(httpPut, entity);
return httpPut;
default:
throw new UnsupportedOperationException("http method not supported: " + method);
}
}
private static void addRequestBody(HttpEntityEnclosingRequestBase httpRequest, HttpEntity entity) {
if (entity != null) {
httpRequest.setEntity(entity);
}
}
private static URI buildUri(String path, Map<String, Object> params) {
try {
URIBuilder uriBuilder = new URIBuilder(path);
for (Map.Entry<String, Object> param : params.entrySet()) {
uriBuilder.addParameter(param.getKey(), param.getValue().toString());
}
return uriBuilder.build();
} catch(URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
transport.close(); connectionPool.close();
client.close();
} }
} }

View File

@ -1,208 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
final class Transport implements Closeable {
private static final Log logger = LogFactory.getLog(Transport.class);
private final CloseableHttpClient client;
private final ConnectionPool connectionPool;
private final long maxRetryTimeout;
Transport(CloseableHttpClient client, ConnectionPool connectionPool, long maxRetryTimeout) {
Objects.requireNonNull(client, "client cannot be null");
Objects.requireNonNull(connectionPool, "connectionPool cannot be null");
if (maxRetryTimeout <= 0) {
throw new IllegalArgumentException("maxRetryTimeout must be greater than 0");
}
this.client = client;
this.connectionPool = connectionPool;
this.maxRetryTimeout = maxRetryTimeout;
}
ElasticsearchResponse performRequest(String method, String endpoint, Map<String, Object> params, HttpEntity entity) throws IOException {
URI uri = buildUri(endpoint, params);
HttpRequestBase request = createHttpRequest(method, uri, entity);
Iterator<Connection> connectionIterator = connectionPool.nextConnection().iterator();
if (connectionIterator.hasNext() == false) {
Connection connection = connectionPool.lastResortConnection();
logger.info("no healthy nodes available, trying " + connection.getHost());
return performRequest(request, Stream.of(connection).iterator());
}
return performRequest(request, connectionIterator);
}
private ElasticsearchResponse performRequest(HttpRequestBase request, Iterator<Connection> connectionIterator) throws IOException {
//we apply a soft margin so that e.g. if a request took 59 seconds and timeout is set to 60 we don't do another attempt
long retryTimeout = Math.round(this.maxRetryTimeout / (float)100 * 98);
IOException lastSeenException = null;
long startTime = System.nanoTime();
while (connectionIterator.hasNext()) {
Connection connection = connectionIterator.next();
if (lastSeenException != null) {
long timeElapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
long timeout = retryTimeout - timeElapsed;
if (timeout <= 0) {
IOException retryTimeoutException = new IOException(
"request retries exceeded max retry timeout [" + retryTimeout + "]");
retryTimeoutException.addSuppressed(lastSeenException);
throw retryTimeoutException;
}
}
try {
connectionPool.beforeAttempt(connection);
} catch(IOException e) {
lastSeenException = addSuppressedException(lastSeenException, e);
continue;
}
try {
ElasticsearchResponse response = performRequest(request, connection);
connectionPool.onSuccess(connection);
return response;
} catch(ElasticsearchResponseException e) {
if (e.isRecoverable()) {
connectionPool.onFailure(connection);
lastSeenException = addSuppressedException(lastSeenException, e);
} else {
//don't retry and call onSuccess as the error should be a request problem
connectionPool.onSuccess(connection);
throw e;
}
} catch(IOException e) {
connectionPool.onFailure(connection);
lastSeenException = addSuppressedException(lastSeenException, e);
}
}
assert lastSeenException != null;
throw lastSeenException;
}
private ElasticsearchResponse performRequest(HttpRequestBase request, Connection connection) throws IOException {
CloseableHttpResponse response;
try {
response = client.execute(connection.getHost(), request);
} catch(IOException e) {
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), e);
throw e;
} finally {
request.reset();
}
StatusLine statusLine = response.getStatusLine();
//TODO make ignore status code configurable. rest-spec and tests support that parameter.
if (statusLine.getStatusCode() < 300 ||
request.getMethod().equals(HttpHead.METHOD_NAME) && statusLine.getStatusCode() == 404) {
RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getHost(), response.getStatusLine());
return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response);
} else {
EntityUtils.consume(response.getEntity());
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), response.getStatusLine());
throw new ElasticsearchResponseException(request.getRequestLine(), connection.getHost(), statusLine);
}
}
private static IOException addSuppressedException(IOException suppressedException, IOException currentException) {
if (suppressedException != null) {
currentException.addSuppressed(suppressedException);
}
return currentException;
}
private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
switch(method.toUpperCase(Locale.ROOT)) {
case HttpDeleteWithEntity.METHOD_NAME:
HttpDeleteWithEntity httpDeleteWithEntity = new HttpDeleteWithEntity(uri);
addRequestBody(httpDeleteWithEntity, entity);
return httpDeleteWithEntity;
case HttpGetWithEntity.METHOD_NAME:
HttpGetWithEntity httpGetWithEntity = new HttpGetWithEntity(uri);
addRequestBody(httpGetWithEntity, entity);
return httpGetWithEntity;
case HttpHead.METHOD_NAME:
if (entity != null) {
throw new UnsupportedOperationException("HEAD with body is not supported");
}
return new HttpHead(uri);
case HttpPost.METHOD_NAME:
HttpPost httpPost = new HttpPost(uri);
addRequestBody(httpPost, entity);
return httpPost;
case HttpPut.METHOD_NAME:
HttpPut httpPut = new HttpPut(uri);
addRequestBody(httpPut, entity);
return httpPut;
default:
throw new UnsupportedOperationException("http method not supported: " + method);
}
}
private static void addRequestBody(HttpEntityEnclosingRequestBase httpRequest, HttpEntity entity) {
if (entity != null) {
httpRequest.setEntity(entity);
}
}
private static URI buildUri(String path, Map<String, Object> params) {
try {
URIBuilder uriBuilder = new URIBuilder(path);
for (Map.Entry<String, Object> param : params.entrySet()) {
uriBuilder.addParameter(param.getKey(), param.getValue().toString());
}
return uriBuilder.build();
} catch(URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
@Override
public void close() throws IOException {
connectionPool.close();
client.close();
}
}

View File

@ -28,13 +28,13 @@ import java.io.IOException;
import java.util.logging.LogManager; import java.util.logging.LogManager;
import java.util.stream.Stream; import java.util.stream.Stream;
public class TransportTests extends LuceneTestCase { public class RestClientTests extends LuceneTestCase {
static { static {
LogManager.getLogManager().reset(); LogManager.getLogManager().reset();
} }
public void testConstructor() { public void testConstructor() throws IOException {
CloseableHttpClient httpClient = HttpClientBuilder.create().build(); CloseableHttpClient httpClient = HttpClientBuilder.create().build();
ConnectionPool connectionPool = new ConnectionPool() { ConnectionPool connectionPool = new ConnectionPool() {
@Override @Override
@ -69,27 +69,28 @@ public class TransportTests extends LuceneTestCase {
}; };
try { try {
new Transport(null, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); new RestClient(null, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
fail("transport creation should have failed"); fail("transport creation should have failed");
} catch(NullPointerException e) { } catch(NullPointerException e) {
assertEquals(e.getMessage(), "client cannot be null"); assertEquals(e.getMessage(), "client cannot be null");
} }
try { try {
new Transport(httpClient, null, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); new RestClient(httpClient, null, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
fail("transport creation should have failed"); fail("transport creation should have failed");
} catch(NullPointerException e) { } catch(NullPointerException e) {
assertEquals(e.getMessage(), "connectionPool cannot be null"); assertEquals(e.getMessage(), "connectionPool cannot be null");
} }
try { try {
new Transport(httpClient, connectionPool, RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); new RestClient(httpClient, connectionPool, RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("transport creation should have failed"); fail("transport creation should have failed");
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "maxRetryTimeout must be greater than 0"); assertEquals(e.getMessage(), "maxRetryTimeout must be greater than 0");
} }
Transport transport = new Transport(httpClient, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); try(RestClient client = new RestClient(httpClient, connectionPool, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE))) {
assertNotNull(transport); assertNotNull(client);
}
} }
} }