Rest client: don't reuse that same HttpAsyncResponseConsumer across multiple retries (#21378)

* Rest client: don't reuse that same HttpAsyncResponseConsumer across multiple retries

Turns out that AbstractAsyncResponseConsumer from apache async http client is stateful and cannot be reused across multiple requests. The failover mechanism was mistakenly reusing that same instance, which can be provided by users, across retries in case nodes are down or return 5xx errors. The downside is that we have to change the signature of two public methods, as HttpAsyncResponseConsumer cannot be provided directly anymore, rather its factory needs to be provided which is going to be used to create one instance of the consumer per request attempt.

Up until now we tested our RestClient against multiple nodes only in a mock environment, where we don't really send http requests. In that scenario we can verify that retries etc. work properly but the interaction with the http client library in a real scenario is different and can catch other problems. With this commit we also add an integration test that sends requests to multiple hosts, and some of them may also get stopped meanwhile. The specific test for pathPrefix was also removed as pathPrefix is now randomly applied by default, hence implicitly tested. Moved also a small test method that checked the validity of the path argument to the unit test RestClientSingleHostTests.

Also increase default buffer limit to 100MB and make it required in default consumer

The default buffer limit used to be 10MB but that proved not to be high enough for scroll requests (see reindex from remote). With this commit we increase the limit to 100MB and make it a bit more visibile in the consumer factory.
This commit is contained in:
Luca Cavanna 2016-11-08 16:42:42 +01:00 committed by GitHub
parent 68a94e711a
commit 293a3cab01
10 changed files with 364 additions and 178 deletions

View File

@ -38,25 +38,15 @@ import java.io.IOException;
/**
* Default implementation of {@link org.apache.http.nio.protocol.HttpAsyncResponseConsumer}. Buffers the whole
* response content in heap memory, meaning that the size of the buffer is equal to the content-length of the response.
* Limits the size of responses that can be read to {@link #DEFAULT_BUFFER_LIMIT} by default, configurable value.
* Throws an exception in case the entity is longer than the configured buffer limit.
* Limits the size of responses that can be read based on a configurable argument. Throws an exception in case the entity is longer
* than the configured buffer limit.
*/
public class HeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseConsumer<HttpResponse> {
//default buffer limit is 10MB
public static final int DEFAULT_BUFFER_LIMIT = 10 * 1024 * 1024;
private final int bufferLimitBytes;
private volatile HttpResponse response;
private volatile SimpleInputBuffer buf;
/**
* Creates a new instance of this consumer with a buffer limit of {@link #DEFAULT_BUFFER_LIMIT}
*/
public HeapBufferedAsyncResponseConsumer() {
this.bufferLimitBytes = DEFAULT_BUFFER_LIMIT;
}
/**
* Creates a new instance of this consumer with the provided buffer limit
*/

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.HttpResponse;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import static org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory.DEFAULT_BUFFER_LIMIT;
/**
* Factory used to create instances of {@link HttpAsyncResponseConsumer}. Each request retry needs its own instance of the
* consumer object. Users can implement this interface and pass their own instance to the specialized
* performRequest methods that accept an {@link HttpAsyncResponseConsumerFactory} instance as argument.
*/
interface HttpAsyncResponseConsumerFactory {
/**
* Creates the default type of {@link HttpAsyncResponseConsumer}, based on heap buffering with a buffer limit of 100MB.
*/
HttpAsyncResponseConsumerFactory DEFAULT = new HeapBufferedResponseConsumerFactory(DEFAULT_BUFFER_LIMIT);
/**
* Creates the {@link HttpAsyncResponseConsumer}, called once per request attempt.
*/
HttpAsyncResponseConsumer<HttpResponse> createHttpAsyncResponseConsumer();
/**
* Default factory used to create instances of {@link HttpAsyncResponseConsumer}.
* Creates one instance of {@link HeapBufferedAsyncResponseConsumer} for each request attempt, with a configurable
* buffer limit which defaults to 100MB.
*/
class HeapBufferedResponseConsumerFactory implements HttpAsyncResponseConsumerFactory {
//default buffer limit is 100MB
static final int DEFAULT_BUFFER_LIMIT = 100 * 1024 * 1024;
private final int bufferLimit;
public HeapBufferedResponseConsumerFactory(int bufferLimitBytes) {
this.bufferLimit = bufferLimitBytes;
}
@Override
public HttpAsyncResponseConsumer<HttpResponse> createHttpAsyncResponseConsumer() {
return new HeapBufferedAsyncResponseConsumer(bufferLimit);
}
}
}

View File

@ -143,7 +143,7 @@ public class RestClient implements Closeable {
* @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
*/
public Response performRequest(String method, String endpoint, Header... headers) throws IOException {
return performRequest(method, endpoint, Collections.<String, String>emptyMap(), (HttpEntity)null, headers);
return performRequest(method, endpoint, Collections.<String, String>emptyMap(), null, headers);
}
/**
@ -165,9 +165,9 @@ public class RestClient implements Closeable {
/**
* Sends a request to the Elasticsearch cluster that the client points to and waits for the corresponding response
* to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, HttpAsyncResponseConsumer, Header...)}
* which doesn't require specifying an {@link HttpAsyncResponseConsumer} instance, {@link HeapBufferedAsyncResponseConsumer}
* will be used to consume the response body.
* to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, HttpAsyncResponseConsumerFactory, Header...)}
* which doesn't require specifying an {@link HttpAsyncResponseConsumerFactory} instance,
* {@link HttpAsyncResponseConsumerFactory} will be used to create the needed instances of {@link HttpAsyncResponseConsumer}.
*
* @param method the http method
* @param endpoint the path of the request (without host and port)
@ -181,8 +181,7 @@ public class RestClient implements Closeable {
*/
public Response performRequest(String method, String endpoint, Map<String, String> params,
HttpEntity entity, Header... headers) throws IOException {
HttpAsyncResponseConsumer<HttpResponse> responseConsumer = new HeapBufferedAsyncResponseConsumer();
return performRequest(method, endpoint, params, entity, responseConsumer, headers);
return performRequest(method, endpoint, params, entity, HttpAsyncResponseConsumerFactory.DEFAULT, headers);
}
/**
@ -196,8 +195,9 @@ public class RestClient implements Closeable {
* @param endpoint the path of the request (without host and port)
* @param params the query_string parameters
* @param entity the body of the request, null if not applicable
* @param responseConsumer the {@link HttpAsyncResponseConsumer} callback. Controls how the response
* body gets streamed from a non-blocking HTTP connection on the client side.
* @param httpAsyncResponseConsumerFactory the {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the response body gets streamed from a non-blocking HTTP
* connection on the client side.
* @param headers the optional request headers
* @return the response returned by Elasticsearch
* @throws IOException in case of a problem or the connection was aborted
@ -205,10 +205,10 @@ public class RestClient implements Closeable {
* @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
*/
public Response performRequest(String method, String endpoint, Map<String, String> params,
HttpEntity entity, HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
Header... headers) throws IOException {
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
performRequestAsync(method, endpoint, params, entity, responseConsumer, listener, headers);
performRequestAsync(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, listener, headers);
return listener.get();
}
@ -245,9 +245,9 @@ public class RestClient implements Closeable {
/**
* Sends a request to the Elasticsearch cluster that the client points to. Doesn't wait for the response, instead
* the provided {@link ResponseListener} will be notified upon completion or failure.
* Shortcut to {@link #performRequestAsync(String, String, Map, HttpEntity, HttpAsyncResponseConsumer, ResponseListener, Header...)}
* which doesn't require specifying an {@link HttpAsyncResponseConsumer} instance, {@link HeapBufferedAsyncResponseConsumer}
* will be used to consume the response body.
* Shortcut to {@link #performRequestAsync(String, String, Map, HttpEntity, HttpAsyncResponseConsumerFactory, ResponseListener,
* Header...)} which doesn't require specifying an {@link HttpAsyncResponseConsumerFactory} instance,
* {@link HttpAsyncResponseConsumerFactory} will be used to create the needed instances of {@link HttpAsyncResponseConsumer}.
*
* @param method the http method
* @param endpoint the path of the request (without host and port)
@ -258,8 +258,7 @@ public class RestClient implements Closeable {
*/
public void performRequestAsync(String method, String endpoint, Map<String, String> params,
HttpEntity entity, ResponseListener responseListener, Header... headers) {
HttpAsyncResponseConsumer<HttpResponse> responseConsumer = new HeapBufferedAsyncResponseConsumer();
performRequestAsync(method, endpoint, params, entity, responseConsumer, responseListener, headers);
performRequestAsync(method, endpoint, params, entity, HttpAsyncResponseConsumerFactory.DEFAULT, responseListener, headers);
}
/**
@ -274,29 +273,31 @@ public class RestClient implements Closeable {
* @param endpoint the path of the request (without host and port)
* @param params the query_string parameters
* @param entity the body of the request, null if not applicable
* @param responseConsumer the {@link HttpAsyncResponseConsumer} callback. Controls how the response
* body gets streamed from a non-blocking HTTP connection on the client side.
* @param httpAsyncResponseConsumerFactory the {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the response body gets streamed from a non-blocking HTTP
* connection on the client side.
* @param responseListener the {@link ResponseListener} to notify when the request is completed or fails
* @param headers the optional request headers
*/
public void performRequestAsync(String method, String endpoint, Map<String, String> params,
HttpEntity entity, HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
ResponseListener responseListener, Header... headers) {
URI uri = buildUri(pathPrefix, endpoint, params);
HttpRequestBase request = createHttpRequest(method, uri, entity);
setHeaders(request, headers);
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
long startTime = System.nanoTime();
performRequestAsync(startTime, nextHost().iterator(), request, responseConsumer, failureTrackingResponseListener);
performRequestAsync(startTime, nextHost().iterator(), request, httpAsyncResponseConsumerFactory, failureTrackingResponseListener);
}
private void performRequestAsync(final long startTime, final Iterator<HttpHost> hosts, final HttpRequestBase request,
final HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
final FailureTrackingResponseListener listener) {
final HttpHost host = hosts.next();
//we stream the request body if the entity allows for it
HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(host, request);
client.execute(requestProducer, responseConsumer, new FutureCallback<HttpResponse>() {
HttpAsyncResponseConsumer<HttpResponse> asyncResponseConsumer = httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer();
client.execute(requestProducer, asyncResponseConsumer, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
@ -346,7 +347,7 @@ public class RestClient implements Closeable {
} else {
listener.trackFailure(exception);
request.reset();
performRequestAsync(startTime, hosts, request, responseConsumer, listener);
performRequestAsync(startTime, hosts, request, httpAsyncResponseConsumerFactory, listener);
}
} else {
listener.onDefinitiveFailure(exception);

View File

@ -32,7 +32,6 @@ import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.protocol.HttpContext;
import static org.elasticsearch.client.HeapBufferedAsyncResponseConsumer.DEFAULT_BUFFER_LIMIT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@ -45,13 +44,14 @@ public class HeapBufferedAsyncResponseConsumerTests extends RestClientTestCase {
//maximum buffer that this test ends up allocating is 50MB
private static final int MAX_TEST_BUFFER_SIZE = 50 * 1024 * 1024;
private static final int TEST_BUFFER_LIMIT = 10 * 1024 * 1024;
public void testResponseProcessing() throws Exception {
ContentDecoder contentDecoder = mock(ContentDecoder.class);
IOControl ioControl = mock(IOControl.class);
HttpContext httpContext = mock(HttpContext.class);
HeapBufferedAsyncResponseConsumer consumer = spy(new HeapBufferedAsyncResponseConsumer());
HeapBufferedAsyncResponseConsumer consumer = spy(new HeapBufferedAsyncResponseConsumer(TEST_BUFFER_LIMIT));
ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1);
StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "OK");
@ -74,8 +74,8 @@ public class HeapBufferedAsyncResponseConsumerTests extends RestClientTestCase {
}
public void testDefaultBufferLimit() throws Exception {
HeapBufferedAsyncResponseConsumer consumer = new HeapBufferedAsyncResponseConsumer();
bufferLimitTest(consumer, DEFAULT_BUFFER_LIMIT);
HeapBufferedAsyncResponseConsumer consumer = new HeapBufferedAsyncResponseConsumer(TEST_BUFFER_LIMIT);
bufferLimitTest(consumer, TEST_BUFFER_LIMIT);
}
public void testConfiguredBufferLimit() throws Exception {

View File

@ -0,0 +1,210 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpHost;
import org.codehaus.mojo.animal_sniffer.IgnoreJRERequirement;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
import static org.elasticsearch.client.RestClientTestUtil.randomOkStatusCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Integration test to check interaction between {@link RestClient} and {@link org.apache.http.client.HttpClient}.
* Works against real http servers, multiple hosts. Also tests failover by randomly shutting down hosts.
*/
//animal-sniffer doesn't like our usage of com.sun.net.httpserver.* classes
@IgnoreJRERequirement
public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
private static HttpServer[] httpServers;
private static RestClient restClient;
private static String pathPrefix;
@BeforeClass
public static void startHttpServer() throws Exception {
String pathPrefixWithoutLeadingSlash;
if (randomBoolean()) {
pathPrefixWithoutLeadingSlash = "testPathPrefix/" + randomAsciiOfLengthBetween(1, 5);
pathPrefix = "/" + pathPrefixWithoutLeadingSlash;
} else {
pathPrefix = pathPrefixWithoutLeadingSlash = "";
}
int numHttpServers = randomIntBetween(2, 4);
httpServers = new HttpServer[numHttpServers];
HttpHost[] httpHosts = new HttpHost[numHttpServers];
for (int i = 0; i < numHttpServers; i++) {
HttpServer httpServer = createHttpServer();
httpServers[i] = httpServer;
httpHosts[i] = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort());
}
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
if (pathPrefix.length() > 0) {
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
}
restClient = restClientBuilder.build();
}
private static HttpServer createHttpServer() throws Exception {
HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
//returns a different status code depending on the path
for (int statusCode : getAllStatusCodes()) {
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
}
return httpServer;
}
//animal-sniffer doesn't like our usage of com.sun.net.httpserver.* classes
@IgnoreJRERequirement
private static class ResponseHandler implements HttpHandler {
private final int statusCode;
ResponseHandler(int statusCode) {
this.statusCode = statusCode;
}
@Override
public void handle(HttpExchange httpExchange) throws IOException {
httpExchange.getRequestBody().close();
httpExchange.sendResponseHeaders(statusCode, -1);
httpExchange.close();
}
}
@AfterClass
public static void stopHttpServers() throws IOException {
restClient.close();
restClient = null;
for (HttpServer httpServer : httpServers) {
httpServer.stop(0);
}
httpServers = null;
}
@Before
public void stopRandomHost() {
//verify that shutting down some hosts doesn't matter as long as one working host is left behind
if (httpServers.length > 1 && randomBoolean()) {
List<HttpServer> updatedHttpServers = new ArrayList<>(httpServers.length - 1);
int nodeIndex = randomInt(httpServers.length - 1);
for (int i = 0; i < httpServers.length; i++) {
HttpServer httpServer = httpServers[i];
if (i == nodeIndex) {
httpServer.stop(0);
} else {
updatedHttpServers.add(httpServer);
}
}
httpServers = updatedHttpServers.toArray(new HttpServer[updatedHttpServers.size()]);
}
}
public void testSyncRequests() throws IOException {
int numRequests = randomIntBetween(5, 20);
for (int i = 0; i < numRequests; i++) {
final String method = RestClientTestUtil.randomHttpMethod(getRandom());
//we don't test status codes that are subject to retries as they interfere with hosts being stopped
final int statusCode = randomBoolean() ? randomOkStatusCode(getRandom()) : randomErrorNoRetryStatusCode(getRandom());
Response response;
try {
response = restClient.performRequest(method, "/" + statusCode);
} catch(ResponseException responseException) {
response = responseException.getResponse();
}
assertEquals(method, response.getRequestLine().getMethod());
assertEquals(statusCode, response.getStatusLine().getStatusCode());
assertEquals((pathPrefix.length() > 0 ? pathPrefix : "") + "/" + statusCode, response.getRequestLine().getUri());
}
}
public void testAsyncRequests() throws Exception {
int numRequests = randomIntBetween(5, 20);
final CountDownLatch latch = new CountDownLatch(numRequests);
final List<TestResponse> responses = new CopyOnWriteArrayList<>();
for (int i = 0; i < numRequests; i++) {
final String method = RestClientTestUtil.randomHttpMethod(getRandom());
//we don't test status codes that are subject to retries as they interfere with hosts being stopped
final int statusCode = randomBoolean() ? randomOkStatusCode(getRandom()) : randomErrorNoRetryStatusCode(getRandom());
restClient.performRequestAsync(method, "/" + statusCode, new ResponseListener() {
@Override
public void onSuccess(Response response) {
responses.add(new TestResponse(method, statusCode, response));
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
responses.add(new TestResponse(method, statusCode, exception));
latch.countDown();
}
});
}
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(numRequests, responses.size());
for (TestResponse testResponse : responses) {
Response response = testResponse.getResponse();
assertEquals(testResponse.method, response.getRequestLine().getMethod());
assertEquals(testResponse.statusCode, response.getStatusLine().getStatusCode());
assertEquals((pathPrefix.length() > 0 ? pathPrefix : "") + "/" + testResponse.statusCode,
response.getRequestLine().getUri());
}
}
private static class TestResponse {
private final String method;
private final int statusCode;
private final Object response;
TestResponse(String method, int statusCode, Object response) {
this.method = method;
this.statusCode = statusCode;
this.response = response;
}
Response getResponse() {
if (response instanceof Response) {
return (Response) response;
}
if (response instanceof ResponseException) {
return ((ResponseException) response).getResponse();
}
throw new AssertionError("unexpected response " + response.getClass());
}
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.client;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpContext;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
@ -45,19 +44,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
import static org.elasticsearch.client.RestClientTestUtil.randomStatusCode;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Integration test to check interaction between {@link RestClient} and {@link org.apache.http.client.HttpClient}.
@ -65,28 +58,42 @@ import static org.junit.Assert.fail;
*/
//animal-sniffer doesn't like our usage of com.sun.net.httpserver.* classes
@IgnoreJRERequirement
public class RestClientIntegTests extends RestClientTestCase {
public class RestClientSingleHostIntegTests extends RestClientTestCase {
private static HttpServer httpServer;
private static RestClient restClient;
private static String pathPrefix;
private static Header[] defaultHeaders;
@BeforeClass
public static void startHttpServer() throws Exception {
httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
String pathPrefixWithoutLeadingSlash;
if (randomBoolean()) {
pathPrefixWithoutLeadingSlash = "testPathPrefix/" + randomAsciiOfLengthBetween(1, 5);
pathPrefix = "/" + pathPrefixWithoutLeadingSlash;
} else {
pathPrefix = pathPrefixWithoutLeadingSlash = "";
}
httpServer = createHttpServer();
int numHeaders = randomIntBetween(0, 5);
defaultHeaders = generateHeaders("Header-default", "Header-array", numHeaders);
RestClientBuilder restClientBuilder = RestClient.builder(
new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort())).setDefaultHeaders(defaultHeaders);
if (pathPrefix.length() > 0) {
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
}
restClient = restClientBuilder.build();
}
private static HttpServer createHttpServer() throws Exception {
HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
//returns a different status code depending on the path
for (int statusCode : getAllStatusCodes()) {
createStatusCodeContext(httpServer, statusCode);
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
}
int numHeaders = randomIntBetween(0, 5);
defaultHeaders = generateHeaders("Header-default", "Header-array", numHeaders);
restClient = RestClient.builder(new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort()))
.setDefaultHeaders(defaultHeaders).build();
}
private static void createStatusCodeContext(HttpServer httpServer, final int statusCode) {
httpServer.createContext("/" + statusCode, new ResponseHandler(statusCode));
return httpServer;
}
//animal-sniffer doesn't like our usage of com.sun.net.httpserver.* classes
@ -157,7 +164,11 @@ public class RestClientIntegTests extends RestClientTestCase {
} catch(ResponseException e) {
esResponse = e.getResponse();
}
assertThat(esResponse.getStatusLine().getStatusCode(), equalTo(statusCode));
assertEquals(method, esResponse.getRequestLine().getMethod());
assertEquals(statusCode, esResponse.getStatusLine().getStatusCode());
assertEquals((pathPrefix.length() > 0 ? pathPrefix : "") + "/" + statusCode, esResponse.getRequestLine().getUri());
for (final Header responseHeader : esResponse.getHeaders()) {
final String name = responseHeader.getName();
final String value = responseHeader.getValue();
@ -197,49 +208,6 @@ public class RestClientIntegTests extends RestClientTestCase {
bodyTest("GET");
}
/**
* Ensure that pathPrefix works as expected.
*/
public void testPathPrefix() throws IOException {
// guarantee no other test setup collides with this one and lets it sneak through
final String uniqueContextSuffix = "/testPathPrefix";
final String pathPrefix = "base/" + randomAsciiOfLengthBetween(1, 5) + "/";
final int statusCode = randomStatusCode(getRandom());
final HttpContext context =
httpServer.createContext("/" + pathPrefix + statusCode + uniqueContextSuffix, new ResponseHandler(statusCode));
try (final RestClient client =
RestClient.builder(new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort()))
.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefix).build()) {
for (final String method : getHttpMethods()) {
Response esResponse;
try {
esResponse = client.performRequest(method, "/" + statusCode + uniqueContextSuffix);
} catch(ResponseException e) {
esResponse = e.getResponse();
}
assertThat(esResponse.getRequestLine().getUri(), equalTo("/" + pathPrefix + statusCode + uniqueContextSuffix));
assertThat(esResponse.getStatusLine().getStatusCode(), equalTo(statusCode));
}
} finally {
httpServer.removeContext(context);
}
}
public void testPath() throws IOException {
for (String method : getHttpMethods()) {
try {
restClient.performRequest(method, null);
fail("path set to null should fail!");
} catch (NullPointerException e) {
assertEquals("path must not be null", e.getMessage());
}
}
}
private void bodyTest(String method) throws IOException {
String requestBody = "{ \"field\": \"value\" }";
StringEntity entity = new StringEntity(requestBody);
@ -250,60 +218,9 @@ public class RestClientIntegTests extends RestClientTestCase {
} catch(ResponseException e) {
esResponse = e.getResponse();
}
assertEquals(method, esResponse.getRequestLine().getMethod());
assertEquals(statusCode, esResponse.getStatusLine().getStatusCode());
assertEquals((pathPrefix.length() > 0 ? pathPrefix : "") + "/" + statusCode, esResponse.getRequestLine().getUri());
assertEquals(requestBody, EntityUtils.toString(esResponse.getEntity()));
}
public void testAsyncRequests() throws Exception {
int numRequests = randomIntBetween(5, 20);
final CountDownLatch latch = new CountDownLatch(numRequests);
final List<TestResponse> responses = new CopyOnWriteArrayList<>();
for (int i = 0; i < numRequests; i++) {
final String method = RestClientTestUtil.randomHttpMethod(getRandom());
final int statusCode = randomStatusCode(getRandom());
restClient.performRequestAsync(method, "/" + statusCode, new ResponseListener() {
@Override
public void onSuccess(Response response) {
responses.add(new TestResponse(method, statusCode, response));
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
responses.add(new TestResponse(method, statusCode, exception));
latch.countDown();
}
});
}
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(numRequests, responses.size());
for (TestResponse response : responses) {
assertEquals(response.method, response.getResponse().getRequestLine().getMethod());
assertEquals(response.statusCode, response.getResponse().getStatusLine().getStatusCode());
}
}
private static class TestResponse {
private final String method;
private final int statusCode;
private final Object response;
TestResponse(String method, int statusCode, Object response) {
this.method = method;
this.statusCode = statusCode;
this.response = response;
}
Response getResponse() {
if (response instanceof Response) {
return (Response) response;
}
if (response instanceof ResponseException) {
return ((ResponseException) response).getResponse();
}
throw new AssertionError("unexpected response " + response.getClass());
}
}
}

View File

@ -139,6 +139,17 @@ public class RestClientSingleHostTests extends RestClientTestCase {
restClient = new RestClient(httpClient, 10000, defaultHeaders, new HttpHost[]{httpHost}, null, failureListener);
}
public void testNullPath() throws IOException {
for (String method : getHttpMethods()) {
try {
restClient.performRequest(method, null);
fail("path set to null should fail!");
} catch (NullPointerException e) {
assertEquals("path must not be null", e.getMessage());
}
}
}
/**
* Verifies the content of the {@link HttpRequest} that's internally created and passed through to the http client
*/

View File

@ -117,7 +117,7 @@ Response performRequest(String method, String endpoint,
Response performRequest(String method, String endpoint,
Map<String, String> params,
HttpEntity entity,
HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
HttpAsyncResponseConsumerFactory responseConsumerFactory,
Header... headers)
throws IOException;
@ -141,7 +141,7 @@ void performRequestAsync(String method, String endpoint,
Map<String, String> params,
HttpEntity entity,
ResponseListener responseListener,
HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
HttpAsyncResponseConsumerFactory responseConsumerFactory,
Header... headers);
--------------------------------------------------
@ -155,11 +155,12 @@ call (e.g. `/_cluster/health`)
`params`:: the optional parameters to be sent as querystring parameters
`entity`:: the optional request body enclosed in an
`org.apache.http.HttpEntity` object
`responseConsumer`:: the optional
`responseConsumerFactory`:: the optional factory that is used to create an
http://hc.apache.org/httpcomponents-core-ga/httpcore-nio/apidocs/org/apache/http/nio/protocol/HttpAsyncResponseConsumer.html[`org.apache.http.nio.protocol.HttpAsyncResponseConsumer`]
callback. Controls how the response body gets streamed from a non-blocking
HTTP connection on the client side. When not provided, the default
implementation is used which buffers the whole response body in heap memory
callback instance per request attempt. Controls how the response body gets
streamed from a non-blocking HTTP connection on the client side. When not
provided, the default implementation is used which buffers the whole response
body in heap memory, up to 100 MB
`responseListener`:: the listener to be notified upon asynchronous
request success or failure
`headers`:: optional request headers

View File

@ -30,7 +30,6 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
@ -39,8 +38,6 @@ import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -71,10 +68,6 @@ import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.MAIN_
import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.RESPONSE_PARSER;
public class RemoteScrollableHitSource extends ScrollableHitSource {
/**
* The maximum size of the remote response to buffer. 200mb because bulks beyond 40mb tend to be slow anyway but 200mb is simply huge.
*/
private static final ByteSizeValue BUFFER_LIMIT = new ByteSizeValue(200, ByteSizeUnit.MB);
private final RestClient client;
private final BytesReference query;
private final SearchRequest searchRequest;
@ -150,8 +143,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
@Override
protected void doRun() throws Exception {
HeapBufferedAsyncResponseConsumer consumer = new HeapBufferedAsyncResponseConsumer(BUFFER_LIMIT.bytesAsInt());
client.performRequestAsync(method, uri, params, entity, consumer, new ResponseListener() {
client.performRequestAsync(method, uri, params, entity, new ResponseListener() {
@Override
public void onSuccess(org.elasticsearch.client.Response response) {
// Restore the thread context to get the precious headers

View File

@ -439,16 +439,15 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
any(FutureCallback.class))).then(new Answer<Future<HttpResponse>>() {
@Override
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
HeapBufferedAsyncResponseConsumer consumer = (HeapBufferedAsyncResponseConsumer) invocationOnMock.getArguments()[1];
FutureCallback callback = (FutureCallback) invocationOnMock.getArguments()[2];
assertEquals(new ByteSizeValue(200, ByteSizeUnit.MB).bytesAsInt(), consumer.getBufferLimit());
callback.failed(tooLong);
return null;
}
});
@Override
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
HeapBufferedAsyncResponseConsumer consumer = (HeapBufferedAsyncResponseConsumer) invocationOnMock.getArguments()[1];
FutureCallback callback = (FutureCallback) invocationOnMock.getArguments()[2];
assertEquals(new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt(), consumer.getBufferLimit());
callback.failed(tooLong);
return null;
}
});
RemoteScrollableHitSource source = sourceWithMockedClient(true, httpClient);
AtomicBoolean called = new AtomicBoolean();