diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index 837cfcc43eb..96867494868 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -82,4 +82,4 @@ Enable or disable allocation for persistent tasks: This setting does not affect the persistent tasks that are already being executed. Only newly created persistent tasks, or tasks that must be reassigned (after a node left the cluster, for example), are impacted by this setting. --- +-- \ No newline at end of file diff --git a/docs/reference/modules/http.asciidoc b/docs/reference/modules/http.asciidoc index 920f62043cf..c69d4991583 100644 --- a/docs/reference/modules/http.asciidoc +++ b/docs/reference/modules/http.asciidoc @@ -20,7 +20,7 @@ http://en.wikipedia.org/wiki/Chunked_transfer_encoding[HTTP chunking]. The settings in the table below can be configured for HTTP. Note that none of them are dynamically updatable so for them to take effect they should be set in -`elasticsearch.yml`. +the Elasticsearch <>. [cols="<,<",options="header",] |======================================================================= @@ -100,6 +100,12 @@ simple message will be returned. Defaults to `true` |`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`. +|`http.max_warning_header_count` |The maximum number of warning headers in + client HTTP responses, defaults to unbounded. + +|`http.max_warning_header_size` |The maximum total size of warning headers in +client HTTP responses, defaults to unbounded. + |======================================================================= It also uses the common diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index ced99fc8065..45eb3cf45ef 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -245,6 +245,8 @@ public final class ClusterSettings extends AbstractScopedSettings { HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH, HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE, HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE, + HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT, + HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE, HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH, HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT, HttpTransportSettings.SETTING_HTTP_RESET_COOKIES, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index 8f950c5434b..901c6425d71 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -23,10 +23,16 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.http.HttpTransportSettings; + +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE; import java.io.Closeable; import java.io.IOException; @@ -39,13 +45,14 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.nio.charset.StandardCharsets; + /** * A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with @@ -81,6 +88,8 @@ public final class ThreadContext implements Closeable, Writeable { private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(); private final Map defaultHeader; private final ContextThreadLocal threadLocal; + private final int maxWarningHeaderCount; + private final long maxWarningHeaderSize; /** * Creates a new ThreadContext instance @@ -98,6 +107,8 @@ public final class ThreadContext implements Closeable, Writeable { this.defaultHeader = Collections.unmodifiableMap(defaultHeader); } threadLocal = new ContextThreadLocal(); + this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings); + this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes(); } @Override @@ -282,7 +293,7 @@ public final class ThreadContext implements Closeable, Writeable { * @param uniqueValue the function that produces de-duplication values */ public void addResponseHeader(final String key, final String value, final Function uniqueValue) { - threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue)); + threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize)); } /** @@ -359,7 +370,7 @@ public final class ThreadContext implements Closeable, Writeable { private final Map transientHeaders; private final Map> responseHeaders; private final boolean isSystemContext; - + private long warningHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header private ThreadContextStruct(StreamInput in) throws IOException { final int numRequest = in.readVInt(); Map requestHeaders = numRequest == 0 ? Collections.emptyMap() : new HashMap<>(numRequest); @@ -371,6 +382,7 @@ public final class ThreadContext implements Closeable, Writeable { this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString); this.transientHeaders = Collections.emptyMap(); isSystemContext = false; // we never serialize this it's a transient flag + this.warningHeadersSize = 0L; } private ThreadContextStruct setSystemContext() { @@ -387,6 +399,18 @@ public final class ThreadContext implements Closeable, Writeable { this.responseHeaders = responseHeaders; this.transientHeaders = transientHeaders; this.isSystemContext = isSystemContext; + this.warningHeadersSize = 0L; + } + + private ThreadContextStruct(Map requestHeaders, + Map> responseHeaders, + Map transientHeaders, boolean isSystemContext, + long warningHeadersSize) { + this.requestHeaders = requestHeaders; + this.responseHeaders = responseHeaders; + this.transientHeaders = transientHeaders; + this.isSystemContext = isSystemContext; + this.warningHeadersSize = warningHeadersSize; } /** @@ -440,30 +464,58 @@ public final class ThreadContext implements Closeable, Writeable { return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext); } - private ThreadContextStruct putResponse(final String key, final String value, final Function uniqueValue) { + private ThreadContextStruct putResponse(final String key, final String value, final Function uniqueValue, + final int maxWarningHeaderCount, final long maxWarningHeaderSize) { assert value != null; + long newWarningHeaderSize = warningHeadersSize; + //check if we can add another warning header - if max size within limits + if (key.equals("Warning") && (maxWarningHeaderSize != -1)) { //if size is NOT unbounded, check its limits + if (warningHeadersSize > maxWarningHeaderSize) { // if max size has already been reached before + final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" + + maxWarningHeaderSize + "] bytes set in [" + + HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE.getKey() + "]!"; + ESLoggerFactory.getLogger(ThreadContext.class).warn(message); + return this; + } + newWarningHeaderSize += "Warning".getBytes(StandardCharsets.UTF_8).length + value.getBytes(StandardCharsets.UTF_8).length; + if (newWarningHeaderSize > maxWarningHeaderSize) { + final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" + + maxWarningHeaderSize + "] bytes set in [" + + HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE.getKey() + "]!"; + ESLoggerFactory.getLogger(ThreadContext.class).warn(message); + return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize); + } + } final Map> newResponseHeaders = new HashMap<>(this.responseHeaders); final List existingValues = newResponseHeaders.get(key); - if (existingValues != null) { final Set existingUniqueValues = existingValues.stream().map(uniqueValue).collect(Collectors.toSet()); assert existingValues.size() == existingUniqueValues.size(); if (existingUniqueValues.contains(uniqueValue.apply(value))) { return this; } - final List newValues = new ArrayList<>(existingValues); newValues.add(value); - newResponseHeaders.put(key, Collections.unmodifiableList(newValues)); } else { newResponseHeaders.put(key, Collections.singletonList(value)); } - return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext); + //check if we can add another warning header - if max count within limits + if ((key.equals("Warning")) && (maxWarningHeaderCount != -1)) { //if count is NOT unbounded, check its limits + final int warningHeaderCount = newResponseHeaders.containsKey("Warning") ? newResponseHeaders.get("Warning").size() : 0; + if (warningHeaderCount > maxWarningHeaderCount) { + final String message = "Dropping a warning header, as their total count reached the maximum allowed of [" + + maxWarningHeaderCount + "] set in [" + HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT.getKey() + "]!"; + ESLoggerFactory.getLogger(ThreadContext.class).warn(message); + return this; + } + } + return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize); } + private ThreadContextStruct putTransient(String key, Object value) { Map newTransient = new HashMap<>(this.transientHeaders); if (newTransient.putIfAbsent(key, value) != null) { diff --git a/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java b/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java index 064406f0d38..98451e0c304 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java +++ b/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import static java.util.Collections.emptyList; @@ -93,6 +92,10 @@ public final class HttpTransportSettings { Setting.byteSizeSetting("http.max_chunk_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope); public static final Setting SETTING_HTTP_MAX_HEADER_SIZE = Setting.byteSizeSetting("http.max_header_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope); + public static final Setting SETTING_HTTP_MAX_WARNING_HEADER_COUNT = + Setting.intSetting("http.max_warning_header_count", -1, -1, Property.NodeScope); + public static final Setting SETTING_HTTP_MAX_WARNING_HEADER_SIZE = + Setting.byteSizeSetting("http.max_warning_header_size", new ByteSizeValue(-1), Property.NodeScope); public static final Setting SETTING_HTTP_MAX_INITIAL_LINE_LENGTH = Setting.byteSizeSetting("http.max_initial_line_length", new ByteSizeValue(4, ByteSizeUnit.KB), Property.NodeScope); // don't reset cookies by default, since I don't think we really need to diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 102ae719785..9fa886af8bc 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -93,6 +93,7 @@ import org.elasticsearch.gateway.GatewayModule; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; diff --git a/server/src/test/java/org/elasticsearch/common/logging/DeprecationLoggerTests.java b/server/src/test/java/org/elasticsearch/common/logging/DeprecationLoggerTests.java index fdb530749e1..490f7961a89 100644 --- a/server/src/test/java/org/elasticsearch/common/logging/DeprecationLoggerTests.java +++ b/server/src/test/java/org/elasticsearch/common/logging/DeprecationLoggerTests.java @@ -33,6 +33,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.stream.IntStream; +import java.nio.charset.StandardCharsets; import static org.elasticsearch.common.logging.DeprecationLogger.WARNING_HEADER_PATTERN; import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; @@ -246,6 +247,60 @@ public class DeprecationLoggerTests extends ESTestCase { assertThat(DeprecationLogger.encode(s), IsSame.sameInstance(s)); } + + public void testWarningHeaderCountSetting() throws IOException{ + // Test that the number of warning headers don't exceed 'http.max_warning_header_count' + final int maxWarningHeaderCount = 2; + Settings settings = Settings.builder() + .put("http.max_warning_header_count", maxWarningHeaderCount) + .build(); + try (ThreadContext threadContext = new ThreadContext(settings)) { + final Set threadContexts = Collections.singleton(threadContext); + // try to log three warning messages + logger.deprecated(threadContexts, "A simple message 1"); + logger.deprecated(threadContexts, "A simple message 2"); + logger.deprecated(threadContexts, "A simple message 3"); + final Map> responseHeaders = threadContext.getResponseHeaders(); + final List responses = responseHeaders.get("Warning"); + + assertEquals(maxWarningHeaderCount, responses.size()); + assertThat(responses.get(0), warningValueMatcher); + assertThat(responses.get(0), containsString("\"A simple message 1")); + assertThat(responses.get(1), warningValueMatcher); + assertThat(responses.get(1), containsString("\"A simple message 2")); + } + } + + public void testWarningHeaderSizeSetting() throws IOException{ + // Test that the size of warning headers don't exceed 'http.max_warning_header_size' + Settings settings = Settings.builder() + .put("http.max_warning_header_size", "1Kb") + .build(); + + byte [] arr = new byte[300]; + String message1 = new String(arr, StandardCharsets.UTF_8) + "1"; + String message2 = new String(arr, StandardCharsets.UTF_8) + "2"; + String message3 = new String(arr, StandardCharsets.UTF_8) + "3"; + + try (ThreadContext threadContext = new ThreadContext(settings)) { + final Set threadContexts = Collections.singleton(threadContext); + // try to log three warning messages + logger.deprecated(threadContexts, message1); + logger.deprecated(threadContexts, message2); + logger.deprecated(threadContexts, message3); + final Map> responseHeaders = threadContext.getResponseHeaders(); + final List responses = responseHeaders.get("Warning"); + + long warningHeadersSize = 0L; + for (String response : responses){ + warningHeadersSize += "Warning".getBytes(StandardCharsets.UTF_8).length + + response.getBytes(StandardCharsets.UTF_8).length; + } + // assert that the size of all warning headers is less or equal to 1Kb + assertTrue(warningHeadersSize <= 1024); + } + } + private String range(int lowerInclusive, int upperInclusive) { return IntStream .range(lowerInclusive, upperInclusive + 1)