From f7e840b012f6778cadb4d9e52fdbe44374c21ac5 Mon Sep 17 00:00:00 2001 From: Sylvain Wallez Date: Fri, 20 Nov 2020 07:38:21 +0100 Subject: [PATCH] [client] Add simple support for gzip compression Adds a `RestClient.setCompressionEnabled()` setting that will gzip- compress request bodies and add a `Accept-Encoding: gzip` header so that the ES server can send compressed responses. --- .../org/elasticsearch/client/RestClient.java | 70 +++++- .../client/RestClientBuilder.java | 12 +- .../RestClientGzipCompressionTests.java | 205 ++++++++++++++++++ .../client/RestClientMultipleHostsTests.java | 2 +- .../client/RestClientSingleHostTests.java | 2 +- .../elasticsearch/client/RestClientTests.java | 6 +- 6 files changed, 279 insertions(+), 18 deletions(-) create mode 100644 client/rest/src/test/java/org/elasticsearch/client/RestClientGzipCompressionTests.java diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index ec9f98d1a71..e7851223514 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -28,6 +28,7 @@ import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; import org.apache.http.client.AuthCache; import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.entity.GzipDecompressingEntity; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; @@ -50,8 +51,11 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer; import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; import javax.net.ssl.SSLHandshakeException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.net.ConnectException; import java.net.SocketTimeoutException; import java.net.URI; @@ -76,6 +80,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.singletonList; @@ -112,15 +117,18 @@ public class RestClient implements Closeable { private final NodeSelector nodeSelector; private volatile NodeTuple> nodeTuple; private final WarningsHandler warningsHandler; + private final boolean compressionEnabled; RestClient(CloseableHttpAsyncClient client, Header[] defaultHeaders, List nodes, String pathPrefix, - FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode) { + FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode, + boolean compressionEnabled) { this.client = client; this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); this.failureListener = failureListener; this.pathPrefix = pathPrefix; this.nodeSelector = nodeSelector; this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE; + this.compressionEnabled = compressionEnabled; setNodes(nodes); } @@ -543,34 +551,37 @@ public class RestClient implements Closeable { } } - private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) { + private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity, boolean compressionEnabled) { switch(method.toUpperCase(Locale.ROOT)) { case HttpDeleteWithEntity.METHOD_NAME: - return addRequestBody(new HttpDeleteWithEntity(uri), entity); + return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled); case HttpGetWithEntity.METHOD_NAME: - return addRequestBody(new HttpGetWithEntity(uri), entity); + return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled); case HttpHead.METHOD_NAME: - return addRequestBody(new HttpHead(uri), entity); + return addRequestBody(new HttpHead(uri), entity, compressionEnabled); case HttpOptions.METHOD_NAME: - return addRequestBody(new HttpOptions(uri), entity); + return addRequestBody(new HttpOptions(uri), entity, compressionEnabled); case HttpPatch.METHOD_NAME: - return addRequestBody(new HttpPatch(uri), entity); + return addRequestBody(new HttpPatch(uri), entity, compressionEnabled); case HttpPost.METHOD_NAME: HttpPost httpPost = new HttpPost(uri); - addRequestBody(httpPost, entity); + addRequestBody(httpPost, entity, compressionEnabled); return httpPost; case HttpPut.METHOD_NAME: - return addRequestBody(new HttpPut(uri), entity); + return addRequestBody(new HttpPut(uri), entity, compressionEnabled); case HttpTrace.METHOD_NAME: - return addRequestBody(new HttpTrace(uri), entity); + return addRequestBody(new HttpTrace(uri), entity, compressionEnabled); default: throw new UnsupportedOperationException("http method not supported: " + method); } } - private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity) { + private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity, boolean compressionEnabled) { if (entity != null) { if (httpRequest instanceof HttpEntityEnclosingRequestBase) { + if (compressionEnabled) { + entity = new ContentCompressingEntity(entity); + } ((HttpEntityEnclosingRequestBase)httpRequest).setEntity(entity); } else { throw new UnsupportedOperationException(httpRequest.getMethod() + " with body is not supported"); @@ -732,7 +743,7 @@ public class RestClient implements Closeable { String ignoreString = params.remove("ignore"); this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod()); URI uri = buildUri(pathPrefix, request.getEndpoint(), params); - this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity()); + this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled); this.cancellable = Cancellable.fromRequest(httpRequest); setHeaders(httpRequest, request.getOptions().getHeaders()); setRequestConfig(httpRequest, request.getOptions().getRequestConfig()); @@ -752,6 +763,9 @@ public class RestClient implements Closeable { httpRequest.addHeader(defaultHeader); } } + if (compressionEnabled) { + httpRequest.addHeader("Accept-Encoding", "gzip"); + } } private void setRequestConfig(HttpRequestBase httpRequest, RequestConfig requestConfig) { @@ -874,4 +888,36 @@ public class RestClient implements Closeable { } return new RuntimeException("error while performing request", exception); } + + /** + * A gzip compressing entity that also implements {@code getContent()}. + */ + public static class ContentCompressingEntity extends GzipCompressingEntity { + + public ContentCompressingEntity(HttpEntity entity) { + super(entity); + } + + @Override + public InputStream getContent() throws IOException { + ByteArrayInputOutputStream out = new ByteArrayInputOutputStream(1024); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) { + wrappedEntity.writeTo(gzipOut); + } + return out.asInput(); + } + } + + /** + * A ByteArrayOutputStream that can be turned into an input stream without copying the underlying buffer. + */ + private static class ByteArrayInputOutputStream extends ByteArrayOutputStream { + ByteArrayInputOutputStream(int size) { + super(size); + } + + public InputStream asInput() { + return new ByteArrayInputStream(this.buf, 0, this.count); + } + } } diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java index b5775ae53ba..3305340ec5a 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java @@ -55,6 +55,7 @@ public final class RestClientBuilder { private String pathPrefix; private NodeSelector nodeSelector = NodeSelector.ANY; private boolean strictDeprecationMode = false; + private boolean compressionEnabled = false; /** * Creates a new builder instance and sets the hosts that the client will send requests to. @@ -181,6 +182,15 @@ public final class RestClientBuilder { return this; } + /** + * Whether the REST client should compress requests using gzip content encoding and add the "Accept-Encoding: gzip" + * header to receive compressed responses. + */ + public RestClientBuilder setCompressionEnabled(boolean compressionEnabled) { + this.compressionEnabled = compressionEnabled; + return this; + } + /** * Creates a new {@link RestClient} based on the provided configuration. */ @@ -191,7 +201,7 @@ public final class RestClientBuilder { CloseableHttpAsyncClient httpClient = AccessController.doPrivileged( (PrivilegedAction) this::createHttpClient); RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes, - pathPrefix, failureListener, nodeSelector, strictDeprecationMode); + pathPrefix, failureListener, nodeSelector, strictDeprecationMode, compressionEnabled); httpClient.start(); return restClient; } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientGzipCompressionTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientGzipCompressionTests.java new file mode 100644 index 00000000000..faa86b11f5a --- /dev/null +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientGzipCompressionTests.java @@ -0,0 +1,205 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.elasticsearch.mocksocket.MockHttpServer; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class RestClientGzipCompressionTests extends RestClientTestCase { + + private static HttpServer httpServer; + + @BeforeClass + public static void startHttpServer() throws Exception { + httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + httpServer.createContext("/", new GzipResponseHandler()); + httpServer.start(); + } + + @AfterClass + public static void stopHttpServers() throws IOException { + httpServer.stop(0); + httpServer = null; + } + + /** + * A response handler that accepts gzip-encoded data and replies request and response encoding values + * followed by the request body. The response is compressed if "Accept-Encoding" is "gzip". + */ + private static class GzipResponseHandler implements HttpHandler { + @Override + public void handle(HttpExchange exchange) throws IOException { + + // Decode body (if any) + String contentEncoding = exchange.getRequestHeaders().getFirst("Content-Encoding"); + InputStream body = exchange.getRequestBody(); + if ("gzip".equals(contentEncoding)) { + body = new GZIPInputStream(body); + } + byte[] bytes = readAll(body); + + boolean compress = "gzip".equals(exchange.getRequestHeaders().getFirst("Accept-Encoding")); + if (compress) { + exchange.getResponseHeaders().add("Content-Encoding", "gzip"); + } + + exchange.sendResponseHeaders(200, 0); + + // Encode response if needed + OutputStream out = exchange.getResponseBody(); + if (compress) { + out = new GZIPOutputStream(out); + } + + // Outputs ## + out.write(String.valueOf(contentEncoding).getBytes(StandardCharsets.UTF_8)); + out.write('#'); + out.write((compress ? "gzip" : "null").getBytes(StandardCharsets.UTF_8)); + out.write('#'); + out.write(bytes); + out.close(); + + exchange.close(); + } + } + + /** Read all bytes of an input stream and close it. */ + private static byte[] readAll(InputStream in) throws IOException { + byte[] buffer = new byte[1024]; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int len = 0; + while ((len = in.read(buffer)) > 0) { + bos.write(buffer, 0, len); + } + in.close(); + return bos.toByteArray(); + } + + private RestClient createClient(boolean enableCompression) { + InetSocketAddress address = httpServer.getAddress(); + return RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http")) + .setCompressionEnabled(enableCompression) + .build(); + } + + public void testGzipHeaderSync() throws Exception { + RestClient restClient = createClient(false); + + // Send non-compressed request, expect compressed response + Request request = new Request("POST", "/"); + request.setEntity(new StringEntity("plain request, gzip response", ContentType.TEXT_PLAIN)); + request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Accept-Encoding", "gzip").build()); + + Response response = restClient.performRequest(request); + + HttpEntity entity = response.getEntity(); + String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8); + Assert.assertEquals("null#gzip#plain request, gzip response", content); + + restClient.close(); + } + + public void testGzipHeaderAsync() throws Exception { + RestClient restClient = createClient(false); + + // Send non-compressed request, expect compressed response + Request request = new Request("POST", "/"); + request.setEntity(new StringEntity("plain request, gzip response", ContentType.TEXT_PLAIN)); + request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Accept-Encoding", "gzip").build()); + + FutureResponse futureResponse = new FutureResponse(); + restClient.performRequestAsync(request, futureResponse); + Response response = futureResponse.get(); + + HttpEntity entity = response.getEntity(); + String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8); + Assert.assertEquals("null#gzip#plain request, gzip response", content); + + restClient.close(); + } + + public void testCompressingClientSync() throws Exception { + RestClient restClient = createClient(true); + + Request request = new Request("POST", "/"); + request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN)); + + Response response = restClient.performRequest(request); + + HttpEntity entity = response.getEntity(); + String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8); + Assert.assertEquals("gzip#gzip#compressing client", content); + + restClient.close(); + } + + public void testCompressingClientAsync() throws Exception { + InetSocketAddress address = httpServer.getAddress(); + RestClient restClient = RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http")) + .setCompressionEnabled(true) + .build(); + + Request request = new Request("POST", "/"); + request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN)); + + FutureResponse futureResponse = new FutureResponse(); + restClient.performRequestAsync(request, futureResponse); + Response response = futureResponse.get(); + + // Server should report it had a compressed request and sent back a compressed response + HttpEntity entity = response.getEntity(); + String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8); + Assert.assertEquals("gzip#gzip#compressing client", content); + + restClient.close(); + } + + public static class FutureResponse extends CompletableFuture implements ResponseListener { + @Override + public void onSuccess(Response response) { + this.complete(response); + } + + @Override + public void onFailure(Exception exception) { + this.completeExceptionally(exception); + } + } +} diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java index c827002f645..80c1eb1c67c 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java @@ -67,7 +67,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { } nodes = Collections.unmodifiableList(nodes); failureListener = new HostsTrackingFailureListener(); - return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false); + return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false, false); } /** diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index 44264a99c4d..7daf1e0d749 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -117,7 +117,7 @@ public class RestClientSingleHostTests extends RestClientTestCase { failureListener = new HostsTrackingFailureListener(); strictDeprecationMode = randomBoolean(); restClient = new RestClient(this.httpClient, defaultHeaders, - singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode); + singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode, false); } @SuppressWarnings("unchecked") diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java index b5a94778afe..430a4a1155a 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -60,7 +60,7 @@ public class RestClientTests extends RestClientTestCase { public void testCloseIsIdempotent() throws IOException { List nodes = singletonList(new Node(new HttpHost("localhost", 9200))); CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class); - RestClient restClient = new RestClient(closeableHttpAsyncClient, new Header[0], nodes, null, null, null, false); + RestClient restClient = new RestClient(closeableHttpAsyncClient, new Header[0], nodes, null, null, null, false, false); restClient.close(); verify(closeableHttpAsyncClient, times(1)).close(); restClient.close(); @@ -357,7 +357,7 @@ public class RestClientTests extends RestClientTestCase { private static RestClient createRestClient() { List nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200))); - return new RestClient(mock(CloseableHttpAsyncClient.class), new Header[] {}, nodes, null, null, null, false); + return new RestClient(mock(CloseableHttpAsyncClient.class), new Header[] {}, nodes, null, null, null, false, false); } public void testRoundRobin() throws IOException { @@ -392,7 +392,7 @@ public class RestClientTests extends RestClientTestCase { public void testIsRunning(){ List nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200))); CloseableHttpAsyncClient client = mock(CloseableHttpAsyncClient.class); - RestClient restClient = new RestClient(client, new Header[] {}, nodes, null, null, null, false); + RestClient restClient = new RestClient(client, new Header[] {}, nodes, null, null, null, false, false); when(client.isRunning()).thenReturn(true); assertTrue(restClient.isRunning());