prevent unclosed response entities in ElasticsearchResponseException, eagerly read response string in case of error status code
This commit is contained in:
parent
e77ab87926
commit
530ad227a2
|
@ -21,7 +21,7 @@ package org.elasticsearch.client;
|
||||||
|
|
||||||
import org.apache.http.HttpHost;
|
import org.apache.http.HttpHost;
|
||||||
import org.apache.http.RequestLine;
|
import org.apache.http.RequestLine;
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
import org.apache.http.StatusLine;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -32,26 +32,19 @@ public class ElasticsearchResponseException extends IOException {
|
||||||
|
|
||||||
private final HttpHost host;
|
private final HttpHost host;
|
||||||
private final RequestLine requestLine;
|
private final RequestLine requestLine;
|
||||||
private final CloseableHttpResponse response;
|
private final StatusLine statusLine;
|
||||||
|
private final String responseBody;
|
||||||
|
|
||||||
public ElasticsearchResponseException(RequestLine requestLine, HttpHost host, CloseableHttpResponse response) {
|
public ElasticsearchResponseException(RequestLine requestLine, HttpHost host, StatusLine statusLine, String responseBody) {
|
||||||
super(buildMessage(requestLine, host, response));
|
super(buildMessage(requestLine, host, statusLine));
|
||||||
this.host = host;
|
this.host = host;
|
||||||
this.requestLine = requestLine;
|
this.requestLine = requestLine;
|
||||||
this.response = response;
|
this.responseBody = responseBody;
|
||||||
|
this.statusLine = statusLine;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String buildMessage(RequestLine requestLine, HttpHost host, CloseableHttpResponse response) {
|
private static String buildMessage(RequestLine requestLine, HttpHost host, StatusLine statusLine) {
|
||||||
return requestLine.getMethod() + " " + host + requestLine.getUri() + ": " + response.getStatusLine().toString();
|
return requestLine.getMethod() + " " + host + requestLine.getUri() + ": " + statusLine.toString();
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns whether the error is recoverable or not, hence whether the same request should be retried on other nodes or not
|
|
||||||
*/
|
|
||||||
public boolean isRecoverable() {
|
|
||||||
int statusCode = response.getStatusLine().getStatusCode();
|
|
||||||
//clients don't retry on 500 because elasticsearch still misuses it instead of 400 in some places
|
|
||||||
return statusCode >= 502 && statusCode <= 504;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -68,10 +61,11 @@ public class ElasticsearchResponseException extends IOException {
|
||||||
return requestLine;
|
return requestLine;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public StatusLine getStatusLine() {
|
||||||
* Returns the {@link CloseableHttpResponse} that was returned by elasticsearch
|
return statusLine;
|
||||||
*/
|
}
|
||||||
public CloseableHttpResponse getResponse() {
|
|
||||||
return response;
|
public String getResponseBody() {
|
||||||
|
return responseBody;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.client;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.http.HttpEntity;
|
import org.apache.http.HttpEntity;
|
||||||
import org.apache.http.StatusLine;
|
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
|
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
|
||||||
import org.apache.http.client.methods.HttpHead;
|
import org.apache.http.client.methods.HttpHead;
|
||||||
|
@ -30,6 +29,7 @@ import org.apache.http.client.methods.HttpPut;
|
||||||
import org.apache.http.client.methods.HttpRequestBase;
|
import org.apache.http.client.methods.HttpRequestBase;
|
||||||
import org.apache.http.client.utils.URIBuilder;
|
import org.apache.http.client.utils.URIBuilder;
|
||||||
import org.apache.http.impl.client.CloseableHttpClient;
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
|
import org.apache.http.util.EntityUtils;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -94,50 +94,46 @@ public final class RestClient implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CloseableHttpResponse response;
|
||||||
try {
|
try {
|
||||||
ElasticsearchResponse response = performRequest(request, connection);
|
response = client.execute(connection.getHost(), request);
|
||||||
connectionPool.onSuccess(connection);
|
|
||||||
return response;
|
|
||||||
} catch(ElasticsearchResponseException e) {
|
|
||||||
if (e.isRecoverable()) {
|
|
||||||
connectionPool.onFailure(connection);
|
|
||||||
lastSeenException = addSuppressedException(lastSeenException, e);
|
|
||||||
} else {
|
|
||||||
//don't retry and call onSuccess as the error should be a request problem
|
|
||||||
connectionPool.onSuccess(connection);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
|
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), e);
|
||||||
connectionPool.onFailure(connection);
|
connectionPool.onFailure(connection);
|
||||||
lastSeenException = addSuppressedException(lastSeenException, e);
|
lastSeenException = addSuppressedException(lastSeenException, e);
|
||||||
|
continue;
|
||||||
|
} finally {
|
||||||
|
request.reset();
|
||||||
|
}
|
||||||
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
|
//TODO make ignore status code configurable. rest-spec and tests support that parameter (ignore_missing)
|
||||||
|
if (statusCode < 300 || request.getMethod().equals(HttpHead.METHOD_NAME) && statusCode == 404) {
|
||||||
|
RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getHost(), response.getStatusLine());
|
||||||
|
connectionPool.onSuccess(connection);
|
||||||
|
return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response);
|
||||||
|
} else {
|
||||||
|
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), response.getStatusLine());
|
||||||
|
String responseBody = null;
|
||||||
|
if (response.getEntity() != null) {
|
||||||
|
responseBody = EntityUtils.toString(response.getEntity());
|
||||||
|
}
|
||||||
|
ElasticsearchResponseException elasticsearchResponseException = new ElasticsearchResponseException(
|
||||||
|
request.getRequestLine(), connection.getHost(), response.getStatusLine(), responseBody);
|
||||||
|
lastSeenException = addSuppressedException(lastSeenException, elasticsearchResponseException);
|
||||||
|
//clients don't retry on 500 because elasticsearch still misuses it instead of 400 in some places
|
||||||
|
if (statusCode == 502 || statusCode == 503 || statusCode == 504) {
|
||||||
|
connectionPool.onFailure(connection);
|
||||||
|
} else {
|
||||||
|
//don't retry and call onSuccess as the error should be a request problem, the node is alive
|
||||||
|
connectionPool.onSuccess(connection);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert lastSeenException != null;
|
assert lastSeenException != null;
|
||||||
throw lastSeenException;
|
throw lastSeenException;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ElasticsearchResponse performRequest(HttpRequestBase request, Connection connection) throws IOException {
|
|
||||||
CloseableHttpResponse response;
|
|
||||||
try {
|
|
||||||
response = client.execute(connection.getHost(), request);
|
|
||||||
} catch(IOException e) {
|
|
||||||
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), e);
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
request.reset();
|
|
||||||
}
|
|
||||||
StatusLine statusLine = response.getStatusLine();
|
|
||||||
//TODO make ignore status code configurable. rest-spec and tests support that parameter.
|
|
||||||
if (statusLine.getStatusCode() < 300 ||
|
|
||||||
request.getMethod().equals(HttpHead.METHOD_NAME) && statusLine.getStatusCode() == 404) {
|
|
||||||
RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getHost(), response.getStatusLine());
|
|
||||||
return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response);
|
|
||||||
} else {
|
|
||||||
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), response.getStatusLine());
|
|
||||||
throw new ElasticsearchResponseException(request.getRequestLine(), connection.getHost(), response);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static IOException addSuppressedException(IOException suppressedException, IOException currentException) {
|
private static IOException addSuppressedException(IOException suppressedException, IOException currentException) {
|
||||||
if (suppressedException != null) {
|
if (suppressedException != null) {
|
||||||
currentException.addSuppressed(suppressedException);
|
currentException.addSuppressed(suppressedException);
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.http.client.config.RequestConfig;
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
import org.apache.http.client.methods.HttpGet;
|
import org.apache.http.client.methods.HttpGet;
|
||||||
import org.apache.http.impl.client.CloseableHttpClient;
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
|
import org.apache.http.util.EntityUtils;
|
||||||
import org.elasticsearch.client.ElasticsearchResponseException;
|
import org.elasticsearch.client.ElasticsearchResponseException;
|
||||||
import org.elasticsearch.client.RequestLogger;
|
import org.elasticsearch.client.RequestLogger;
|
||||||
|
|
||||||
|
@ -77,7 +78,11 @@ final class Sniffer {
|
||||||
StatusLine statusLine = response.getStatusLine();
|
StatusLine statusLine = response.getStatusLine();
|
||||||
if (statusLine.getStatusCode() >= 300) {
|
if (statusLine.getStatusCode() >= 300) {
|
||||||
RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), host, statusLine);
|
RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), host, statusLine);
|
||||||
throw new ElasticsearchResponseException(httpGet.getRequestLine(), host, response);
|
String responseBody = null;
|
||||||
|
if (response.getEntity() != null) {
|
||||||
|
responseBody = EntityUtils.toString(response.getEntity());
|
||||||
|
}
|
||||||
|
throw new ElasticsearchResponseException(httpGet.getRequestLine(), host, response.getStatusLine(), responseBody);
|
||||||
} else {
|
} else {
|
||||||
List<HttpHost> nodes = readHosts(response.getEntity());
|
List<HttpHost> nodes = readHosts(response.getEntity());
|
||||||
RequestLogger.log(logger, "sniff succeeded", httpGet.getRequestLine(), host, statusLine);
|
RequestLogger.log(logger, "sniff succeeded", httpGet.getRequestLine(), host, statusLine);
|
||||||
|
|
|
@ -89,7 +89,7 @@ public class SnifferTests extends LuceneTestCase {
|
||||||
|
|
||||||
public void testSniffNodes() throws IOException, URISyntaxException {
|
public void testSniffNodes() throws IOException, URISyntaxException {
|
||||||
CloseableHttpClient client = HttpClientBuilder.create().build();
|
CloseableHttpClient client = HttpClientBuilder.create().build();
|
||||||
Sniffer sniffer = new Sniffer(client, RequestConfig.DEFAULT, sniffRequestTimeout, scheme.toString());
|
Sniffer sniffer = new Sniffer(client, RequestConfig.DEFAULT, sniffRequestTimeout, scheme);
|
||||||
HttpHost httpHost = new HttpHost(server.getHostName(), server.getPort());
|
HttpHost httpHost = new HttpHost(server.getHostName(), server.getPort());
|
||||||
try {
|
try {
|
||||||
List<HttpHost> sniffedHosts = sniffer.sniffNodes(httpHost);
|
List<HttpHost> sniffedHosts = sniffer.sniffNodes(httpHost);
|
||||||
|
@ -107,10 +107,10 @@ public class SnifferTests extends LuceneTestCase {
|
||||||
"/_nodes/http?timeout=" + sniffRequestTimeout));
|
"/_nodes/http?timeout=" + sniffRequestTimeout));
|
||||||
assertThat(e.getMessage(), containsString(Integer.toString(sniffResponse.nodesInfoResponseCode)));
|
assertThat(e.getMessage(), containsString(Integer.toString(sniffResponse.nodesInfoResponseCode)));
|
||||||
assertThat(e.getHost(), equalTo(httpHost));
|
assertThat(e.getHost(), equalTo(httpHost));
|
||||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(sniffResponse.nodesInfoResponseCode));
|
assertThat(e.getStatusLine().getStatusCode(), equalTo(sniffResponse.nodesInfoResponseCode));
|
||||||
assertThat(e.getRequestLine().toString(), equalTo("GET /_nodes/http?timeout=" + sniffRequestTimeout + "ms HTTP/1.1"));
|
assertThat(e.getRequestLine().toString(), equalTo("GET /_nodes/http?timeout=" + sniffRequestTimeout + "ms HTTP/1.1"));
|
||||||
} else {
|
} else {
|
||||||
fail("sniffNodes should have succeeded: " + e.getResponse().getStatusLine());
|
fail("sniffNodes should have succeeded: " + e.getStatusLine());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -170,7 +170,7 @@ public class SnifferTests extends LuceneTestCase {
|
||||||
if (isHttpEnabled) {
|
if (isHttpEnabled) {
|
||||||
String host = "host" + i;
|
String host = "host" + i;
|
||||||
int port = RandomInts.randomIntBetween(random(), 9200, 9299);
|
int port = RandomInts.randomIntBetween(random(), 9200, 9299);
|
||||||
HttpHost httpHost = new HttpHost(host, port, scheme.toString());
|
HttpHost httpHost = new HttpHost(host, port, scheme);
|
||||||
hosts.add(httpHost);
|
hosts.add(httpHost);
|
||||||
generator.writeObjectFieldStart("http");
|
generator.writeObjectFieldStart("http");
|
||||||
if (random().nextBoolean()) {
|
if (random().nextBoolean()) {
|
||||||
|
|
Loading…
Reference in New Issue