HttpClient: Add support to limit response size (elastic/x-pack-elasticsearch#765)
In order to be stuck with big responses in the HttpClient, this commit adds a maximum limit for the response. Defaults to 10MB, can be set to 50 MB max. relates elastic/x-pack-elasticsearch#263 Original commit: elastic/x-pack-elasticsearch@05c449d4c1
This commit is contained in:
parent
95fd6777a0
commit
d7f3a260fd
|
@ -34,6 +34,7 @@ import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
@ -64,12 +65,14 @@ public class HttpClient extends AbstractComponent {
|
||||||
private final String proxyHost;
|
private final String proxyHost;
|
||||||
private final TimeValue defaultConnectionTimeout;
|
private final TimeValue defaultConnectionTimeout;
|
||||||
private final TimeValue defaultReadTimeout;
|
private final TimeValue defaultReadTimeout;
|
||||||
|
private final ByteSizeValue maxResponseSize;
|
||||||
|
|
||||||
public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLService sslService) {
|
public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLService sslService) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.httpAuthRegistry = httpAuthRegistry;
|
this.httpAuthRegistry = httpAuthRegistry;
|
||||||
this.defaultConnectionTimeout = HttpSettings.CONNECTION_TIMEOUT.get(settings);
|
this.defaultConnectionTimeout = HttpSettings.CONNECTION_TIMEOUT.get(settings);
|
||||||
this.defaultReadTimeout = HttpSettings.READ_TIMEOUT.get(settings);
|
this.defaultReadTimeout = HttpSettings.READ_TIMEOUT.get(settings);
|
||||||
|
this.maxResponseSize = HttpSettings.MAX_HTTP_RESPONSE_SIZE.get(settings);
|
||||||
|
|
||||||
// proxy setup
|
// proxy setup
|
||||||
this.proxyHost = HttpSettings.PROXY_HOST.get(settings);
|
this.proxyHost = HttpSettings.PROXY_HOST.get(settings);
|
||||||
|
@ -191,7 +194,7 @@ public class HttpClient extends AbstractComponent {
|
||||||
body = new byte[0];
|
body = new byte[0];
|
||||||
} else {
|
} else {
|
||||||
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
|
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
|
||||||
try (InputStream is = response.getEntity().getContent()) {
|
try (InputStream is = new SizeLimitInputStream(maxResponseSize, response.getEntity().getContent())) {
|
||||||
Streams.copy(is, outputStream);
|
Streams.copy(is, outputStream);
|
||||||
}
|
}
|
||||||
body = outputStream.toByteArray();
|
body = outputStream.toByteArray();
|
||||||
|
@ -233,4 +236,5 @@ public class HttpClient extends AbstractComponent {
|
||||||
return methodName;
|
return methodName;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,8 @@
|
||||||
package org.elasticsearch.xpack.common.http;
|
package org.elasticsearch.xpack.common.http;
|
||||||
|
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.xpack.ssl.SSLConfigurationSettings;
|
import org.elasticsearch.xpack.ssl.SSLConfigurationSettings;
|
||||||
|
|
||||||
|
@ -32,6 +34,12 @@ public class HttpSettings {
|
||||||
static final Setting<String> PROXY_HOST = Setting.simpleString(PROXY_HOST_KEY, Setting.Property.NodeScope);
|
static final Setting<String> PROXY_HOST = Setting.simpleString(PROXY_HOST_KEY, Setting.Property.NodeScope);
|
||||||
static final Setting<Integer> PROXY_PORT = Setting.intSetting(PROXY_PORT_KEY, 0, 0, 0xFFFF, Setting.Property.NodeScope);
|
static final Setting<Integer> PROXY_PORT = Setting.intSetting(PROXY_PORT_KEY, 0, 0, 0xFFFF, Setting.Property.NodeScope);
|
||||||
|
|
||||||
|
static final Setting<ByteSizeValue> MAX_HTTP_RESPONSE_SIZE = Setting.byteSizeSetting("xpack.http.max_response_size",
|
||||||
|
new ByteSizeValue(10, ByteSizeUnit.MB), // default
|
||||||
|
new ByteSizeValue(1, ByteSizeUnit.BYTES), // min
|
||||||
|
new ByteSizeValue(50, ByteSizeUnit.MB), // max
|
||||||
|
Setting.Property.NodeScope);
|
||||||
|
|
||||||
private static final SSLConfigurationSettings SSL = SSLConfigurationSettings.withPrefix(SSL_KEY_PREFIX);
|
private static final SSLConfigurationSettings SSL = SSLConfigurationSettings.withPrefix(SSL_KEY_PREFIX);
|
||||||
|
|
||||||
public static List<? extends Setting<?>> getSettings() {
|
public static List<? extends Setting<?>> getSettings() {
|
||||||
|
@ -41,6 +49,7 @@ public class HttpSettings {
|
||||||
settings.add(CONNECTION_TIMEOUT);
|
settings.add(CONNECTION_TIMEOUT);
|
||||||
settings.add(PROXY_HOST);
|
settings.add(PROXY_HOST);
|
||||||
settings.add(PROXY_PORT);
|
settings.add(PROXY_PORT);
|
||||||
|
settings.add(MAX_HTTP_RESPONSE_SIZE);
|
||||||
return settings;
|
return settings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.xpack.common.http;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
|
||||||
|
import java.io.FilterInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An inputstream throwing an exception when a preconfigured number of bytes is reached
|
||||||
|
* This inputstream exists to prevent reading streaming or very big requests
|
||||||
|
*
|
||||||
|
* This implementation does not support mark/reset to prevent complex byte counting recalculations
|
||||||
|
*/
|
||||||
|
final class SizeLimitInputStream extends FilterInputStream {
|
||||||
|
|
||||||
|
private final int maxByteSize;
|
||||||
|
private final AtomicInteger byteCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new input stream, that throws an exception when a certain number of bytes is read
|
||||||
|
* @param maxByteSize The maximum data to read, before throwing an exception
|
||||||
|
* @param in The underlying inputstream containing the data
|
||||||
|
*/
|
||||||
|
SizeLimitInputStream(ByteSizeValue maxByteSize, InputStream in) {
|
||||||
|
super(in);
|
||||||
|
this.maxByteSize = maxByteSize.bytesAsInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException {
|
||||||
|
byteCounter.incrementAndGet();
|
||||||
|
checkMaximumLengthReached();
|
||||||
|
return super.read();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(byte[] b, int off, int len) throws IOException {
|
||||||
|
byteCounter.addAndGet(len);
|
||||||
|
checkMaximumLengthReached();
|
||||||
|
return super.read(b, off, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void mark(int readlimit) {
|
||||||
|
throw new UnsupportedOperationException("mark not supported");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void reset() throws IOException {
|
||||||
|
throw new IOException("reset not supported");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean markSupported() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkMaximumLengthReached() throws IOException {
|
||||||
|
if (byteCounter.get() > maxByteSize) {
|
||||||
|
throw new IOException("Maximum limit of [" + maxByteSize + "] bytes reached");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -10,6 +10,8 @@ import org.apache.http.client.ClientProtocolException;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.mocksocket.MockServerSocket;
|
import org.elasticsearch.mocksocket.MockServerSocket;
|
||||||
|
@ -50,6 +52,7 @@ import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
import static org.hamcrest.core.Is.is;
|
import static org.hamcrest.core.Is.is;
|
||||||
|
|
||||||
public class HttpClientTests extends ESTestCase {
|
public class HttpClientTests extends ESTestCase {
|
||||||
|
@ -428,4 +431,20 @@ public class HttpClientTests extends ESTestCase {
|
||||||
assertThat(response.status(), is(noContentStatusCode));
|
assertThat(response.status(), is(noContentStatusCode));
|
||||||
assertThat(response.body(), is(nullValue()));
|
assertThat(response.body(), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMaxHttpResponseSize() throws Exception {
|
||||||
|
int randomBytesLength = scaledRandomIntBetween(2, 100);
|
||||||
|
String data = randomAsciiOfLength(randomBytesLength);
|
||||||
|
webServer.enqueue(new MockResponse().setResponseCode(200).setBody(data));
|
||||||
|
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put(HttpSettings.MAX_HTTP_RESPONSE_SIZE.getKey(), new ByteSizeValue(randomBytesLength - 1, ByteSizeUnit.BYTES))
|
||||||
|
.build();
|
||||||
|
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment));
|
||||||
|
|
||||||
|
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()).method(HttpMethod.GET).path("/");
|
||||||
|
|
||||||
|
IOException e = expectThrows(IOException.class, () -> httpClient.execute(requestBuilder.build()));
|
||||||
|
assertThat(e.getMessage(), startsWith("Maximum limit of"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.xpack.common.http;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static com.google.common.base.Charsets.UTF_8;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
|
public class SizeLimitInputStreamTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testGoodCase() throws IOException {
|
||||||
|
int length = scaledRandomIntBetween(1, 100);
|
||||||
|
test(length, length);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testLimitReached() {
|
||||||
|
int length = scaledRandomIntBetween(1, 100);
|
||||||
|
IOException e = expectThrows(IOException.class, () -> test(length+1, length));
|
||||||
|
assertThat(e.getMessage(), is("Maximum limit of [" + length + "] bytes reached"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMarking() {
|
||||||
|
ByteSizeValue byteSizeValue = new ByteSizeValue(1, ByteSizeUnit.BYTES);
|
||||||
|
SizeLimitInputStream is = new SizeLimitInputStream(byteSizeValue,
|
||||||
|
new ByteArrayInputStream("empty".getBytes(UTF_8)));
|
||||||
|
assertThat(is.markSupported(), is(false));
|
||||||
|
expectThrows(UnsupportedOperationException.class, () -> is.mark(10));
|
||||||
|
IOException e = expectThrows(IOException.class, () -> is.reset());
|
||||||
|
assertThat(e.getMessage(), is("reset not supported"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void test(int inputStreamLength, int maxAllowedSize) throws IOException {
|
||||||
|
String data = randomAsciiOfLength(inputStreamLength);
|
||||||
|
ByteSizeValue byteSizeValue = new ByteSizeValue(maxAllowedSize, ByteSizeUnit.BYTES);
|
||||||
|
SizeLimitInputStream is = new SizeLimitInputStream(byteSizeValue,
|
||||||
|
new ByteArrayInputStream(data.getBytes(UTF_8)));
|
||||||
|
|
||||||
|
if (randomBoolean()) {
|
||||||
|
is.read(new byte[inputStreamLength]);
|
||||||
|
} else {
|
||||||
|
for (int i = 0; i < inputStreamLength; i++) {
|
||||||
|
is.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue