Add "Async" to the end of each Async RestClient method

This makes it much harder to accidentally miss the Response.
This commit is contained in:
Chris Earle 2016-08-25 18:36:16 -04:00
parent 287cb00474
commit bd0b06440e
3 changed files with 45 additions and 42 deletions

View File

@ -65,16 +65,20 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
* Client that connects to an elasticsearch cluster through http. * Client that connects to an Elasticsearch cluster through HTTP.
* <p>
* Must be created using {@link RestClientBuilder}, which allows to set all the different options or just rely on defaults. * Must be created using {@link RestClientBuilder}, which allows to set all the different options or just rely on defaults.
* The hosts that are part of the cluster need to be provided at creation time, but can also be replaced later * The hosts that are part of the cluster need to be provided at creation time, but can also be replaced later
* by calling {@link #setHosts(HttpHost...)}. * by calling {@link #setHosts(HttpHost...)}.
* <p>
* The method {@link #performRequest(String, String, Map, HttpEntity, Header...)} allows to send a request to the cluster. When * The method {@link #performRequest(String, String, Map, HttpEntity, Header...)} allows to send a request to the cluster. When
* sending a request, a host gets selected out of the provided ones in a round-robin fashion. Failing hosts are marked dead and * sending a request, a host gets selected out of the provided ones in a round-robin fashion. Failing hosts are marked dead and
* retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times they previously * retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times they previously
* failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead nodes that * failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead nodes that
* deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown. * deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
* * <p>
* Requests can be either synchronous or asynchronous. The asynchronous variants all end with {@code Async}.
* <p>
* Requests can be traced by enabling trace logging for "tracer". The trace logger outputs requests and responses in curl format. * Requests can be traced by enabling trace logging for "tracer". The trace logger outputs requests and responses in curl format.
*/ */
public class RestClient implements Closeable { public class RestClient implements Closeable {
@ -124,41 +128,41 @@ public class RestClient implements Closeable {
} }
/** /**
* Sends a request to the elasticsearch cluster that the client points to and waits for the corresponding response * Sends a request to the Elasticsearch cluster that the client points to and waits for the corresponding response
* to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, Header...)} but without parameters * to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, Header...)} but without parameters
* and request body. * and request body.
* *
* @param method the http method * @param method the http method
* @param endpoint the path of the request (without host and port) * @param endpoint the path of the request (without host and port)
* @param headers the optional request headers * @param headers the optional request headers
* @return the response returned by elasticsearch * @return the response returned by Elasticsearch
* @throws IOException in case of a problem or the connection was aborted * @throws IOException in case of a problem or the connection was aborted
* @throws ClientProtocolException in case of an http protocol error * @throws ClientProtocolException in case of an http protocol error
* @throws ResponseException in case elasticsearch responded with a status code that indicated an error * @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
*/ */
public Response performRequest(String method, String endpoint, Header... headers) throws IOException { public Response performRequest(String method, String endpoint, Header... headers) throws IOException {
return performRequest(method, endpoint, Collections.<String, String>emptyMap(), (HttpEntity)null, headers); return performRequest(method, endpoint, Collections.<String, String>emptyMap(), (HttpEntity)null, headers);
} }
/** /**
* Sends a request to the elasticsearch cluster that the client points to and waits for the corresponding response * Sends a request to the Elasticsearch cluster that the client points to and waits for the corresponding response
* to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, Header...)} but without request body. * to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, Header...)} but without request body.
* *
* @param method the http method * @param method the http method
* @param endpoint the path of the request (without host and port) * @param endpoint the path of the request (without host and port)
* @param params the query_string parameters * @param params the query_string parameters
* @param headers the optional request headers * @param headers the optional request headers
* @return the response returned by elasticsearch * @return the response returned by Elasticsearch
* @throws IOException in case of a problem or the connection was aborted * @throws IOException in case of a problem or the connection was aborted
* @throws ClientProtocolException in case of an http protocol error * @throws ClientProtocolException in case of an http protocol error
* @throws ResponseException in case elasticsearch responded with a status code that indicated an error * @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
*/ */
public Response performRequest(String method, String endpoint, Map<String, String> params, Header... headers) throws IOException { public Response performRequest(String method, String endpoint, Map<String, String> params, Header... headers) throws IOException {
return performRequest(method, endpoint, params, (HttpEntity)null, headers); return performRequest(method, endpoint, params, (HttpEntity)null, headers);
} }
/** /**
* Sends a request to the elasticsearch cluster that the client points to and waits for the corresponding response * Sends a request to the Elasticsearch cluster that the client points to and waits for the corresponding response
* to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, HttpAsyncResponseConsumer, Header...)} * to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, HttpAsyncResponseConsumer, Header...)}
* which doesn't require specifying an {@link HttpAsyncResponseConsumer} instance, {@link HeapBufferedAsyncResponseConsumer} * which doesn't require specifying an {@link HttpAsyncResponseConsumer} instance, {@link HeapBufferedAsyncResponseConsumer}
* will be used to consume the response body. * will be used to consume the response body.
@ -168,10 +172,10 @@ public class RestClient implements Closeable {
* @param params the query_string parameters * @param params the query_string parameters
* @param entity the body of the request, null if not applicable * @param entity the body of the request, null if not applicable
* @param headers the optional request headers * @param headers the optional request headers
* @return the response returned by elasticsearch * @return the response returned by Elasticsearch
* @throws IOException in case of a problem or the connection was aborted * @throws IOException in case of a problem or the connection was aborted
* @throws ClientProtocolException in case of an http protocol error * @throws ClientProtocolException in case of an http protocol error
* @throws ResponseException in case elasticsearch responded with a status code that indicated an error * @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
*/ */
public Response performRequest(String method, String endpoint, Map<String, String> params, public Response performRequest(String method, String endpoint, Map<String, String> params,
HttpEntity entity, Header... headers) throws IOException { HttpEntity entity, Header... headers) throws IOException {
@ -180,7 +184,7 @@ public class RestClient implements Closeable {
} }
/** /**
* Sends a request to the elasticsearch cluster that the client points to. Blocks until the request is completed and returns * Sends a request to the Elasticsearch cluster that the client points to. Blocks until the request is completed and returns
* its response or fails by throwing an exception. Selects a host out of the provided ones in a round-robin fashion. Failing hosts * its response or fails by throwing an exception. Selects a host out of the provided ones in a round-robin fashion. Failing hosts
* are marked dead and retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times * are marked dead and retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times
* they previously failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead * they previously failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead
@ -193,37 +197,37 @@ public class RestClient implements Closeable {
* @param responseConsumer the {@link HttpAsyncResponseConsumer} callback. Controls how the response * @param responseConsumer the {@link HttpAsyncResponseConsumer} callback. Controls how the response
* body gets streamed from a non-blocking HTTP connection on the client side. * body gets streamed from a non-blocking HTTP connection on the client side.
* @param headers the optional request headers * @param headers the optional request headers
* @return the response returned by elasticsearch * @return the response returned by Elasticsearch
* @throws IOException in case of a problem or the connection was aborted * @throws IOException in case of a problem or the connection was aborted
* @throws ClientProtocolException in case of an http protocol error * @throws ClientProtocolException in case of an http protocol error
* @throws ResponseException in case elasticsearch responded with a status code that indicated an error * @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
*/ */
public Response performRequest(String method, String endpoint, Map<String, String> params, public Response performRequest(String method, String endpoint, Map<String, String> params,
HttpEntity entity, HttpAsyncResponseConsumer<HttpResponse> responseConsumer, HttpEntity entity, HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
Header... headers) throws IOException { Header... headers) throws IOException {
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis); SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
performRequest(method, endpoint, params, entity, responseConsumer, listener, headers); performRequestAsync(method, endpoint, params, entity, responseConsumer, listener, headers);
return listener.get(); return listener.get();
} }
/** /**
* Sends a request to the elasticsearch cluster that the client points to. Doesn't wait for the response, instead * Sends a request to the Elasticsearch cluster that the client points to. Doesn't wait for the response, instead
* the provided {@link ResponseListener} will be notified upon completion or failure. Shortcut to * the provided {@link ResponseListener} will be notified upon completion or failure. Shortcut to
* {@link #performRequest(String, String, Map, HttpEntity, ResponseListener, Header...)} but without parameters and request body. * {@link #performRequestAsync(String, String, Map, HttpEntity, ResponseListener, Header...)} but without parameters and request body.
* *
* @param method the http method * @param method the http method
* @param endpoint the path of the request (without host and port) * @param endpoint the path of the request (without host and port)
* @param responseListener the {@link ResponseListener} to notify when the request is completed or fails * @param responseListener the {@link ResponseListener} to notify when the request is completed or fails
* @param headers the optional request headers * @param headers the optional request headers
*/ */
public void performRequest(String method, String endpoint, ResponseListener responseListener, Header... headers) { public void performRequestAsync(String method, String endpoint, ResponseListener responseListener, Header... headers) {
performRequest(method, endpoint, Collections.<String, String>emptyMap(), null, responseListener, headers); performRequestAsync(method, endpoint, Collections.<String, String>emptyMap(), null, responseListener, headers);
} }
/** /**
* Sends a request to the elasticsearch cluster that the client points to. Doesn't wait for the response, instead * Sends a request to the Elasticsearch cluster that the client points to. Doesn't wait for the response, instead
* the provided {@link ResponseListener} will be notified upon completion or failure. Shortcut to * the provided {@link ResponseListener} will be notified upon completion or failure. Shortcut to
* {@link #performRequest(String, String, Map, HttpEntity, ResponseListener, Header...)} but without request body. * {@link #performRequestAsync(String, String, Map, HttpEntity, ResponseListener, Header...)} but without request body.
* *
* @param method the http method * @param method the http method
* @param endpoint the path of the request (without host and port) * @param endpoint the path of the request (without host and port)
@ -231,15 +235,15 @@ public class RestClient implements Closeable {
* @param responseListener the {@link ResponseListener} to notify when the request is completed or fails * @param responseListener the {@link ResponseListener} to notify when the request is completed or fails
* @param headers the optional request headers * @param headers the optional request headers
*/ */
public void performRequest(String method, String endpoint, Map<String, String> params, public void performRequestAsync(String method, String endpoint, Map<String, String> params,
ResponseListener responseListener, Header... headers) { ResponseListener responseListener, Header... headers) {
performRequest(method, endpoint, params, null, responseListener, headers); performRequestAsync(method, endpoint, params, null, responseListener, headers);
} }
/** /**
* Sends a request to the elasticsearch cluster that the client points to. Doesn't wait for the response, instead * Sends a request to the Elasticsearch cluster that the client points to. Doesn't wait for the response, instead
* the provided {@link ResponseListener} will be notified upon completion or failure. * the provided {@link ResponseListener} will be notified upon completion or failure.
* Shortcut to {@link #performRequest(String, String, Map, HttpEntity, HttpAsyncResponseConsumer, ResponseListener, Header...)} * Shortcut to {@link #performRequestAsync(String, String, Map, HttpEntity, HttpAsyncResponseConsumer, ResponseListener, Header...)}
* which doesn't require specifying an {@link HttpAsyncResponseConsumer} instance, {@link HeapBufferedAsyncResponseConsumer} * which doesn't require specifying an {@link HttpAsyncResponseConsumer} instance, {@link HeapBufferedAsyncResponseConsumer}
* will be used to consume the response body. * will be used to consume the response body.
* *
@ -250,14 +254,14 @@ public class RestClient implements Closeable {
* @param responseListener the {@link ResponseListener} to notify when the request is completed or fails * @param responseListener the {@link ResponseListener} to notify when the request is completed or fails
* @param headers the optional request headers * @param headers the optional request headers
*/ */
public void performRequest(String method, String endpoint, Map<String, String> params, public void performRequestAsync(String method, String endpoint, Map<String, String> params,
HttpEntity entity, ResponseListener responseListener, Header... headers) { HttpEntity entity, ResponseListener responseListener, Header... headers) {
HttpAsyncResponseConsumer<HttpResponse> responseConsumer = new HeapBufferedAsyncResponseConsumer(); HttpAsyncResponseConsumer<HttpResponse> responseConsumer = new HeapBufferedAsyncResponseConsumer();
performRequest(method, endpoint, params, entity, responseConsumer, responseListener, headers); performRequestAsync(method, endpoint, params, entity, responseConsumer, responseListener, headers);
} }
/** /**
* Sends a request to the elasticsearch cluster that the client points to. The request is executed asynchronously * Sends a request to the Elasticsearch cluster that the client points to. The request is executed asynchronously
* and the provided {@link ResponseListener} gets notified upon request completion or failure. * and the provided {@link ResponseListener} gets notified upon request completion or failure.
* Selects a host out of the provided ones in a round-robin fashion. Failing hosts are marked dead and retried after a certain * Selects a host out of the provided ones in a round-robin fashion. Failing hosts are marked dead and retried after a certain
* amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times they previously failed (the more failures, * amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times they previously failed (the more failures,
@ -273,20 +277,20 @@ public class RestClient implements Closeable {
* @param responseListener the {@link ResponseListener} to notify when the request is completed or fails * @param responseListener the {@link ResponseListener} to notify when the request is completed or fails
* @param headers the optional request headers * @param headers the optional request headers
*/ */
public void performRequest(String method, String endpoint, Map<String, String> params, public void performRequestAsync(String method, String endpoint, Map<String, String> params,
HttpEntity entity, HttpAsyncResponseConsumer<HttpResponse> responseConsumer, HttpEntity entity, HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
ResponseListener responseListener, Header... headers) { ResponseListener responseListener, Header... headers) {
URI uri = buildUri(endpoint, params); URI uri = buildUri(endpoint, params);
HttpRequestBase request = createHttpRequest(method, uri, entity); HttpRequestBase request = createHttpRequest(method, uri, entity);
setHeaders(request, headers); setHeaders(request, headers);
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener); FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
long startTime = System.nanoTime(); long startTime = System.nanoTime();
performRequest(startTime, nextHost().iterator(), request, responseConsumer, failureTrackingResponseListener); performRequestAsync(startTime, nextHost().iterator(), request, responseConsumer, failureTrackingResponseListener);
} }
private void performRequest(final long startTime, final Iterator<HttpHost> hosts, final HttpRequestBase request, private void performRequestAsync(final long startTime, final Iterator<HttpHost> hosts, final HttpRequestBase request,
final HttpAsyncResponseConsumer<HttpResponse> responseConsumer, final HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
final FailureTrackingResponseListener listener) { final FailureTrackingResponseListener listener) {
final HttpHost host = hosts.next(); final HttpHost host = hosts.next();
//we stream the request body if the entity allows for it //we stream the request body if the entity allows for it
HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(host, request); HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(host, request);
@ -340,7 +344,7 @@ public class RestClient implements Closeable {
} else { } else {
listener.trackFailure(exception); listener.trackFailure(exception);
request.reset(); request.reset();
performRequest(startTime, hosts, request, responseConsumer, listener); performRequestAsync(startTime, hosts, request, responseConsumer, listener);
} }
} else { } else {
listener.onDefinitiveFailure(exception); listener.onDefinitiveFailure(exception);

View File

@ -226,7 +226,7 @@ public class RestClientIntegTests extends RestClientTestCase {
for (int i = 0; i < numRequests; i++) { for (int i = 0; i < numRequests; i++) {
final String method = RestClientTestUtil.randomHttpMethod(getRandom()); final String method = RestClientTestUtil.randomHttpMethod(getRandom());
final int statusCode = randomStatusCode(getRandom()); final int statusCode = randomStatusCode(getRandom());
restClient.performRequest(method, "/" + statusCode, new ResponseListener() { restClient.performRequestAsync(method, "/" + statusCode, new ResponseListener() {
@Override @Override
public void onSuccess(Response response) { public void onSuccess(Response response) {
responses.add(new TestResponse(method, statusCode, response)); responses.add(new TestResponse(method, statusCode, response));

View File

@ -98,7 +98,6 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
void lookupRemoteVersion(Consumer<Version> onVersion) { void lookupRemoteVersion(Consumer<Version> onVersion) {
execute("GET", "", emptyMap(), null, MAIN_ACTION_PARSER, onVersion); execute("GET", "", emptyMap(), null, MAIN_ACTION_PARSER, onVersion);
} }
private void onStartResponse(Consumer<? super Response> onResponse, Response response) { private void onStartResponse(Consumer<? super Response> onResponse, Response response) {
@ -119,7 +118,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
@Override @Override
protected void clearScroll(String scrollId) { protected void clearScroll(String scrollId) {
// Need to throw out response.... // Need to throw out response....
client.performRequest("DELETE", scrollPath(), emptyMap(), scrollEntity(scrollId), new ResponseListener() { client.performRequestAsync("DELETE", scrollPath(), emptyMap(), scrollEntity(scrollId), new ResponseListener() {
@Override @Override
public void onSuccess(org.elasticsearch.client.Response response) { public void onSuccess(org.elasticsearch.client.Response response) {
logger.debug("Successfully cleared [{}]", scrollId); logger.debug("Successfully cleared [{}]", scrollId);
@ -141,7 +140,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
client.performRequest(method, uri, params, entity, new ResponseListener() { client.performRequestAsync(method, uri, params, entity, new ResponseListener() {
@Override @Override
public void onSuccess(org.elasticsearch.client.Response response) { public void onSuccess(org.elasticsearch.client.Response response) {
// Restore the thread context to get the precious headers // Restore the thread context to get the precious headers