HLRC: Use nonblocking entity for requests (#32249)

Previously the HLRC used a blocking ByteArrayEntity, but the Request
class also allows to set a NByteArrayEntity, and defaults to nonblocking
when calling the createJsonEntity method. This commit cleans up all the
uses of ByteArrayEntity in the RequestConverters to use the nonblocking
entity.
This commit is contained in:
Michael Basnight 2019-01-08 09:11:58 -06:00 committed by GitHub
parent 054c3bb04f
commit dd69553d4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 41 additions and 38 deletions

View File

@ -24,7 +24,7 @@ import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
import org.elasticsearch.client.ml.CloseJobRequest;
@ -462,7 +462,7 @@ final class MLRequestConverters {
BytesReference content = postDataRequest.getContent();
if (content != null) {
BytesRef source = postDataRequest.getContent().toBytesRef();
HttpEntity byteEntity = new ByteArrayEntity(source.bytes,
HttpEntity byteEntity = new NByteArrayEntity(source.bytes,
source.offset,
source.length,
createContentType(postDataRequest.getXContentType()));
@ -686,7 +686,7 @@ final class MLRequestConverters {
BytesReference sample = findFileStructureRequest.getSample();
BytesRef source = sample.toBytesRef();
HttpEntity byteEntity = new ByteArrayEntity(source.bytes, source.offset, source.length, createContentType(XContentType.JSON));
HttpEntity byteEntity = new NByteArrayEntity(source.bytes, source.offset, source.length, createContentType(XContentType.JSON));
request.setEntity(byteEntity);
return request;
}

View File

@ -25,8 +25,8 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@ -239,7 +239,7 @@ final class RequestConverters {
content.write(separator);
}
}
request.setEntity(new ByteArrayEntity(content.toByteArray(), 0, content.size(), requestContentType));
request.setEntity(new NByteArrayEntity(content.toByteArray(), 0, content.size(), requestContentType));
return request;
}
@ -322,7 +322,7 @@ final class RequestConverters {
BytesRef source = indexRequest.source().toBytesRef();
ContentType contentType = createContentType(indexRequest.getContentType());
request.setEntity(new ByteArrayEntity(source.bytes, source.offset, source.length, contentType));
request.setEntity(new NByteArrayEntity(source.bytes, source.offset, source.length, contentType));
return request;
}
@ -431,7 +431,7 @@ final class RequestConverters {
XContent xContent = REQUEST_BODY_CONTENT_TYPE.xContent();
byte[] source = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, xContent);
request.setEntity(new ByteArrayEntity(source, createContentType(xContent.type())));
request.setEntity(new NByteArrayEntity(source, createContentType(xContent.type())));
return request;
}
@ -464,7 +464,7 @@ final class RequestConverters {
XContent xContent = REQUEST_BODY_CONTENT_TYPE.xContent();
byte[] source = MultiSearchTemplateRequest.writeMultiLineFormat(multiSearchTemplateRequest, xContent);
request.setEntity(new ByteArrayEntity(source, createContentType(xContent.type())));
request.setEntity(new NByteArrayEntity(source, createContentType(xContent.type())));
return request;
}
@ -694,7 +694,7 @@ final class RequestConverters {
static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType, ToXContent.Params toXContentParams)
throws IOException {
BytesRef source = XContentHelper.toXContent(toXContent, xContentType, toXContentParams, false).toBytesRef();
return new ByteArrayEntity(source.bytes, source.offset, source.length, createContentType(xContentType));
return new NByteArrayEntity(source.bytes, source.offset, source.length, createContentType(xContentType));
}
static String endpoint(String index, String type, String id) {

View File

@ -23,8 +23,8 @@ import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.elasticsearch.client.watcher.AckWatchRequest;
import org.elasticsearch.client.watcher.ActivateWatchRequest;
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
@ -75,7 +75,7 @@ final class WatcherRequestConverters {
}
ContentType contentType = RequestConverters.createContentType(putWatchRequest.xContentType());
BytesReference source = putWatchRequest.getSource();
request.setEntity(new ByteArrayEntity(source.toBytesRef().bytes, 0, source.length(), contentType));
request.setEntity(new NByteArrayEntity(source.toBytesRef().bytes, 0, source.length(), contentType));
return request;
}

View File

@ -30,3 +30,7 @@ org.elasticsearch.common.logging.PrefixLogger
@defaultMessage We can't rely on log4j2 being on the classpath so don't log deprecations!
org.elasticsearch.common.xcontent.LoggingDeprecationHandler
@defaultMessage Use Nonblocking org.apache.http.nio.entity.NByteArrayEntity
org.apache.http.entity.ByteArrayEntity
org.apache.http.entity.StringEntity

View File

@ -24,10 +24,10 @@ import org.apache.http.HttpHost;
import org.apache.http.ProtocolVersion;
import org.apache.http.RequestLine;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.message.BasicRequestLine;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
@ -166,7 +166,7 @@ public class CustomRestHighLevelClientTests extends ESTestCase {
MainResponse response = new MainResponse(httpHeader.getValue(), Version.CURRENT, ClusterName.DEFAULT, "_na", Build.CURRENT);
BytesRef bytesRef = XContentHelper.toXContent(response, XContentType.JSON, false).toBytesRef();
when(mockResponse.getEntity()).thenReturn(new ByteArrayEntity(bytesRef.bytes, ContentType.APPLICATION_JSON));
when(mockResponse.getEntity()).thenReturn(new NByteArrayEntity(bytesRef.bytes, ContentType.APPLICATION_JSON));
RequestLine requestLine = new BasicRequestLine(HttpGet.METHOD_NAME, ENDPOINT, protocol);
when(mockResponse.getRequestLine()).thenReturn(requestLine);

View File

@ -25,7 +25,7 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
@ -671,7 +671,7 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(method, request.getMethod());
HttpEntity entity = request.getEntity();
assertTrue(entity instanceof ByteArrayEntity);
assertTrue(entity instanceof NByteArrayEntity);
assertEquals(indexRequest.getContentType().mediaTypeWithoutParameters(), entity.getContentType().getValue());
try (XContentParser parser = createParser(xContentType.xContent(), entity.getContent())) {
assertEquals(nbFields, parser.map().size());
@ -714,7 +714,7 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(method, request.getMethod());
HttpEntity entity = request.getEntity();
assertTrue(entity instanceof ByteArrayEntity);
assertTrue(entity instanceof NByteArrayEntity);
assertEquals(indexRequest.getContentType().mediaTypeWithoutParameters(), entity.getContentType().getValue());
try (XContentParser parser = createParser(xContentType.xContent(), entity.getContent())) {
assertEquals(nbFields, parser.map().size());
@ -787,7 +787,7 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
HttpEntity entity = request.getEntity();
assertTrue(entity instanceof ByteArrayEntity);
assertTrue(entity instanceof NByteArrayEntity);
UpdateRequest parsedUpdateRequest = new UpdateRequest();

View File

@ -21,7 +21,7 @@ package org.elasticsearch.client;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
@ -51,14 +51,14 @@ public class RestHighLevelClientExtTests extends ESTestCase {
public void testParseEntityCustomResponseSection() throws IOException {
{
HttpEntity jsonEntity = new StringEntity("{\"custom1\":{ \"field\":\"value\"}}", ContentType.APPLICATION_JSON);
HttpEntity jsonEntity = new NStringEntity("{\"custom1\":{ \"field\":\"value\"}}", ContentType.APPLICATION_JSON);
BaseCustomResponseSection customSection = restHighLevelClient.parseEntity(jsonEntity, BaseCustomResponseSection::fromXContent);
assertThat(customSection, instanceOf(CustomResponseSection1.class));
CustomResponseSection1 customResponseSection1 = (CustomResponseSection1) customSection;
assertEquals("value", customResponseSection1.value);
}
{
HttpEntity jsonEntity = new StringEntity("{\"custom2\":{ \"array\": [\"item1\", \"item2\"]}}", ContentType.APPLICATION_JSON);
HttpEntity jsonEntity = new NStringEntity("{\"custom2\":{ \"array\": [\"item1\", \"item2\"]}}", ContentType.APPLICATION_JSON);
BaseCustomResponseSection customSection = restHighLevelClient.parseEntity(jsonEntity, BaseCustomResponseSection::fromXContent);
assertThat(customSection, instanceOf(CustomResponseSection2.class));
CustomResponseSection2 customResponseSection2 = (CustomResponseSection2) customSection;

View File

@ -27,12 +27,11 @@ import org.apache.http.ProtocolVersion;
import org.apache.http.RequestLine;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicRequestLine;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
@ -243,11 +242,11 @@ public class RestHighLevelClientTests extends ESTestCase {
}
{
IllegalStateException ise = expectThrows(IllegalStateException.class,
() -> restHighLevelClient.parseEntity(new StringEntity("", (ContentType) null), null));
() -> restHighLevelClient.parseEntity(new NStringEntity("", (ContentType) null), null));
assertEquals("Elasticsearch didn't return the [Content-Type] header, unable to parse response body", ise.getMessage());
}
{
StringEntity entity = new StringEntity("", ContentType.APPLICATION_SVG_XML);
NStringEntity entity = new NStringEntity("", ContentType.APPLICATION_SVG_XML);
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> restHighLevelClient.parseEntity(entity, null));
assertEquals("Unsupported Content-Type: " + entity.getContentType().getValue(), ise.getMessage());
}
@ -260,9 +259,9 @@ public class RestHighLevelClientTests extends ESTestCase {
assertEquals(XContentParser.Token.END_OBJECT, parser.nextToken());
return value;
};
HttpEntity jsonEntity = new StringEntity("{\"field\":\"value\"}", ContentType.APPLICATION_JSON);
HttpEntity jsonEntity = new NStringEntity("{\"field\":\"value\"}", ContentType.APPLICATION_JSON);
assertEquals("value", restHighLevelClient.parseEntity(jsonEntity, entityParser));
HttpEntity yamlEntity = new StringEntity("---\nfield: value\n", ContentType.create("application/yaml"));
HttpEntity yamlEntity = new NStringEntity("---\nfield: value\n", ContentType.create("application/yaml"));
assertEquals("value", restHighLevelClient.parseEntity(yamlEntity, entityParser));
HttpEntity smileEntity = createBinaryEntity(SmileXContent.contentBuilder(), ContentType.create("application/smile"));
assertEquals("value", restHighLevelClient.parseEntity(smileEntity, entityParser));
@ -276,7 +275,7 @@ public class RestHighLevelClientTests extends ESTestCase {
builder.startObject();
builder.field("field", "value");
builder.endObject();
return new ByteArrayEntity(BytesReference.bytes(builder).toBytesRef().bytes, contentType);
return new NByteArrayEntity(BytesReference.bytes(builder).toBytesRef().bytes, contentType);
}
}
@ -302,7 +301,7 @@ public class RestHighLevelClientTests extends ESTestCase {
{
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":" + restStatus.getStatus() + "}",
httpResponse.setEntity(new NStringEntity("{\"error\":\"test error message\",\"status\":" + restStatus.getStatus() + "}",
ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
@ -314,7 +313,7 @@ public class RestHighLevelClientTests extends ESTestCase {
{
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":", ContentType.APPLICATION_JSON));
httpResponse.setEntity(new NStringEntity("{\"error\":", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = restHighLevelClient.parseResponseException(responseException);
@ -326,7 +325,7 @@ public class RestHighLevelClientTests extends ESTestCase {
{
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
httpResponse.setEntity(new NStringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
ElasticsearchException elasticsearchException = restHighLevelClient.parseResponseException(responseException);
@ -378,7 +377,7 @@ public class RestHighLevelClientTests extends ESTestCase {
CheckedFunction<MainRequest, Request, IOException> requestConverter = request -> new Request(HttpGet.METHOD_NAME, "/");
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":" + restStatus.getStatus() + "}",
httpResponse.setEntity(new NStringEntity("{\"error\":\"test error message\",\"status\":" + restStatus.getStatus() + "}",
ContentType.APPLICATION_JSON));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
@ -396,7 +395,7 @@ public class RestHighLevelClientTests extends ESTestCase {
CheckedFunction<MainRequest, Request, IOException> requestConverter = request -> new Request(HttpGet.METHOD_NAME, "/");
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":", ContentType.APPLICATION_JSON));
httpResponse.setEntity(new NStringEntity("{\"error\":", ContentType.APPLICATION_JSON));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
when(restClient.performRequest(any(Request.class))).thenThrow(responseException);
@ -414,7 +413,7 @@ public class RestHighLevelClientTests extends ESTestCase {
CheckedFunction<MainRequest, Request, IOException> requestConverter = request -> new Request(HttpGet.METHOD_NAME, "/");
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
httpResponse.setEntity(new NStringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
when(restClient.performRequest(any(Request.class))).thenThrow(responseException);
@ -458,7 +457,7 @@ public class RestHighLevelClientTests extends ESTestCase {
MainRequest mainRequest = new MainRequest();
CheckedFunction<MainRequest, Request, IOException> requestConverter = request -> new Request(HttpGet.METHOD_NAME, "/");
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":404}",
httpResponse.setEntity(new NStringEntity("{\"error\":\"test error message\",\"status\":404}",
ContentType.APPLICATION_JSON));
Response mockResponse = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(mockResponse);
@ -528,7 +527,7 @@ public class RestHighLevelClientTests extends ESTestCase {
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":" + restStatus.getStatus() + "}",
httpResponse.setEntity(new NStringEntity("{\"error\":\"test error message\",\"status\":" + restStatus.getStatus() + "}",
ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
@ -547,7 +546,7 @@ public class RestHighLevelClientTests extends ESTestCase {
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"error\":", ContentType.APPLICATION_JSON));
httpResponse.setEntity(new NStringEntity("{\"error\":", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
responseListener.onFailure(responseException);
@ -564,7 +563,7 @@ public class RestHighLevelClientTests extends ESTestCase {
response -> response.getStatusLine().getStatusCode(), trackingActionListener, Collections.emptySet());
RestStatus restStatus = randomFrom(RestStatus.values());
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(restStatus));
httpResponse.setEntity(new StringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
httpResponse.setEntity(new NStringEntity("{\"status\":" + restStatus.getStatus() + "}", ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);
responseListener.onFailure(responseException);
@ -614,7 +613,7 @@ public class RestHighLevelClientTests extends ESTestCase {
ResponseListener responseListener = restHighLevelClient.wrapResponseListener(
response -> { throw new IllegalStateException(); }, trackingActionListener, Collections.singleton(404));
HttpResponse httpResponse = new BasicHttpResponse(newStatusLine(RestStatus.NOT_FOUND));
httpResponse.setEntity(new StringEntity("{\"error\":\"test error message\",\"status\":404}",
httpResponse.setEntity(new NStringEntity("{\"error\":\"test error message\",\"status\":404}",
ContentType.APPLICATION_JSON));
Response response = new Response(REQUEST_LINE, new HttpHost("localhost", 9200), httpResponse);
ResponseException responseException = new ResponseException(response);