Make RestHighLevelClient Closeable and simplify its creation (#26180)

By making RestHighLevelClient Closeable, its close method will close the internal low-level REST client instance by default, which simplifies the way most users interact with the high-level client.

Its constructor accepts now a RestClientBuilder, which clarifies that the low-level REST client is internally created and managed.

It is still possible to provide an already built `RestClient` instance, but that can only be done by subclassing `RestHighLevelClient` and calling the protected constructor that accepts a `RestClient`. In such case a consumer has also to be provided, which controls what has to be done when the high-level client gets done.

Closes #26086
This commit is contained in:
Luca Cavanna 2017-08-24 09:39:41 +02:00 committed by GitHub
parent 587409e893
commit 6d8e2c6d4c
12 changed files with 149 additions and 66 deletions

View File

@ -19,8 +19,6 @@
package org.elasticsearch.client;
import org.elasticsearch.client.http.Header;
import org.elasticsearch.client.http.HttpEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
@ -43,6 +41,9 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.http.Header;
import org.elasticsearch.client.http.HttpEntity;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ContextParser;
@ -138,6 +139,7 @@ import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.search.suggest.phrase.PhraseSuggestion;
import org.elasticsearch.search.suggest.term.TermSuggestion;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -157,33 +159,63 @@ import static java.util.stream.Collectors.toList;
/**
* High level REST client that wraps an instance of the low level {@link RestClient} and allows to build requests and read responses.
* The provided {@link RestClient} is externally built and closed.
* Can be sub-classed to expose additional client methods that make use of endpoints added to Elasticsearch through plugins, or to
* add support for custom response sections, again added to Elasticsearch through plugins.
* The {@link RestClient} instance is internally built based on the provided {@link RestClientBuilder} and it gets closed automatically
* when closing the {@link RestHighLevelClient} instance that wraps it.
* In case an already existing instance of a low-level REST client needs to be provided, this class can be subclassed and the
* {@link #RestHighLevelClient(RestClient, CheckedConsumer, List)} constructor can be used.
* This class can also be sub-classed to expose additional client methods that make use of endpoints added to Elasticsearch through
* plugins, or to add support for custom response sections, again added to Elasticsearch through plugins.
*/
public class RestHighLevelClient {
public class RestHighLevelClient implements Closeable {
private final RestClient client;
private final NamedXContentRegistry registry;
private final CheckedConsumer<RestClient, IOException> doClose;
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClient} that it should use to perform requests.
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
* {@link RestClient} to be used to perform requests.
*/
public RestHighLevelClient(RestClient restClient) {
this(restClient, Collections.emptyList());
public RestHighLevelClient(RestClientBuilder restClientBuilder) {
this(restClientBuilder, Collections.emptyList());
}
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
* {@link RestClient} to be used to perform requests and parsers for custom response sections added to Elasticsearch through plugins.
*/
protected RestHighLevelClient(RestClientBuilder restClientBuilder, List<NamedXContentRegistry.Entry> namedXContentEntries) {
this(restClientBuilder.build(), RestClient::close, namedXContentEntries);
}
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClient} that it should use to perform requests and
* a list of entries that allow to parse custom response sections added to Elasticsearch through plugins.
* This constructor can be called by subclasses in case an externally created low-level REST client needs to be provided.
* The consumer argument allows to control what needs to be done when the {@link #close()} method is called.
* Also subclasses can provide parsers for custom response sections added to Elasticsearch through plugins.
*/
protected RestHighLevelClient(RestClient restClient, List<NamedXContentRegistry.Entry> namedXContentEntries) {
this.client = Objects.requireNonNull(restClient);
protected RestHighLevelClient(RestClient restClient, CheckedConsumer<RestClient, IOException> doClose,
List<NamedXContentRegistry.Entry> namedXContentEntries) {
this.client = Objects.requireNonNull(restClient, "restClient must not be null");
this.doClose = Objects.requireNonNull(doClose, "doClose consumer must not be null");
this.registry = new NamedXContentRegistry(
Stream.of(getDefaultNamedXContents().stream(), getProvidedNamedXContents().stream(), namedXContentEntries.stream())
.flatMap(Function.identity()).collect(toList()));
}
/**
* Returns the low-level client that the current high-level client instance is using to perform requests
*/
public RestClient getLowLevelClient() {
return client;
}
@Override
public final void close() throws IOException {
doClose.accept(client);
}
/**
* Executes a bulk request using the Bulk API
*

View File

@ -19,6 +19,12 @@
package org.elasticsearch.client;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.client.http.Header;
import org.elasticsearch.client.http.HttpEntity;
import org.elasticsearch.client.http.HttpHost;
@ -32,12 +38,6 @@ import org.elasticsearch.client.http.message.BasicHeader;
import org.elasticsearch.client.http.message.BasicHttpResponse;
import org.elasticsearch.client.http.message.BasicRequestLine;
import org.elasticsearch.client.http.message.BasicStatusLine;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -48,6 +48,7 @@ import org.junit.Before;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Collections;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -151,7 +152,7 @@ public class CustomRestHighLevelClientTests extends ESTestCase {
static class CustomRestClient extends RestHighLevelClient {
private CustomRestClient(RestClient restClient) {
super(restClient);
super(restClient, RestClient::close, Collections.emptyList());
}
MainResponse custom(MainRequest mainRequest, Header... headers) throws IOException {

View File

@ -19,14 +19,15 @@
package org.elasticsearch.client;
import org.elasticsearch.client.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.http.Header;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
@ -36,12 +37,13 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
public void initHighLevelClient() throws IOException {
super.initClient();
if (restHighLevelClient == null) {
restHighLevelClient = new RestHighLevelClient(client());
restHighLevelClient = new HighLevelClient(client());
}
}
@AfterClass
public static void cleanupClient() {
public static void cleanupClient() throws IOException {
restHighLevelClient.close();
restHighLevelClient = null;
}
@ -72,4 +74,10 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
protected interface AsyncMethod<Request, Response> {
void execute(Request request, ActionListener<Response> listener, Header... headers);
}
private static class HighLevelClient extends RestHighLevelClient {
private HighLevelClient(RestClient restClient) {
super(restClient, (client) -> {}, Collections.emptyList());
}
}
}

View File

@ -69,7 +69,7 @@ public class RestHighLevelClientExtTests extends ESTestCase {
private static class RestHighLevelClientExt extends RestHighLevelClient {
private RestHighLevelClientExt(RestClient restClient) {
super(restClient, getNamedXContentsExt());
super(restClient, RestClient::close, getNamedXContentsExt());
}
private static List<NamedXContentRegistry.Entry> getNamedXContentsExt() {

View File

@ -91,6 +91,7 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNotNull;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -105,7 +106,16 @@ public class RestHighLevelClientTests extends ESTestCase {
@Before
public void initClient() {
restClient = mock(RestClient.class);
restHighLevelClient = new RestHighLevelClient(restClient);
restHighLevelClient = new RestHighLevelClient(restClient, RestClient::close, Collections.emptyList());
}
public void testCloseIsIdempotent() throws IOException {
restHighLevelClient.close();
verify(restClient, times(1)).close();
restHighLevelClient.close();
verify(restClient, times(2)).close();
restHighLevelClient.close();
verify(restClient, times(3)).close();
}
public void testPingSuccessful() throws IOException {

View File

@ -23,7 +23,9 @@ import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.http.HttpHost;
import org.elasticsearch.cluster.ClusterName;
import java.io.IOException;
@ -65,4 +67,17 @@ public class MainDocumentationIT extends ESRestHighLevelClientTestCase {
assertNotNull(build);
}
}
public void testInitializationFromClientBuilder() throws IOException {
//tag::rest-high-level-client-init
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
//end::rest-high-level-client-init
//tag::rest-high-level-client-close
client.close();
//end::rest-high-level-client-close
}
}

View File

@ -23,11 +23,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.http.HttpEntity;
import org.elasticsearch.client.http.HttpStatus;
@ -67,7 +65,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
public class MigrationDocumentationIT extends ESRestHighLevelClientTestCase {
public void testCreateIndex() throws IOException {
RestClient restClient = client();
RestHighLevelClient client = highLevelClient();
{
//tag::migration-create-inded
Settings indexSettings = Settings.builder() // <1>
@ -93,7 +91,7 @@ public class MigrationDocumentationIT extends ESRestHighLevelClientTestCase {
HttpEntity entity = new NStringEntity(payload, ContentType.APPLICATION_JSON); // <5>
Response response = restClient.performRequest("PUT", "my-index", emptyMap(), entity); // <6>
Response response = client.getLowLevelClient().performRequest("PUT", "my-index", emptyMap(), entity); // <6>
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
// <7>
}
@ -103,10 +101,10 @@ public class MigrationDocumentationIT extends ESRestHighLevelClientTestCase {
}
public void testClusterHealth() throws IOException {
RestClient restClient = client();
RestHighLevelClient client = highLevelClient();
{
//tag::migration-cluster-health
Response response = restClient.performRequest("GET", "/_cluster/health"); // <1>
Response response = client.getLowLevelClient().performRequest("GET", "/_cluster/health"); // <1>
ClusterHealthStatus healthStatus;
try (InputStream is = response.getEntity().getContent()) { // <2>

View File

@ -112,6 +112,7 @@ public class RestClient implements Closeable {
/**
* Returns a new {@link RestClientBuilder} to help with {@link RestClient} creation.
* Creates a new builder instance and sets the hosts that the client will send requests to.
*/
public static RestClientBuilder builder(HttpHost... hosts) {
return new RestClientBuilder(hosts);
@ -706,8 +707,8 @@ public class RestClient implements Closeable {
* safe, volatile way.
*/
private static class HostTuple<T> {
public final T hosts;
public final AuthCache authCache;
final T hosts;
final AuthCache authCache;
HostTuple(final T hosts, final AuthCache authCache) {
this.hosts = hosts;

View File

@ -23,19 +23,34 @@ import org.elasticsearch.client.http.Header;
import org.elasticsearch.client.http.HttpHost;
import org.elasticsearch.client.http.impl.nio.client.CloseableHttpAsyncClient;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class RestClientTests extends RestClientTestCase {
public void testCloseIsIdempotent() throws IOException {
HttpHost[] hosts = new HttpHost[]{new HttpHost("localhost", 9200)};
CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class);
RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], hosts, null, null);
restClient.close();
verify(closeableHttpAsyncClient, times(1)).close();
restClient.close();
verify(closeableHttpAsyncClient, times(2)).close();
restClient.close();
verify(closeableHttpAsyncClient, times(3)).close();
}
public void testPerformAsyncWithUnsupportedMethod() throws Exception {
RestClient.SyncResponseListener listener = new RestClient.SyncResponseListener(10000);
try (RestClient restClient = createRestClient()) {
restClient.performRequestAsync("unsupported", randomAsciiOfLength(5), listener);
restClient.performRequestAsync("unsupported", randomAsciiLettersOfLength(5), listener);
listener.get();
fail("should have failed because of unsupported method");
@ -47,7 +62,7 @@ public class RestClientTests extends RestClientTestCase {
public void testPerformAsyncWithNullParams() throws Exception {
RestClient.SyncResponseListener listener = new RestClient.SyncResponseListener(10000);
try (RestClient restClient = createRestClient()) {
restClient.performRequestAsync(randomAsciiOfLength(5), randomAsciiOfLength(5), null, listener);
restClient.performRequestAsync(randomAsciiLettersOfLength(5), randomAsciiLettersOfLength(5), null, listener);
listener.get();
fail("should have failed because of null parameters");
@ -59,7 +74,7 @@ public class RestClientTests extends RestClientTestCase {
public void testPerformAsyncWithNullHeaders() throws Exception {
RestClient.SyncResponseListener listener = new RestClient.SyncResponseListener(10000);
try (RestClient restClient = createRestClient()) {
restClient.performRequestAsync("GET", randomAsciiOfLength(5), listener, (Header) null);
restClient.performRequestAsync("GET", randomAsciiLettersOfLength(5), listener, (Header) null);
listener.get();
fail("should have failed because of null headers");

View File

@ -54,52 +54,44 @@ Settings settings = Settings.builder()
.put("cluster.name", "prod").build();
TransportClient transportClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("host"), 9300));
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300))
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9301));
--------------------------------------------------
The initialization of a `RestHighLevelClient` is different. It first requires the initialization
of a <<java-rest-low-usage-initialization,low-level client>>:
The initialization of a `RestHighLevelClient` is different. It requires to provide
a <<java-rest-low-usage-initialization,low-level client builder>> as a constructor
argument:
[source,java]
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
RestClient lowLevelRestClient = RestClient.builder(
new HttpHost("host", 9200, "http")).build();
include-tagged::{doc-tests}/MainDocumentationIT.java[rest-high-level-client-init]
--------------------------------------------------
NOTE: The `RestClient` uses Elasticsearch's HTTP service which is
bounded by default on `9200`. This port is different from the port
used to connect to Elasticsearch with a `TransportClient`.
Which is then passed to the constructor of the `RestHighLevelClient`:
The `RestHighLevelClient` is thread-safe. It is typically instantiated by the
application at startup time or when the first request is executed.
[source,java]
--------------------------------------------------
RestHighLevelClient client =
new RestHighLevelClient(lowLevelRestClient);
--------------------------------------------------
Once the `RestHighLevelClient` is initialized, it can be used to execute any
of the <<java-rest-high-supported-apis,supported APIs>>.
Both `RestClient` and `RestHighLevelClient` are thread safe. They are
typically instantiated by the application at startup time or when the
first request is executed.
Once the `RestHighLevelClient` is initialized, it can then be used to
execute any of the <<java-rest-high-supported-apis,supported APIs>>.
As with the `TransportClient`, the `RestClient` must be closed when it
As with the `TransportClient`, the `RestHighLevelClient` must be closed when it
is not needed anymore or when the application is stopped.
So the code that closes the `TransportClient`:
The code that closes the `TransportClient`:
[source,java]
--------------------------------------------------
transportClient.close();
--------------------------------------------------
Must be replaced with:
must be replaced with:
[source,java]
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
lowLevelRestClient.close();
include-tagged::{doc-tests}/MainDocumentationIT.java[rest-high-level-client-close]
--------------------------------------------------
=== Changing the application's code
@ -309,7 +301,8 @@ include-tagged::{doc-tests}/MigrationDocumentationIT.java[migration-create-inded
set its content type (here, JSON)
<6> Execute the request using the low-level client. The execution is synchronous
and blocks on the `performRequest()` method until the remote cluster returns
a response.
a response. The low-level client can be retrieved from an existing `RestHighLevelClient`
instance through the `getLowLevelClient` getter method.
<7> Handle the situation where the index has not been created
@ -339,7 +332,7 @@ With the low-level client, the code can be changed to:
include-tagged::{doc-tests}/MigrationDocumentationIT.java[migration-cluster-health]
--------------------------------------------------
<1> Call the cluster's health REST endpoint using the default paramaters
and gets back a `Response` object
and gets back a `Response` object.
<2> Retrieve an `InputStream` object in order to read the response's content
<3> Parse the response's content using Elasticsearch's helper class `XContentHelper`. This
helper requires the content type of the response to be passed as an argument and returns

View File

@ -118,15 +118,26 @@ transitive dependencies:
[[java-rest-high-usage-initialization]]
=== Initialization
A `RestHighLevelClient` instance needs a <<java-rest-low-usage-initialization,REST low-level client>>
A `RestHighLevelClient` instance needs a <<java-rest-low-usage-initialization,REST low-level client builder>>
to be built as follows:
[source,java]
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
RestHighLevelClient client =
new RestHighLevelClient(lowLevelRestClient); <1>
include-tagged::{doc-tests}/MainDocumentationIT.java[rest-high-level-client-init]
--------------------------------------------------
The high-level client will internally create the low-level client used to
perform requests based on the provided builder, and manage its lifecycle.
The high-level client instance needs to be closed when no longer needed so that
all the resources used by it get properly released, as well as the underlying
http client instance and its threads. This can be done through the `close`
method, which will close the internal `RestClient` instance.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MainDocumentationIT.java[rest-high-level-client-close]
--------------------------------------------------
<1> We pass the <<java-rest-low-usage-initialization,REST low-level client>> instance
In the rest of this documentation about the Java High Level Client, the `RestHighLevelClient` instance
will be referenced as `client`.

View File

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;