Add get/exists method to RestHighLevelClient (#22706)

This commit adds support for get and exists api to the high level Java Rest Client. It also adds the infrastructure for other methods to be added in the future.
This commit is contained in:
Luca Cavanna 2017-02-04 14:54:26 +01:00 committed by GitHub
parent e181a020a9
commit 3551106aa7
7 changed files with 1145 additions and 31 deletions

View File

@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.HttpEntity;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
final class Request {
final String method;
final String endpoint;
final Map<String, String> params;
final HttpEntity entity;
Request(String method, String endpoint, Map<String, String> params, HttpEntity entity) {
this.method = method;
this.endpoint = endpoint;
this.params = params;
this.entity = entity;
}
@Override
public String toString() {
return "Request{" +
"method='" + method + '\'' +
", endpoint='" + endpoint + '\'' +
", params=" + params +
'}';
}
static Request ping() {
return new Request("HEAD", "/", Collections.emptyMap(), null);
}
static Request exists(GetRequest getRequest) {
return new Request("HEAD", getEndpoint(getRequest), getParams(getRequest), null);
}
static Request get(GetRequest getRequest) {
return new Request("GET", getEndpoint(getRequest), getParams(getRequest), null);
}
private static Map<String, String> getParams(GetRequest getRequest) {
Map<String, String> params = new HashMap<>();
putParam("preference", getRequest.preference(), params);
putParam("routing", getRequest.routing(), params);
putParam("parent", getRequest.parent(), params);
if (getRequest.refresh()) {
params.put("refresh", Boolean.TRUE.toString());
}
if (getRequest.realtime() == false) {
params.put("realtime", Boolean.FALSE.toString());
}
if (getRequest.storedFields() != null && getRequest.storedFields().length > 0) {
params.put("stored_fields", String.join(",", getRequest.storedFields()));
}
if (getRequest.version() != Versions.MATCH_ANY) {
params.put("version", Long.toString(getRequest.version()));
}
if (getRequest.versionType() != VersionType.INTERNAL) {
params.put("version_type", getRequest.versionType().name().toLowerCase(Locale.ROOT));
}
if (getRequest.fetchSourceContext() != null) {
FetchSourceContext fetchSourceContext = getRequest.fetchSourceContext();
if (fetchSourceContext.fetchSource() == false) {
params.put("_source", Boolean.FALSE.toString());
}
if (fetchSourceContext.includes() != null && fetchSourceContext.includes().length > 0) {
params.put("_source_include", String.join(",", fetchSourceContext.includes()));
}
if (fetchSourceContext.excludes() != null && fetchSourceContext.excludes().length > 0) {
params.put("_source_exclude", String.join(",", fetchSourceContext.excludes()));
}
}
return Collections.unmodifiableMap(params);
}
private static String getEndpoint(GetRequest getRequest) {
StringJoiner pathJoiner = new StringJoiner("/", "/", "");
return pathJoiner.add(getRequest.index()).add(getRequest.type()).add(getRequest.id()).toString();
}
private static void putParam(String key, String value, Map<String, String> params) {
if (Strings.hasLength(value)) {
params.put(key, value);
}
}
}

View File

@ -19,20 +19,34 @@
package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
/**
* High level REST client that wraps an instance of the low level {@link RestClient} and allows to build requests and read responses.
* The provided {@link RestClient} is externally built and closed.
*/
public final class RestHighLevelClient {
private static final Log logger = LogFactory.getLog(RestHighLevelClient.class);
public class RestHighLevelClient {
private final RestClient client;
@ -40,14 +54,180 @@ public final class RestHighLevelClient {
this.client = Objects.requireNonNull(client);
}
public boolean ping(Header... headers) {
/**
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
*/
public boolean ping(Header... headers) throws IOException {
return performRequest(new MainRequest(), (request) -> Request.ping(), RestHighLevelClient::convertExistsResponse,
Collections.emptySet(), headers);
}
/**
* Retrieves a document by id using the get api
*/
public GetResponse get(GetRequest getRequest, Header... headers) throws IOException {
return performRequestAndParseEntity(getRequest, Request::get, GetResponse::fromXContent, Collections.singleton(404), headers);
}
/**
* Asynchronously retrieves a document by id using the get api
*/
public void getAsync(GetRequest getRequest, ActionListener<GetResponse> listener, Header... headers) {
performRequestAsyncAndParseEntity(getRequest, Request::get, GetResponse::fromXContent, listener,
Collections.singleton(404), headers);
}
/**
* Checks for the existence of a document. Returns true if it exists, false otherwise
*/
public boolean exists(GetRequest getRequest, Header... headers) throws IOException {
return performRequest(getRequest, Request::exists, RestHighLevelClient::convertExistsResponse, Collections.emptySet(), headers);
}
/**
* Asynchronously checks for the existence of a document. Returns true if it exists, false otherwise
*/
public void existsAsync(GetRequest getRequest, ActionListener<Boolean> listener, Header... headers) {
performRequestAsync(getRequest, Request::exists, RestHighLevelClient::convertExistsResponse, listener,
Collections.emptySet(), headers);
}
private <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request, Function<Req, Request> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser, Set<Integer> ignores, Header... headers) throws IOException {
return performRequest(request, requestConverter, (response) -> parseEntity(response.getEntity(), entityParser), ignores, headers);
}
<Req extends ActionRequest, Resp> Resp performRequest(Req request, Function<Req, Request> requestConverter,
CheckedFunction<Response, Resp, IOException> responseConverter, Set<Integer> ignores, Header... headers) throws IOException {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
throw validationException;
}
Request req = requestConverter.apply(request);
Response response;
try {
client.performRequest("HEAD", "/", headers);
return true;
} catch(IOException exception) {
return false;
response = client.performRequest(req.method, req.endpoint, req.params, req.entity, headers);
} catch (ResponseException e) {
if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) {
try {
return responseConverter.apply(e.getResponse());
} catch (Exception innerException) {
throw parseResponseException(e);
}
}
throw parseResponseException(e);
}
try {
return responseConverter.apply(response);
} catch(Exception e) {
throw new IOException("Unable to parse response body for " + response, e);
}
}
private <Req extends ActionRequest, Resp> void performRequestAsyncAndParseEntity(Req request, Function<Req, Request> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser, ActionListener<Resp> listener,
Set<Integer> ignores, Header... headers) {
performRequestAsync(request, requestConverter, (response) -> parseEntity(response.getEntity(), entityParser),
listener, ignores, headers);
}
<Req extends ActionRequest, Resp> void performRequestAsync(Req request, Function<Req, Request> requestConverter,
CheckedFunction<Response, Resp, IOException> responseConverter, ActionListener<Resp> listener,
Set<Integer> ignores, Header... headers) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
Request req = requestConverter.apply(request);
ResponseListener responseListener = wrapResponseListener(responseConverter, listener, ignores);
client.performRequestAsync(req.method, req.endpoint, req.params, req.entity, responseListener, headers);
}
static <Resp> ResponseListener wrapResponseListener(CheckedFunction<Response, Resp, IOException> responseConverter,
ActionListener<Resp> actionListener, Set<Integer> ignores) {
return new ResponseListener() {
@Override
public void onSuccess(Response response) {
try {
actionListener.onResponse(responseConverter.apply(response));
} catch(Exception e) {
IOException ioe = new IOException("Unable to parse response body for " + response, e);
onFailure(ioe);
}
}
@Override
public void onFailure(Exception exception) {
if (exception instanceof ResponseException) {
ResponseException responseException = (ResponseException) exception;
Response response = responseException.getResponse();
if (ignores.contains(response.getStatusLine().getStatusCode())) {
try {
actionListener.onResponse(responseConverter.apply(response));
} catch (Exception innerException) {
//the exception is ignored as we now try to parse the response as an error.
//this covers cases like get where 404 can either be a valid document not found response,
//or an error for which parsing is completely different. We try to consider the 404 response as a valid one
//first. If parsing of the response breaks, we fall back to parsing it as an error.
actionListener.onFailure(parseResponseException(responseException));
}
} else {
actionListener.onFailure(parseResponseException(responseException));
}
} else {
actionListener.onFailure(exception);
}
}
};
}
/**
* Converts a {@link ResponseException} obtained from the low level REST client into an {@link ElasticsearchException}.
* If a response body was returned, tries to parse it as an error returned from Elasticsearch.
* If no response body was returned or anything goes wrong while parsing the error, returns a new {@link ElasticsearchStatusException}
* that wraps the original {@link ResponseException}. The potential exception obtained while parsing is added to the returned
* exception as a suppressed exception. This method is guaranteed to not throw any exception eventually thrown while parsing.
*/
static ElasticsearchStatusException parseResponseException(ResponseException responseException) {
Response response = responseException.getResponse();
HttpEntity entity = response.getEntity();
ElasticsearchStatusException elasticsearchException;
if (entity == null) {
elasticsearchException = new ElasticsearchStatusException(
responseException.getMessage(), RestStatus.fromCode(response.getStatusLine().getStatusCode()), responseException);
} else {
try {
elasticsearchException = parseEntity(entity, BytesRestResponse::errorFromXContent);
elasticsearchException.addSuppressed(responseException);
} catch (Exception e) {
RestStatus restStatus = RestStatus.fromCode(response.getStatusLine().getStatusCode());
elasticsearchException = new ElasticsearchStatusException("Unable to parse response body", restStatus, responseException);
elasticsearchException.addSuppressed(e);
}
}
return elasticsearchException;
}
static <Resp> Resp parseEntity(
HttpEntity entity, CheckedFunction<XContentParser, Resp, IOException> entityParser) throws IOException {
if (entity == null) {
throw new IllegalStateException("Response body expected but not returned");
}
if (entity.getContentType() == null) {
throw new IllegalStateException("Elasticsearch didn't return the [Content-Type] header, unable to parse response body");
}
XContentType xContentType = XContentType.fromMediaTypeOrFormat(entity.getContentType().getValue());
if (xContentType == null) {
throw new IllegalStateException("Unsupported Content-Type: " + entity.getContentType().getValue());
}
try (XContentParser parser = xContentType.xContent().createParser(NamedXContentRegistry.EMPTY, entity.getContent())) {
return entityParser.apply(parser);
}
}
static boolean convertExistsResponse(Response response) {
return response.getStatusLine().getStatusCode() == 200;
}
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.CoreMatchers.containsString;
public class CrudIT extends ESRestHighLevelClientTestCase {
public void testExists() throws IOException {
{
GetRequest getRequest = new GetRequest("index", "type", "id");
assertFalse(execute(getRequest, highLevelClient()::exists, highLevelClient()::existsAsync));
}
String document = "{\"field1\":\"value1\",\"field2\":\"value2\"}";
StringEntity stringEntity = new StringEntity(document, ContentType.APPLICATION_JSON);
Response response = client().performRequest("PUT", "/index/type/id", Collections.singletonMap("refresh", "wait_for"), stringEntity);
assertEquals(201, response.getStatusLine().getStatusCode());
{
GetRequest getRequest = new GetRequest("index", "type", "id");
assertTrue(execute(getRequest, highLevelClient()::exists, highLevelClient()::existsAsync));
}
{
GetRequest getRequest = new GetRequest("index", "type", "does_not_exist");
assertFalse(execute(getRequest, highLevelClient()::exists, highLevelClient()::existsAsync));
}
{
GetRequest getRequest = new GetRequest("index", "type", "does_not_exist").version(1);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(getRequest, highLevelClient()::exists, highLevelClient()::existsAsync));
assertEquals(RestStatus.BAD_REQUEST, exception.status());
assertThat(exception.getMessage(), containsString("/index/type/does_not_exist?version=1: HTTP/1.1 400 Bad Request"));
}
}
public void testGet() throws IOException {
{
GetRequest getRequest = new GetRequest("index", "type", "id");
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(getRequest, highLevelClient()::get, highLevelClient()::getAsync));
assertEquals(RestStatus.NOT_FOUND, exception.status());
assertEquals("Elasticsearch exception [type=index_not_found_exception, reason=no such index]", exception.getMessage());
assertEquals("index", exception.getMetadata("es.index").get(0));
}
String document = "{\"field1\":\"value1\",\"field2\":\"value2\"}";
StringEntity stringEntity = new StringEntity(document, ContentType.APPLICATION_JSON);
Response response = client().performRequest("PUT", "/index/type/id", Collections.singletonMap("refresh", "wait_for"), stringEntity);
assertEquals(201, response.getStatusLine().getStatusCode());
{
GetRequest getRequest = new GetRequest("index", "type", "id").version(2);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(getRequest, highLevelClient()::get, highLevelClient()::getAsync));
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, " + "reason=[type][id]: " +
"version conflict, current version [1] is different than the one provided [2]]", exception.getMessage());
assertEquals("index", exception.getMetadata("es.index").get(0));
}
{
GetRequest getRequest = new GetRequest("index", "type", "id");
if (randomBoolean()) {
getRequest.version(1L);
}
GetResponse getResponse = execute(getRequest, highLevelClient()::get, highLevelClient()::getAsync);
assertEquals("index", getResponse.getIndex());
assertEquals("type", getResponse.getType());
assertEquals("id", getResponse.getId());
assertTrue(getResponse.isExists());
assertFalse(getResponse.isSourceEmpty());
assertEquals(1L, getResponse.getVersion());
assertEquals(document, getResponse.getSourceAsString());
}
{
GetRequest getRequest = new GetRequest("index", "type", "does_not_exist");
GetResponse getResponse = execute(getRequest, highLevelClient()::get, highLevelClient()::getAsync);
assertEquals("index", getResponse.getIndex());
assertEquals("type", getResponse.getType());
assertEquals("does_not_exist", getResponse.getId());
assertFalse(getResponse.isExists());
assertEquals(-1, getResponse.getVersion());
assertTrue(getResponse.isSourceEmpty());
assertNull(getResponse.getSourceAsString());
}
{
GetRequest getRequest = new GetRequest("index", "type", "id");
getRequest.fetchSourceContext(new FetchSourceContext(false, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY));
GetResponse getResponse = execute(getRequest, highLevelClient()::get, highLevelClient()::getAsync);
assertEquals("index", getResponse.getIndex());
assertEquals("type", getResponse.getType());
assertEquals("id", getResponse.getId());
assertTrue(getResponse.isExists());
assertTrue(getResponse.isSourceEmpty());
assertEquals(1L, getResponse.getVersion());
assertNull(getResponse.getSourceAsString());
}
{
GetRequest getRequest = new GetRequest("index", "type", "id");
if (randomBoolean()) {
getRequest.fetchSourceContext(new FetchSourceContext(true, new String[]{"field1"}, Strings.EMPTY_ARRAY));
} else {
getRequest.fetchSourceContext(new FetchSourceContext(true, Strings.EMPTY_ARRAY, new String[]{"field2"}));
}
GetResponse getResponse = execute(getRequest, highLevelClient()::get, highLevelClient()::getAsync);
assertEquals("index", getResponse.getIndex());
assertEquals("type", getResponse.getType());
assertEquals("id", getResponse.getId());
assertTrue(getResponse.isExists());
assertFalse(getResponse.isSourceEmpty());
assertEquals(1L, getResponse.getVersion());
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
assertEquals(1, sourceAsMap.size());
assertEquals("value1", sourceAsMap.get("field1"));
}
}
}

View File

@ -19,6 +19,9 @@
package org.elasticsearch.client;
import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
@ -45,4 +48,28 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
protected static RestHighLevelClient highLevelClient() {
return restHighLevelClient;
}
/**
* Executes the provided request using either the sync method or its async variant, both provided as functions
*/
protected static <Req, Resp> Resp execute(Req request, SyncMethod<Req, Resp> syncMethod,
AsyncMethod<Req, Resp> asyncMethod, Header... headers) throws IOException {
if (randomBoolean()) {
return syncMethod.execute(request, headers);
} else {
PlainActionFuture<Resp> future = PlainActionFuture.newFuture();
asyncMethod.execute(request, future, headers);
return future.actionGet();
}
}
@FunctionalInterface
protected interface SyncMethod<Request, Response> {
Response execute(Request request, Header... headers) throws IOException;
}
@FunctionalInterface
protected interface AsyncMethod<Request, Response> {
void execute(Request request, ActionListener<Response> listener, Header... headers);
}
}

View File

@ -19,9 +19,13 @@
package org.elasticsearch.client;
public class MainActionIT extends ESRestHighLevelClientTestCase {
import java.io.IOException;
public void testPing() {
public class PingAndInfoIT extends ESRestHighLevelClientTestCase {
public void testPing() throws IOException {
assertTrue(highLevelClient().ping());
}
//TODO add here integ tests for info api: "GET /" once we have parsing code for MainResponse
}

View File

@ -0,0 +1,158 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
public class RequestTests extends ESTestCase {
public void testPing() {
Request request = Request.ping();
assertEquals("/", request.endpoint);
assertEquals(0, request.params.size());
assertNull(request.entity);
assertEquals("HEAD", request.method);
}
public void testGet() {
getAndExistsTest(Request::get, "GET");
}
public void testExists() {
getAndExistsTest(Request::exists, "HEAD");
}
private static void getAndExistsTest(Function<GetRequest, Request> requestConverter, String method) {
String index = randomAsciiOfLengthBetween(3, 10);
String type = randomAsciiOfLengthBetween(3, 10);
String id = randomAsciiOfLengthBetween(3, 10);
GetRequest getRequest = new GetRequest(index, type, id);
Map<String, String> expectedParams = new HashMap<>();
if (randomBoolean()) {
if (randomBoolean()) {
String preference = randomAsciiOfLengthBetween(3, 10);
getRequest.preference(preference);
expectedParams.put("preference", preference);
}
if (randomBoolean()) {
String routing = randomAsciiOfLengthBetween(3, 10);
getRequest.routing(routing);
expectedParams.put("routing", routing);
}
if (randomBoolean()) {
boolean realtime = randomBoolean();
getRequest.realtime(realtime);
if (realtime == false) {
expectedParams.put("realtime", "false");
}
}
if (randomBoolean()) {
boolean refresh = randomBoolean();
getRequest.refresh(refresh);
if (refresh) {
expectedParams.put("refresh", "true");
}
}
if (randomBoolean()) {
long version = randomLong();
getRequest.version(version);
if (version != Versions.MATCH_ANY) {
expectedParams.put("version", Long.toString(version));
}
}
if (randomBoolean()) {
VersionType versionType = randomFrom(VersionType.values());
getRequest.versionType(versionType);
if (versionType != VersionType.INTERNAL) {
expectedParams.put("version_type", versionType.name().toLowerCase(Locale.ROOT));
}
}
if (randomBoolean()) {
int numStoredFields = randomIntBetween(1, 10);
String[] storedFields = new String[numStoredFields];
StringBuilder storedFieldsParam = new StringBuilder();
for (int i = 0; i < numStoredFields; i++) {
String storedField = randomAsciiOfLengthBetween(3, 10);
storedFields[i] = storedField;
storedFieldsParam.append(storedField);
if (i < numStoredFields - 1) {
storedFieldsParam.append(",");
}
}
getRequest.storedFields(storedFields);
expectedParams.put("stored_fields", storedFieldsParam.toString());
}
if (randomBoolean()) {
if (randomBoolean()) {
boolean fetchSource = randomBoolean();
getRequest.fetchSourceContext(new FetchSourceContext(fetchSource));
if (fetchSource == false) {
expectedParams.put("_source", "false");
}
} else {
int numIncludes = randomIntBetween(0, 5);
String[] includes = new String[numIncludes];
StringBuilder includesParam = new StringBuilder();
for (int i = 0; i < numIncludes; i++) {
String include = randomAsciiOfLengthBetween(3, 10);
includes[i] = include;
includesParam.append(include);
if (i < numIncludes - 1) {
includesParam.append(",");
}
}
if (numIncludes > 0) {
expectedParams.put("_source_include", includesParam.toString());
}
int numExcludes = randomIntBetween(0, 5);
String[] excludes = new String[numExcludes];
StringBuilder excludesParam = new StringBuilder();
for (int i = 0; i < numExcludes; i++) {
String exclude = randomAsciiOfLengthBetween(3, 10);
excludes[i] = exclude;
excludesParam.append(exclude);
if (i < numExcludes - 1) {
excludesParam.append(",");
}
}
if (numExcludes > 0) {
expectedParams.put("_source_exclude", excludesParam.toString());
}
getRequest.fetchSourceContext(new FetchSourceContext(true, includes, excludes));
}
}
}
Request request = requestConverter.apply(getRequest);
assertEquals("/" + index + "/" + type + "/" + id, request.endpoint);
assertEquals(expectedParams, request.params);
assertNull(request.entity);
assertEquals(method, request.method);
}
}

View File

@ -19,17 +19,51 @@
package org.elasticsearch.client;
import com.fasterxml.jackson.core.JsonParseException;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.RequestLine;
import org.apache.http.StatusLine;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicRequestLine;
import org.apache.http.message.BasicStatusLine;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.cbor.CborXContent;
import org.elasticsearch.common.xcontent.smile.SmileXContent;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.internal.matchers.ArrayEquals;
import org.mockito.internal.matchers.VarargMatcher;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import static org.mockito.Matchers.any;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.mockito.Matchers.anyMapOf;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.anyVararg;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@ -38,6 +72,9 @@ import static org.mockito.Mockito.when;
public class RestHighLevelClientTests extends ESTestCase {
private static final ProtocolVersion HTTP_PROTOCOL = new ProtocolVersion("http", 1, 1);
private static final RequestLine REQUEST_LINE = new BasicRequestLine("GET", "/", HTTP_PROTOCOL);
private RestClient restClient;
private RestHighLevelClient restHighLevelClient;
@ -47,28 +84,473 @@ public class RestHighLevelClientTests extends ESTestCase {
restHighLevelClient = new RestHighLevelClient(restClient);
}
public void testPing() throws IOException {
assertTrue(restHighLevelClient.ping());
verify(restClient).performRequest(eq("HEAD"), eq("/"), argThat(new HeadersVarargMatcher()));
}
public void testPingFailure() throws IOException {
when(restClient.performRequest(any(), any())).thenThrow(new IllegalStateException());
expectThrows(IllegalStateException.class, () -> restHighLevelClient.ping());
}
public void testPingFailed() throws IOException {
when(restClient.performRequest(any(), any())).thenThrow(new SocketTimeoutException());
assertFalse(restHighLevelClient.ping());
}
public void testPingWithHeaders() throws IOException {
public void testPingSuccessful() throws IOException {
Header[] headers = RestClientTestUtil.randomHeaders(random(), "Header");
Response response = mock(Response.class);
when(response.getStatusLine()).thenReturn(newStatusLine(RestStatus.OK));
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenReturn(response);
assertTrue(restHighLevelClient.ping(headers));
verify(restClient).performRequest(eq("HEAD"), eq("/"), argThat(new HeadersVarargMatcher(headers)));
verify(restClient).performRequest(eq("HEAD"), eq("/"), eq(Collections.emptyMap()),
Matchers.isNull(HttpEntity.class), argThat(new HeadersVarargMatcher(headers)));
}
private class HeadersVarargMatcher extends ArgumentMatcher<Header[]> implements VarargMatcher {
public void testPing404NotFound() throws IOException {
Header[] headers = RestClientTestUtil.randomHeaders(random(), "Header");
Response response = mock(Response.class);
when(response.getStatusLine()).thenReturn(newStatusLine(RestStatus.NOT_FOUND));
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenReturn(response);
assertFalse(restHighLevelClient.ping(headers));
verify(restClient).performRequest(eq("HEAD"), eq("/"), eq(Collections.emptyMap()),
Matchers.isNull(HttpEntity.class), argThat(new HeadersVarargMatcher(headers)));
}
public void testPingSocketTimeout() throws IOException {
Header[] headers = RestClientTestUtil.randomHeaders(random(), "Header");
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenThrow(new SocketTimeoutException());
expectThrows(SocketTimeoutException.class, () -> restHighLevelClient.ping(headers));
verify(restClient).performRequest(eq("HEAD"), eq("/"), eq(Collections.emptyMap()),
Matchers.isNull(HttpEntity.class), argThat(new HeadersVarargMatcher(headers)));
}
public void testRequestValidation() {
ActionRequestValidationException validationException = new ActionRequestValidationException();
validationException.addValidationError("validation error");
ActionRequest request = new ActionRequest() {
@Override
public ActionRequestValidationException validate() {
return validationException;
}
};
{
ActionRequestValidationException actualException = expectThrows(ActionRequestValidationException.class,
() -> restHighLevelClient.performRequest(request, null, null, null));
assertSame(validationException, actualException);
}
{
TrackingActionListener trackingActionListener = new TrackingActionListener();
restHighLevelClient.performRequestAsync(request, null, null, trackingActionListener, null);
assertSame(validationException, trackingActionListener.exception.get());
}
}
public void testParseEntity() throws IOException {
{
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> RestHighLevelClient.parseEntity(null, null));
assertEquals("Response body expected but not returned", ise.getMessage());
}
{
IllegalStateException ise = expectThrows(IllegalStateException.class,
() -> RestHighLevelClient.parseEntity(new BasicHttpEntity(), null));
assertEquals("Elasticsearch didn't return the [Content-Type] header, unable to parse response body", ise.getMessage());
}
{
StringEntity entity = new StringEntity("", ContentType.APPLICATION_SVG_XML);
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> RestHighLevelClient.parseEntity(entity, null));
assertEquals("Unsupported Content-Type: " + entity.getContentType().getValue(), ise.getMessage());
}
{
CheckedFunction<XContentParser, String, IOException> entityParser = parser -> {
assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
assertEquals(XContentParser.Token.FIELD_NAME, parser.nextToken());
assertTrue(parser.nextToken().isValue());
String value = parser.text();
assertEquals(XContentParser.Token.END_OBJECT, parser.nextToken());
return value;
};
HttpEntity jsonEntity = new StringEntity("{\"field\":\"value\"}", ContentType.APPLICATION_JSON);
assertEquals("value", RestHighLevelClient.parseEntity(jsonEntity, entityParser));
HttpEntity yamlEntity = new StringEntity("---\nfield: value\n", ContentType.create("application/yaml"));
assertEquals("value", RestHighLevelClient.parseEntity(yamlEntity, entityParser));
HttpEntity smileEntity = createBinaryEntity(SmileXContent.contentBuilder(), ContentType.create("application/smile"));
assertEquals("value", RestHighLevelClient.parseEntity(smileEntity, entityParser));
HttpEntity cborEntity = createBinaryEntity(CborXContent.contentBuilder(), ContentType.create("application/cbor"));
assertEquals("value", RestHighLevelClient.parseEntity(cborEntity, entityParser));
}
}
private static HttpEntity createBinaryEntity(XContentBuilder xContentBuilder, ContentType contentType) throws IOException {
try (XContentBuilder builder = xContentBuilder) {
builder.startObject();
builder.field("field", "value");
builder.endObject();
return new ByteArrayEntity(builder.bytes().toBytesRef().bytes, contentType);
}
}
public void testConvertExistsResponse() {
RestStatus restStatus = randomBoolean() ? RestStatus.OK : randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
boolean result = RestHighLevelClient.convertExistsResponse(response);
assertEquals(restStatus == RestStatus.OK, result);
}
public void testParseResponseException() throws IOException {
{
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = RestHighLevelClient.parseResponseException(responseException);
assertEquals(responseException.getMessage(), elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
}
{
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":" + restStatus.getStatus() + "}",
ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = RestHighLevelClient.parseResponseException(responseException);
assertEquals("Elasticsearch exception [type=exception, reason=test error message]", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getSuppressed()[0]);
}
{
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = RestHighLevelClient.parseResponseException(responseException);
assertEquals("Unable to parse response body", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
assertThat(elasticsearchException.getSuppressed()[0], instanceOf(IOException.class));
}
{
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = RestHighLevelClient.parseResponseException(responseException);
assertEquals("Unable to parse response body", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
assertThat(elasticsearchException.getSuppressed()[0], instanceOf(IllegalStateException.class));
}
}
public void testPerformRequestOnSuccess() throws IOException {
MainRequest mainRequest = new MainRequest();
Function<MainRequest, Request> requestConverter = request -> new Request("GET", "/", Collections.emptyMap(), null);
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenReturn(mockResponse);
{
Integer result = restHighLevelClient.performRequest(mainRequest, requestConverter,
response -> response.getStatusLine().getStatusCode(), Collections.emptySet());
assertEquals(restStatus.getStatus(), result.intValue());
}
{
IOException ioe = expectThrows(IOException.class, () -> restHighLevelClient.performRequest(mainRequest,
requestConverter, response -> {throw new IllegalStateException();}, Collections.emptySet()));
assertEquals("Unable to parse response body for Response{requestLine=GET / http/1.1, host=http://localhost:9200, " +
"response=http/1.1 " + restStatus.getStatus() + " " + restStatus.name() + "}", ioe.getMessage());
}
}
public void testPerformRequestOnResponseExceptionWithoutEntity() throws IOException {
MainRequest mainRequest = new MainRequest();
Function<MainRequest, Request> requestConverter = request -> new Request("GET", "/", Collections.emptyMap(), null);
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenThrow(responseException);
ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class,
() -> restHighLevelClient.performRequest(mainRequest, requestConverter,
response -> response.getStatusLine().getStatusCode(), Collections.emptySet()));
assertEquals(responseException.getMessage(), elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
}
public void testPerformRequestOnResponseExceptionWithEntity() throws IOException {
MainRequest mainRequest = new MainRequest();
Function<MainRequest, Request> requestConverter = request -> new Request("GET", "/", Collections.emptyMap(), null);
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":" + restStatus.getStatus() + "}",
ContentType.APPLICATION_JSON));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenThrow(responseException);
ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class,
() -> restHighLevelClient.performRequest(mainRequest, requestConverter,
response -> response.getStatusLine().getStatusCode(), Collections.emptySet()));
assertEquals("Elasticsearch exception [type=exception, reason=test error message]", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getSuppressed()[0]);
}
public void testPerformRequestOnResponseExceptionWithBrokenEntity() throws IOException {
MainRequest mainRequest = new MainRequest();
Function<MainRequest, Request> requestConverter = request -> new Request("GET", "/", Collections.emptyMap(), null);
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":", ContentType.APPLICATION_JSON));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenThrow(responseException);
ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class,
() -> restHighLevelClient.performRequest(mainRequest, requestConverter,
response -> response.getStatusLine().getStatusCode(), Collections.emptySet()));
assertEquals("Unable to parse response body", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
assertThat(elasticsearchException.getSuppressed()[0], instanceOf(JsonParseException.class));
}
public void testPerformRequestOnResponseExceptionWithBrokenEntity2() throws IOException {
MainRequest mainRequest = new MainRequest();
Function<MainRequest, Request> requestConverter = request -> new Request("GET", "/", Collections.emptyMap(), null);
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenThrow(responseException);
ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class,
() -> restHighLevelClient.performRequest(mainRequest, requestConverter,
response -> response.getStatusLine().getStatusCode(), Collections.emptySet()));
assertEquals("Unable to parse response body", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
assertThat(elasticsearchException.getSuppressed()[0], instanceOf(IllegalStateException.class));
}
public void testPerformRequestOnResponseExceptionWithIgnores() throws IOException {
MainRequest mainRequest = new MainRequest();
Function<MainRequest, Request> requestConverter = request -> new Request("GET", "/", Collections.emptyMap(), null);
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenThrow(responseException);
//although we got an exception, we turn it into a successful response because the status code was provided among ignores
assertEquals(Integer.valueOf(404), restHighLevelClient.performRequest(mainRequest, requestConverter,
response -> response.getStatusLine().getStatusCode(), Collections.singleton(404)));
}
public void testPerformRequestOnResponseExceptionWithIgnoresErrorNoBody() throws IOException {
MainRequest mainRequest = new MainRequest();
Function<MainRequest, Request> requestConverter = request -> new Request("GET", "/", Collections.emptyMap(), null);
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenThrow(responseException);
ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class,
() -> restHighLevelClient.performRequest(mainRequest, requestConverter,
response -> {throw new IllegalStateException();}, Collections.singleton(404)));
assertEquals(RestStatus.NOT_FOUND, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
assertEquals(responseException.getMessage(), elasticsearchException.getMessage());
}
public void testPerformRequestOnResponseExceptionWithIgnoresErrorValidBody() throws IOException {
MainRequest mainRequest = new MainRequest();
Function<MainRequest, Request> requestConverter = request -> new Request("GET", "/", Collections.emptyMap(), null);
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":404}",
ContentType.APPLICATION_JSON));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
when(restClient.performRequest(anyString(), anyString(), anyMapOf(String.class, String.class),
anyObject(), anyVararg())).thenThrow(responseException);
ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class,
() -> restHighLevelClient.performRequest(mainRequest, requestConverter,
response -> {throw new IllegalStateException();}, Collections.singleton(404)));
assertEquals(RestStatus.NOT_FOUND, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getSuppressed()[0]);
assertEquals("Elasticsearch exception [type=exception, reason=test error message]", elasticsearchException.getMessage());
}
public void testWrapResponseListenerOnSuccess() throws IOException {
{
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
responseListener.onSuccess(new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse));
assertNull(trackingActionListener.exception.get());
assertEquals(restStatus.getStatus(), trackingActionListener.statusCode.get());
}
{
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> {throw new IllegalStateException();}, trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
responseListener.onSuccess(new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse));
assertThat(trackingActionListener.exception.get(), instanceOf(IOException.class));
IOException ioe = (IOException) trackingActionListener.exception.get();
assertEquals("Unable to parse response body for Response{requestLine=GET / http/1.1, host=http://localhost:9200, " +
"response=http/1.1 " + restStatus.getStatus() + " " + restStatus.name() + "}", ioe.getMessage());
assertThat(ioe.getCause(), instanceOf(IllegalStateException.class));
}
}
public void testWrapResponseListenerOnException() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
IllegalStateException exception = new IllegalStateException();
responseListener.onFailure(exception);
assertSame(exception, trackingActionListener.exception.get());
}
public void testWrapResponseListenerOnResponseExceptionWithoutEntity() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
responseListener.onFailure(responseException);
assertThat(trackingActionListener.exception.get(), instanceOf(ElasticsearchException.class));
ElasticsearchException elasticsearchException = (ElasticsearchException) trackingActionListener.exception.get();
assertEquals(responseException.getMessage(), elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
}
public void testWrapResponseListenerOnResponseExceptionWithEntity() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":" + restStatus.getStatus() + "}",
ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
responseListener.onFailure(responseException);
assertThat(trackingActionListener.exception.get(), instanceOf(ElasticsearchException.class));
ElasticsearchException elasticsearchException = (ElasticsearchException)trackingActionListener.exception.get();
assertEquals("Elasticsearch exception [type=exception, reason=test error message]", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getSuppressed()[0]);
}
public void testWrapResponseListenerOnResponseExceptionWithBrokenEntity() throws IOException {
{
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
responseListener.onFailure(responseException);
assertThat(trackingActionListener.exception.get(), instanceOf(ElasticsearchException.class));
ElasticsearchException elasticsearchException = (ElasticsearchException)trackingActionListener.exception.get();
assertEquals("Unable to parse response body", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
assertThat(elasticsearchException.getSuppressed()[0], instanceOf(JsonParseException.class));
}
{
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
responseListener.onFailure(responseException);
assertThat(trackingActionListener.exception.get(), instanceOf(ElasticsearchException.class));
ElasticsearchException elasticsearchException = (ElasticsearchException)trackingActionListener.exception.get();
assertEquals("Unable to parse response body", elasticsearchException.getMessage());
assertEquals(restStatus, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
assertThat(elasticsearchException.getSuppressed()[0], instanceOf(IllegalStateException.class));
}
}
public void testWrapResponseListenerOnResponseExceptionWithIgnores() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.singleton(404));
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
responseListener.onFailure(responseException);
//although we got an exception, we turn it into a successful response because the status code was provided among ignores
assertNull(trackingActionListener.exception.get());
assertEquals(404, trackingActionListener.statusCode.get());
}
public void testWrapResponseListenerOnResponseExceptionWithIgnoresErrorNoBody() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
//response parsing throws exception while handling ignores. same as when GetResponse#fromXContent throws error when trying
//to parse a 404 response which contains an error rather than a valid document not found response.
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> { throw new IllegalStateException(); }, trackingActionListener, Collections.singleton(404));
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
responseListener.onFailure(responseException);
assertThat(trackingActionListener.exception.get(), instanceOf(ElasticsearchException.class));
ElasticsearchException elasticsearchException = (ElasticsearchException)trackingActionListener.exception.get();
assertEquals(RestStatus.NOT_FOUND, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getCause());
assertEquals(responseException.getMessage(), elasticsearchException.getMessage());
}
public void testWrapResponseListenerOnResponseExceptionWithIgnoresErrorValidBody() throws IOException {
TrackingActionListener trackingActionListener = new TrackingActionListener();
//response parsing throws exception while handling ignores. same as when GetResponse#fromXContent throws error when trying
//to parse a 404 response which contains an error rather than a valid document not found response.
ResponseListener responseListener = RestHighLevelClient.wrapResponseListener(
response -> { throw new IllegalStateException(); }, trackingActionListener, Collections.singleton(404));
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":404}",
ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
responseListener.onFailure(responseException);
assertThat(trackingActionListener.exception.get(), instanceOf(ElasticsearchException.class));
ElasticsearchException elasticsearchException = (ElasticsearchException)trackingActionListener.exception.get();
assertEquals(RestStatus.NOT_FOUND, elasticsearchException.status());
assertSame(responseException, elasticsearchException.getSuppressed()[0]);
assertEquals("Elasticsearch exception [type=exception, reason=test error message]", elasticsearchException.getMessage());
}
private static class TrackingActionListener implements ActionListener<Integer> {
private final AtomicInteger statusCode = new AtomicInteger(-1);
private final AtomicReference<Exception> exception = new AtomicReference<>();
@Override
public void onResponse(Integer statusCode) {
assertTrue(this.statusCode.compareAndSet(-1, statusCode));
}
@Override
public void onFailure(Exception e) {
assertTrue(exception.compareAndSet(null, e));
}
}
private static class HeadersVarargMatcher extends ArgumentMatcher<Header[]> implements VarargMatcher {
private Header[] expectedHeaders;
HeadersVarargMatcher(Header... expectedHeaders) {
@ -84,4 +566,8 @@ public class RestHighLevelClientTests extends ESTestCase {
return false;
}
}
private static StatusLine newStatusLine(RestStatus restStatus) {
return new BasicStatusLine(HTTP_PROTOCOL, restStatus.getStatus(), restStatus.name());
}
}