[client] Add simple support for gzip compression

Adds a `RestClient.setCompressionEnabled()` setting that will gzip-
compress request bodies and add a `Accept-Encoding: gzip` header so
that the ES server can send compressed responses.
This commit is contained in:
Sylvain Wallez 2020-11-20 07:38:21 +01:00 committed by GitHub
parent d4e308a135
commit f7e840b012
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 279 additions and 18 deletions

View File

@ -28,6 +28,7 @@ import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.AuthCache;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
@ -50,8 +51,11 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import javax.net.ssl.SSLHandshakeException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URI;
@ -76,6 +80,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singletonList;
@ -112,15 +117,18 @@ public class RestClient implements Closeable {
private final NodeSelector nodeSelector;
private volatile NodeTuple<List<Node>> nodeTuple;
private final WarningsHandler warningsHandler;
private final boolean compressionEnabled;
RestClient(CloseableHttpAsyncClient client, Header[] defaultHeaders, List<Node> nodes, String pathPrefix,
FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode) {
FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode,
boolean compressionEnabled) {
this.client = client;
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
this.nodeSelector = nodeSelector;
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
this.compressionEnabled = compressionEnabled;
setNodes(nodes);
}
@ -543,34 +551,37 @@ public class RestClient implements Closeable {
}
}
private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity, boolean compressionEnabled) {
switch(method.toUpperCase(Locale.ROOT)) {
case HttpDeleteWithEntity.METHOD_NAME:
return addRequestBody(new HttpDeleteWithEntity(uri), entity);
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled);
case HttpGetWithEntity.METHOD_NAME:
return addRequestBody(new HttpGetWithEntity(uri), entity);
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled);
case HttpHead.METHOD_NAME:
return addRequestBody(new HttpHead(uri), entity);
return addRequestBody(new HttpHead(uri), entity, compressionEnabled);
case HttpOptions.METHOD_NAME:
return addRequestBody(new HttpOptions(uri), entity);
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled);
case HttpPatch.METHOD_NAME:
return addRequestBody(new HttpPatch(uri), entity);
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled);
case HttpPost.METHOD_NAME:
HttpPost httpPost = new HttpPost(uri);
addRequestBody(httpPost, entity);
addRequestBody(httpPost, entity, compressionEnabled);
return httpPost;
case HttpPut.METHOD_NAME:
return addRequestBody(new HttpPut(uri), entity);
return addRequestBody(new HttpPut(uri), entity, compressionEnabled);
case HttpTrace.METHOD_NAME:
return addRequestBody(new HttpTrace(uri), entity);
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled);
default:
throw new UnsupportedOperationException("http method not supported: " + method);
}
}
private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity) {
private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity, boolean compressionEnabled) {
if (entity != null) {
if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
if (compressionEnabled) {
entity = new ContentCompressingEntity(entity);
}
((HttpEntityEnclosingRequestBase)httpRequest).setEntity(entity);
} else {
throw new UnsupportedOperationException(httpRequest.getMethod() + " with body is not supported");
@ -732,7 +743,7 @@ public class RestClient implements Closeable {
String ignoreString = params.remove("ignore");
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled);
this.cancellable = Cancellable.fromRequest(httpRequest);
setHeaders(httpRequest, request.getOptions().getHeaders());
setRequestConfig(httpRequest, request.getOptions().getRequestConfig());
@ -752,6 +763,9 @@ public class RestClient implements Closeable {
httpRequest.addHeader(defaultHeader);
}
}
if (compressionEnabled) {
httpRequest.addHeader("Accept-Encoding", "gzip");
}
}
private void setRequestConfig(HttpRequestBase httpRequest, RequestConfig requestConfig) {
@ -874,4 +888,36 @@ public class RestClient implements Closeable {
}
return new RuntimeException("error while performing request", exception);
}
/**
* A gzip compressing entity that also implements {@code getContent()}.
*/
public static class ContentCompressingEntity extends GzipCompressingEntity {
public ContentCompressingEntity(HttpEntity entity) {
super(entity);
}
@Override
public InputStream getContent() throws IOException {
ByteArrayInputOutputStream out = new ByteArrayInputOutputStream(1024);
try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
wrappedEntity.writeTo(gzipOut);
}
return out.asInput();
}
}
/**
* A ByteArrayOutputStream that can be turned into an input stream without copying the underlying buffer.
*/
private static class ByteArrayInputOutputStream extends ByteArrayOutputStream {
ByteArrayInputOutputStream(int size) {
super(size);
}
public InputStream asInput() {
return new ByteArrayInputStream(this.buf, 0, this.count);
}
}
}

View File

@ -55,6 +55,7 @@ public final class RestClientBuilder {
private String pathPrefix;
private NodeSelector nodeSelector = NodeSelector.ANY;
private boolean strictDeprecationMode = false;
private boolean compressionEnabled = false;
/**
* Creates a new builder instance and sets the hosts that the client will send requests to.
@ -181,6 +182,15 @@ public final class RestClientBuilder {
return this;
}
/**
* Whether the REST client should compress requests using gzip content encoding and add the "Accept-Encoding: gzip"
* header to receive compressed responses.
*/
public RestClientBuilder setCompressionEnabled(boolean compressionEnabled) {
this.compressionEnabled = compressionEnabled;
return this;
}
/**
* Creates a new {@link RestClient} based on the provided configuration.
*/
@ -191,7 +201,7 @@ public final class RestClientBuilder {
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
pathPrefix, failureListener, nodeSelector, strictDeprecationMode, compressionEnabled);
httpClient.start();
return restClient;
}

View File

@ -0,0 +1,205 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class RestClientGzipCompressionTests extends RestClientTestCase {
private static HttpServer httpServer;
@BeforeClass
public static void startHttpServer() throws Exception {
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.createContext("/", new GzipResponseHandler());
httpServer.start();
}
@AfterClass
public static void stopHttpServers() throws IOException {
httpServer.stop(0);
httpServer = null;
}
/**
* A response handler that accepts gzip-encoded data and replies request and response encoding values
* followed by the request body. The response is compressed if "Accept-Encoding" is "gzip".
*/
private static class GzipResponseHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {
// Decode body (if any)
String contentEncoding = exchange.getRequestHeaders().getFirst("Content-Encoding");
InputStream body = exchange.getRequestBody();
if ("gzip".equals(contentEncoding)) {
body = new GZIPInputStream(body);
}
byte[] bytes = readAll(body);
boolean compress = "gzip".equals(exchange.getRequestHeaders().getFirst("Accept-Encoding"));
if (compress) {
exchange.getResponseHeaders().add("Content-Encoding", "gzip");
}
exchange.sendResponseHeaders(200, 0);
// Encode response if needed
OutputStream out = exchange.getResponseBody();
if (compress) {
out = new GZIPOutputStream(out);
}
// Outputs <request-encoding|null>#<response-encoding|null>#<request-body>
out.write(String.valueOf(contentEncoding).getBytes(StandardCharsets.UTF_8));
out.write('#');
out.write((compress ? "gzip" : "null").getBytes(StandardCharsets.UTF_8));
out.write('#');
out.write(bytes);
out.close();
exchange.close();
}
}
/** Read all bytes of an input stream and close it. */
private static byte[] readAll(InputStream in) throws IOException {
byte[] buffer = new byte[1024];
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int len = 0;
while ((len = in.read(buffer)) > 0) {
bos.write(buffer, 0, len);
}
in.close();
return bos.toByteArray();
}
private RestClient createClient(boolean enableCompression) {
InetSocketAddress address = httpServer.getAddress();
return RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
.setCompressionEnabled(enableCompression)
.build();
}
public void testGzipHeaderSync() throws Exception {
RestClient restClient = createClient(false);
// Send non-compressed request, expect compressed response
Request request = new Request("POST", "/");
request.setEntity(new StringEntity("plain request, gzip response", ContentType.TEXT_PLAIN));
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Accept-Encoding", "gzip").build());
Response response = restClient.performRequest(request);
HttpEntity entity = response.getEntity();
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
Assert.assertEquals("null#gzip#plain request, gzip response", content);
restClient.close();
}
public void testGzipHeaderAsync() throws Exception {
RestClient restClient = createClient(false);
// Send non-compressed request, expect compressed response
Request request = new Request("POST", "/");
request.setEntity(new StringEntity("plain request, gzip response", ContentType.TEXT_PLAIN));
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Accept-Encoding", "gzip").build());
FutureResponse futureResponse = new FutureResponse();
restClient.performRequestAsync(request, futureResponse);
Response response = futureResponse.get();
HttpEntity entity = response.getEntity();
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
Assert.assertEquals("null#gzip#plain request, gzip response", content);
restClient.close();
}
public void testCompressingClientSync() throws Exception {
RestClient restClient = createClient(true);
Request request = new Request("POST", "/");
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));
Response response = restClient.performRequest(request);
HttpEntity entity = response.getEntity();
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
Assert.assertEquals("gzip#gzip#compressing client", content);
restClient.close();
}
public void testCompressingClientAsync() throws Exception {
InetSocketAddress address = httpServer.getAddress();
RestClient restClient = RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
.setCompressionEnabled(true)
.build();
Request request = new Request("POST", "/");
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));
FutureResponse futureResponse = new FutureResponse();
restClient.performRequestAsync(request, futureResponse);
Response response = futureResponse.get();
// Server should report it had a compressed request and sent back a compressed response
HttpEntity entity = response.getEntity();
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
Assert.assertEquals("gzip#gzip#compressing client", content);
restClient.close();
}
public static class FutureResponse extends CompletableFuture<Response> implements ResponseListener {
@Override
public void onSuccess(Response response) {
this.complete(response);
}
@Override
public void onFailure(Exception exception) {
this.completeExceptionally(exception);
}
}
}

View File

@ -67,7 +67,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
}
nodes = Collections.unmodifiableList(nodes);
failureListener = new HostsTrackingFailureListener();
return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false);
return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false, false);
}
/**

View File

@ -117,7 +117,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
failureListener = new HostsTrackingFailureListener();
strictDeprecationMode = randomBoolean();
restClient = new RestClient(this.httpClient, defaultHeaders,
singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode);
singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode, false);
}
@SuppressWarnings("unchecked")

View File

@ -60,7 +60,7 @@ public class RestClientTests extends RestClientTestCase {
public void testCloseIsIdempotent() throws IOException {
List<Node> nodes = singletonList(new Node(new HttpHost("localhost", 9200)));
CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class);
RestClient restClient = new RestClient(closeableHttpAsyncClient, new Header[0], nodes, null, null, null, false);
RestClient restClient = new RestClient(closeableHttpAsyncClient, new Header[0], nodes, null, null, null, false, false);
restClient.close();
verify(closeableHttpAsyncClient, times(1)).close();
restClient.close();
@ -357,7 +357,7 @@ public class RestClientTests extends RestClientTestCase {
private static RestClient createRestClient() {
List<Node> nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200)));
return new RestClient(mock(CloseableHttpAsyncClient.class), new Header[] {}, nodes, null, null, null, false);
return new RestClient(mock(CloseableHttpAsyncClient.class), new Header[] {}, nodes, null, null, null, false, false);
}
public void testRoundRobin() throws IOException {
@ -392,7 +392,7 @@ public class RestClientTests extends RestClientTestCase {
public void testIsRunning(){
List<Node> nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200)));
CloseableHttpAsyncClient client = mock(CloseableHttpAsyncClient.class);
RestClient restClient = new RestClient(client, new Header[] {}, nodes, null, null, null, false);
RestClient restClient = new RestClient(client, new Header[] {}, nodes, null, null, null, false, false);
when(client.isRunning()).thenReturn(true);
assertTrue(restClient.isRunning());