diff --git a/plugin/src/main/java/org/elasticsearch/xpack/common/http/HttpClient.java b/plugin/src/main/java/org/elasticsearch/xpack/common/http/HttpClient.java index 112e94b2908..4a24f3e2770 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/common/http/HttpClient.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/common/http/HttpClient.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; @@ -64,12 +65,14 @@ public class HttpClient extends AbstractComponent { private final String proxyHost; private final TimeValue defaultConnectionTimeout; private final TimeValue defaultReadTimeout; + private final ByteSizeValue maxResponseSize; public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLService sslService) { super(settings); this.httpAuthRegistry = httpAuthRegistry; this.defaultConnectionTimeout = HttpSettings.CONNECTION_TIMEOUT.get(settings); this.defaultReadTimeout = HttpSettings.READ_TIMEOUT.get(settings); + this.maxResponseSize = HttpSettings.MAX_HTTP_RESPONSE_SIZE.get(settings); // proxy setup this.proxyHost = HttpSettings.PROXY_HOST.get(settings); @@ -191,7 +194,7 @@ public class HttpClient extends AbstractComponent { body = new byte[0]; } else { try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - try (InputStream is = response.getEntity().getContent()) { + try (InputStream is = new SizeLimitInputStream(maxResponseSize, response.getEntity().getContent())) { Streams.copy(is, outputStream); } body = outputStream.toByteArray(); @@ -233,4 +236,5 @@ public class HttpClient extends AbstractComponent { return methodName; } } + } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/common/http/HttpSettings.java b/plugin/src/main/java/org/elasticsearch/xpack/common/http/HttpSettings.java index 07eaf5d1e3f..52e9b94d868 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/common/http/HttpSettings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/common/http/HttpSettings.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.common.http; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.ssl.SSLConfigurationSettings; @@ -32,6 +34,12 @@ public class HttpSettings { static final Setting PROXY_HOST = Setting.simpleString(PROXY_HOST_KEY, Setting.Property.NodeScope); static final Setting PROXY_PORT = Setting.intSetting(PROXY_PORT_KEY, 0, 0, 0xFFFF, Setting.Property.NodeScope); + static final Setting MAX_HTTP_RESPONSE_SIZE = Setting.byteSizeSetting("xpack.http.max_response_size", + new ByteSizeValue(10, ByteSizeUnit.MB), // default + new ByteSizeValue(1, ByteSizeUnit.BYTES), // min + new ByteSizeValue(50, ByteSizeUnit.MB), // max + Setting.Property.NodeScope); + private static final SSLConfigurationSettings SSL = SSLConfigurationSettings.withPrefix(SSL_KEY_PREFIX); public static List> getSettings() { @@ -41,6 +49,7 @@ public class HttpSettings { settings.add(CONNECTION_TIMEOUT); settings.add(PROXY_HOST); settings.add(PROXY_PORT); + settings.add(MAX_HTTP_RESPONSE_SIZE); return settings; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/common/http/SizeLimitInputStream.java b/plugin/src/main/java/org/elasticsearch/xpack/common/http/SizeLimitInputStream.java new file mode 100644 index 00000000000..cc17ed8a6e8 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/common/http/SizeLimitInputStream.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.common.http; + +import org.elasticsearch.common.unit.ByteSizeValue; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An inputstream throwing an exception when a preconfigured number of bytes is reached + * This inputstream exists to prevent reading streaming or very big requests + * + * This implementation does not support mark/reset to prevent complex byte counting recalculations + */ +final class SizeLimitInputStream extends FilterInputStream { + + private final int maxByteSize; + private final AtomicInteger byteCounter = new AtomicInteger(0); + + /** + * Creates a new input stream, that throws an exception when a certain number of bytes is read + * @param maxByteSize The maximum data to read, before throwing an exception + * @param in The underlying inputstream containing the data + */ + SizeLimitInputStream(ByteSizeValue maxByteSize, InputStream in) { + super(in); + this.maxByteSize = maxByteSize.bytesAsInt(); + } + + @Override + public int read() throws IOException { + byteCounter.incrementAndGet(); + checkMaximumLengthReached(); + return super.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + byteCounter.addAndGet(len); + checkMaximumLengthReached(); + return super.read(b, off, len); + } + + @Override + public synchronized void mark(int readlimit) { + throw new UnsupportedOperationException("mark not supported"); + } + + @Override + public synchronized void reset() throws IOException { + throw new IOException("reset not supported"); + } + + @Override + public boolean markSupported() { + return false; + } + + private void checkMaximumLengthReached() throws IOException { + if (byteCounter.get() > maxByteSize) { + throw new IOException("Maximum limit of [" + maxByteSize + "] bytes reached"); + } + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/common/http/HttpClientTests.java b/plugin/src/test/java/org/elasticsearch/xpack/common/http/HttpClientTests.java index 48c8407efb8..22c83490aaf 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/common/http/HttpClientTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/common/http/HttpClientTests.java @@ -10,6 +10,8 @@ import org.apache.http.client.ClientProtocolException; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.mocksocket.MockServerSocket; @@ -50,6 +52,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.Is.is; public class HttpClientTests extends ESTestCase { @@ -428,4 +431,20 @@ public class HttpClientTests extends ESTestCase { assertThat(response.status(), is(noContentStatusCode)); assertThat(response.body(), is(nullValue())); } + + public void testMaxHttpResponseSize() throws Exception { + int randomBytesLength = scaledRandomIntBetween(2, 100); + String data = randomAsciiOfLength(randomBytesLength); + webServer.enqueue(new MockResponse().setResponseCode(200).setBody(data)); + + Settings settings = Settings.builder() + .put(HttpSettings.MAX_HTTP_RESPONSE_SIZE.getKey(), new ByteSizeValue(randomBytesLength - 1, ByteSizeUnit.BYTES)) + .build(); + HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment)); + + HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()).method(HttpMethod.GET).path("/"); + + IOException e = expectThrows(IOException.class, () -> httpClient.execute(requestBuilder.build())); + assertThat(e.getMessage(), startsWith("Maximum limit of")); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/common/http/SizeLimitInputStreamTests.java b/plugin/src/test/java/org/elasticsearch/xpack/common/http/SizeLimitInputStreamTests.java new file mode 100644 index 00000000000..5cad3fc3fa1 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/common/http/SizeLimitInputStreamTests.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.common.http; + +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.ESTestCase; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import static com.google.common.base.Charsets.UTF_8; +import static org.hamcrest.Matchers.is; + +public class SizeLimitInputStreamTests extends ESTestCase { + + public void testGoodCase() throws IOException { + int length = scaledRandomIntBetween(1, 100); + test(length, length); + } + + public void testLimitReached() { + int length = scaledRandomIntBetween(1, 100); + IOException e = expectThrows(IOException.class, () -> test(length+1, length)); + assertThat(e.getMessage(), is("Maximum limit of [" + length + "] bytes reached")); + } + + public void testMarking() { + ByteSizeValue byteSizeValue = new ByteSizeValue(1, ByteSizeUnit.BYTES); + SizeLimitInputStream is = new SizeLimitInputStream(byteSizeValue, + new ByteArrayInputStream("empty".getBytes(UTF_8))); + assertThat(is.markSupported(), is(false)); + expectThrows(UnsupportedOperationException.class, () -> is.mark(10)); + IOException e = expectThrows(IOException.class, () -> is.reset()); + assertThat(e.getMessage(), is("reset not supported")); + } + + private void test(int inputStreamLength, int maxAllowedSize) throws IOException { + String data = randomAsciiOfLength(inputStreamLength); + ByteSizeValue byteSizeValue = new ByteSizeValue(maxAllowedSize, ByteSizeUnit.BYTES); + SizeLimitInputStream is = new SizeLimitInputStream(byteSizeValue, + new ByteArrayInputStream(data.getBytes(UTF_8))); + + if (randomBoolean()) { + is.read(new byte[inputStreamLength]); + } else { + for (int i = 0; i < inputStreamLength; i++) { + is.read(); + } + } + } +} \ No newline at end of file