Add support for named xcontent parsers to high level REST client (#23328)

NamedXContentRegistry will be used by the high level REST client to parse aggregation responses, and any section whose class type depends on a json key. There are currently on entries in the standard registry, but soon aggs and most likely suggesters will be added. Also it is possible for subclasses to provide additional named xcontent parsers.
This commit is contained in:
Luca Cavanna 2017-02-24 11:46:59 +01:00 committed by GitHub
parent b9eb1bba65
commit 7a96a38104
3 changed files with 202 additions and 28 deletions

View File

@ -43,22 +43,46 @@ import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.stream.Collectors.toList;
/**
* High level REST client that wraps an instance of the low level {@link RestClient} and allows to build requests and read responses.
* The provided {@link RestClient} is externally built and closed.
* Can be sub-classed to expose additional client methods that make use of endpoints added to Elasticsearch through plugins, or to
* add support for custom response sections, again added to Elasticsearch through plugins.
*/
public class RestHighLevelClient {
private final RestClient client;
private final NamedXContentRegistry registry;
public RestHighLevelClient(RestClient client) {
this.client = Objects.requireNonNull(client);
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClient} that it should use to perform requests.
*/
public RestHighLevelClient(RestClient restClient) {
this(restClient, Collections.emptyList());
}
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClient} that it should use to perform requests and
* a list of entries that allow to parse custom response sections added to Elasticsearch through plugins.
*/
protected RestHighLevelClient(RestClient restClient, List<NamedXContentRegistry.Entry> namedXContentEntries) {
this.client = Objects.requireNonNull(restClient);
this.registry = new NamedXContentRegistry(Stream.of(
getNamedXContents().stream(),
namedXContentEntries.stream()
).flatMap(Function.identity()).collect(toList()));
}
/**
@ -224,7 +248,7 @@ public class RestHighLevelClient {
client.performRequestAsync(req.method, req.endpoint, req.params, req.entity, responseListener, headers);
}
static <Resp> ResponseListener wrapResponseListener(CheckedFunction<Response, Resp, IOException> responseConverter,
<Resp> ResponseListener wrapResponseListener(CheckedFunction<Response, Resp, IOException> responseConverter,
ActionListener<Resp> actionListener, Set<Integer> ignores) {
return new ResponseListener() {
@Override
@ -269,7 +293,7 @@ public class RestHighLevelClient {
* that wraps the original {@link ResponseException}. The potential exception obtained while parsing is added to the returned
* exception as a suppressed exception. This method is guaranteed to not throw any exception eventually thrown while parsing.
*/
static ElasticsearchStatusException parseResponseException(ResponseException responseException) {
ElasticsearchStatusException parseResponseException(ResponseException responseException) {
Response response = responseException.getResponse();
HttpEntity entity = response.getEntity();
ElasticsearchStatusException elasticsearchException;
@ -289,7 +313,7 @@ public class RestHighLevelClient {
return elasticsearchException;
}
static <Resp> Resp parseEntity(
<Resp> Resp parseEntity(
HttpEntity entity, CheckedFunction<XContentParser, Resp, IOException> entityParser) throws IOException {
if (entity == null) {
throw new IllegalStateException("Response body expected but not returned");
@ -301,7 +325,7 @@ public class RestHighLevelClient {
if (xContentType == null) {
throw new IllegalStateException("Unsupported Content-Type: " + entity.getContentType().getValue());
}
try (XContentParser parser = xContentType.xContent().createParser(NamedXContentRegistry.EMPTY, entity.getContent())) {
try (XContentParser parser = xContentType.xContent().createParser(registry, entity.getContent())) {
return entityParser.apply(parser);
}
}
@ -309,4 +333,10 @@ public class RestHighLevelClient {
static boolean convertExistsResponse(Response response) {
return response.getStatusLine().getStatusCode() == 200;
}
static List<NamedXContentRegistry.Entry> getNamedXContents() {
List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
//namedXContents.add(new NamedXContentRegistry.Entry(Aggregation.class, new ParseField("sterms"), StringTerms::fromXContent));
return namedXContents;
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.mockito.Mockito.mock;
/**
* This test works against a {@link RestHighLevelClient} subclass that simulats how custom response sections returned by
* Elasticsearch plugins can be parsed using the high level client.
*/
public class RestHighLevelClientExtTests extends ESTestCase {
private RestHighLevelClient restHighLevelClient;
@Before
public void initClient() throws IOException {
RestClient restClient = mock(RestClient.class);
restHighLevelClient = new RestHighLevelClientExt(restClient);
}
public void testParseEntityCustomResponseSection() throws IOException {
{
HttpEntity jsonEntity = new StringEntity("{\"custom1\":{ \"field\":\"value\"}}", ContentType.APPLICATION_JSON);
BaseCustomResponseSection customSection = restHighLevelClient.parseEntity(jsonEntity, BaseCustomResponseSection::fromXContent);
assertThat(customSection, instanceOf(CustomResponseSection1.class));
CustomResponseSection1 customResponseSection1 = (CustomResponseSection1) customSection;
assertEquals("value", customResponseSection1.value);
}
{
HttpEntity jsonEntity = new StringEntity("{\"custom2\":{ \"array\": [\"item1\", \"item2\"]}}", ContentType.APPLICATION_JSON);
BaseCustomResponseSection customSection = restHighLevelClient.parseEntity(jsonEntity, BaseCustomResponseSection::fromXContent);
assertThat(customSection, instanceOf(CustomResponseSection2.class));
CustomResponseSection2 customResponseSection2 = (CustomResponseSection2) customSection;
assertArrayEquals(new String[]{"item1", "item2"}, customResponseSection2.values);
}
}
private static class RestHighLevelClientExt extends RestHighLevelClient {
private RestHighLevelClientExt(RestClient restClient) {
super(restClient, getNamedXContentsExt());
}
private static List<NamedXContentRegistry.Entry> getNamedXContentsExt() {
List<NamedXContentRegistry.Entry> entries = new ArrayList<>();
entries.add(new NamedXContentRegistry.Entry(BaseCustomResponseSection.class, new ParseField("custom1"),
CustomResponseSection1::fromXContent));
entries.add(new NamedXContentRegistry.Entry(BaseCustomResponseSection.class, new ParseField("custom2"),
CustomResponseSection2::fromXContent));
return entries;
}
}
private abstract static class BaseCustomResponseSection {
static BaseCustomResponseSection fromXContent(XContentParser parser) throws IOException {
assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
assertEquals(XContentParser.Token.FIELD_NAME, parser.nextToken());
BaseCustomResponseSection custom = parser.namedObject(BaseCustomResponseSection.class, parser.currentName(), null);
assertEquals(XContentParser.Token.END_OBJECT, parser.nextToken());
return custom;
}
}
private static class CustomResponseSection1 extends BaseCustomResponseSection {
private final String value;
private CustomResponseSection1(String value) {
this.value = value;
}
static CustomResponseSection1 fromXContent(XContentParser parser) throws IOException {
assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
assertEquals(XContentParser.Token.FIELD_NAME, parser.nextToken());
assertEquals("field", parser.currentName());
assertEquals(XContentParser.Token.VALUE_STRING, parser.nextToken());
CustomResponseSection1 responseSection1 = new CustomResponseSection1(parser.text());
assertEquals(XContentParser.Token.END_OBJECT, parser.nextToken());
return responseSection1;
}
}
private static class CustomResponseSection2 extends BaseCustomResponseSection {
private final String[] values;
private CustomResponseSection2(String[] values) {
this.values = values;
}
static CustomResponseSection2 fromXContent(XContentParser parser) throws IOException {
assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
assertEquals(XContentParser.Token.FIELD_NAME, parser.nextToken());
assertEquals("array", parser.currentName());
assertEquals(XContentParser.Token.START_ARRAY, parser.nextToken());
List<String> values = new ArrayList<>();
while(parser.nextToken().isValue()) {
values.add(parser.text());
}
assertEquals(XContentParser.Token.END_ARRAY, parser.currentToken());
CustomResponseSection2 responseSection2 = new CustomResponseSection2(values.toArray(new String[values.size()]));
assertEquals(XContentParser.Token.END_OBJECT, parser.nextToken());
return responseSection2;
}
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.cbor.CborXContent;
@ -54,9 +55,9 @@ import org.mockito.internal.matchers.VarargMatcher;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.mockito.Matchers.anyMapOf;
@ -138,17 +139,17 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testParseEntity() throws IOException {
{
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> RestHighLevelClient.parseEntity(null, null));
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> restHighLevelClient.parseEntity(null, null));
assertEquals("Response body expected but not returned", ise.getMessage());
}
{
IllegalStateException ise = expectThrows(IllegalStateException.class,
() -> RestHighLevelClient.parseEntity(new StringEntity("", (ContentType) null), null));
() -> restHighLevelClient.parseEntity(new StringEntity("", (ContentType) null), null));
assertEquals("Elasticsearch didn't return the [Content-Type] header, unable to parse response body", ise.getMessage());
}
{
StringEntity entity = new StringEntity("", ContentType.APPLICATION_SVG_XML);
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> RestHighLevelClient.parseEntity(entity, null));
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> restHighLevelClient.parseEntity(entity, null));
assertEquals("Unsupported Content-Type: " + entity.getContentType().getValue(), ise.getMessage());
}
{
@ -161,13 +162,13 @@ public class RestHighLevelClientTests extends ESTestCase {
return value;
};
HttpEntity jsonEntity = new StringEntity("{\"field\":\"value\"}", ContentType.APPLICATION_JSON);
assertEquals("value", RestHighLevelClient.parseEntity(jsonEntity, entityParser));
assertEquals("value", restHighLevelClient.parseEntity(jsonEntity, entityParser));
HttpEntity yamlEntity = new StringEntity("---\nfield: value\n", ContentType.create("application/yaml"));
assertEquals("value", RestHighLevelClient.parseEntity(yamlEntity, entityParser));
assertEquals("value", restHighLevelClient.parseEntity(yamlEntity, entityParser));
HttpEntity smileEntity = createBinaryEntity(SmileXContent.contentBuilder(), ContentType.create("application/smile"));
assertEquals("value", RestHighLevelClient.parseEntity(smileEntity, entityParser));
assertEquals("value", restHighLevelClient.parseEntity(smileEntity, entityParser));
HttpEntity cborEntity = createBinaryEntity(CborXContent.contentBuilder(), ContentType.create("application/cbor"));
assertEquals("value", RestHighLevelClient.parseEntity(cborEntity, entityParser));
assertEquals("value", restHighLevelClient.parseEntity(cborEntity, entityParser));
}
}
@ -194,7 +195,7 @@ public class RestHighLevelClientTests extends ESTestCase {
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = RestHighLevelClient.parseResponseException(responseException);
ElasticsearchException elasticsearchException = restHighLevelClient.parseResponseException(responseException);
assertEquals(responseException.getMessage(), elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
@ -206,7 +207,7 @@ public class RestHighLevelClientTests extends ESTestCase {
ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = RestHighLevelClient.parseResponseException(responseException);
ElasticsearchException elasticsearchException = restHighLevelClient.parseResponseException(responseException);
assertEquals("Elasticsearch exception [type=exception, reason=test error message]", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getSuppressed()[0]);
@ -217,7 +218,7 @@ public class RestHighLevelClientTests extends ESTestCase {
httpResponse.setEntity(new StringEntity("{\"error\":", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = RestHighLevelClient.parseResponseException(responseException);
ElasticsearchException elasticsearchException = restHighLevelClient.parseResponseException(responseException);
assertEquals("Unable to parse response body", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
@ -229,7 +230,7 @@ public class RestHighLevelClientTests extends ESTestCase {
httpResponse.setEntity(new StringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = RestHighLevelClient.parseResponseException(responseException);
ElasticsearchException elasticsearchException = restHighLevelClient.parseResponseException(responseException);
assertEquals("Unable to parse response body", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
@ -390,7 +391,7 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testWrapResponseListenerOnSuccess() throws IOException {
{
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
@ -400,7 +401,7 @@ public class RestHighLevelClientTests extends ESTestCase {
}
{
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> {throw new IllegalStateException();}, trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
@ -415,7 +416,7 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testWrapResponseListenerOnException() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
IllegalStateException exception = new IllegalStateException();
responseListener.onFailure(exception);
@ -424,7 +425,7 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testWrapResponseListenerOnResponseExceptionWithoutEntity() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
@ -440,7 +441,7 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testWrapResponseListenerOnResponseExceptionWithEntity() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
@ -459,7 +460,7 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testWrapResponseListenerOnResponseExceptionWithBrokenEntity() throws IOException {
{
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
@ -476,7 +477,7 @@ public class RestHighLevelClientTests extends ESTestCase {
}
{
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
@ -495,7 +496,7 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testWrapResponseListenerOnResponseExceptionWithIgnores() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.singleton(404));
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
@ -510,7 +511,7 @@ public class RestHighLevelClientTests extends ESTestCase {
TrackingActionListener trackingActionListener = new TrackingActionListener();
//response parsing throws exception while handling ignores. same as when GetResponse#fromXContent throws error when trying
//to parse a 404 response which contains an error rather than a valid document not found response.
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> { throw new IllegalStateException(); }, trackingActionListener, Collections.singleton(404));
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
@ -527,7 +528,7 @@ public class RestHighLevelClientTests extends ESTestCase {
TrackingActionListener trackingActionListener = new TrackingActionListener();
//response parsing throws exception while handling ignores. same as when GetResponse#fromXContent throws error when trying
//to parse a 404 response which contains an error rather than a valid document not found response.
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> { throw new IllegalStateException(); }, trackingActionListener, Collections.singleton(404));
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":404}",
@ -542,6 +543,11 @@ public class RestHighLevelClientTests extends ESTestCase {
assertEquals("Elasticsearch exception [type=exception, reason=test error message]", elasticsearchException.getMessage());
}
public void testNamedXContents() throws IOException {
List<NamedXContentRegistry.Entry> namedXContents = RestHighLevelClient.getNamedXContents();
assertEquals(0, namedXContents.size());
}
private static class TrackingActionListener implements ActionListener<Integer> {
private final AtomicInteger statusCode = new AtomicInteger(-1);
private final AtomicReference<Exception> exception = new AtomicReference<>();