Optimize warning header de-duplication (#37725)
Now that warning headers no longer contain a timestamp of when the warning was generated, we no longer need to extract the warning value from the warning to determine whether or not the warning value is duplicated. Instead, we can compare strings directly. Further, when de-duplicating warning headers, are constantly rebuilding sets. Instead of doing that, we can carry about the set with us and rebuild it if we find a new warning value. This commit applies both of these optimizations.
This commit is contained in:
parent
feab59df03
commit
7517e3a7bd
|
@ -232,7 +232,7 @@ public class DeprecationLogger {
|
|||
while (iterator.hasNext()) {
|
||||
try {
|
||||
final ThreadContext next = iterator.next();
|
||||
next.addResponseHeader("Warning", warningHeaderValue, DeprecationLogger::extractWarningValueFromWarningHeader);
|
||||
next.addResponseHeader("Warning", warningHeaderValue);
|
||||
} catch (final IllegalStateException e) {
|
||||
// ignored; it should be removed shortly
|
||||
}
|
||||
|
|
|
@ -36,14 +36,18 @@ import java.io.IOException;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BinaryOperator;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Collector;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
|
||||
|
@ -258,11 +262,11 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
* @return Never {@code null}.
|
||||
*/
|
||||
public Map<String, List<String>> getResponseHeaders() {
|
||||
Map<String, List<String>> responseHeaders = threadLocal.get().responseHeaders;
|
||||
Map<String, Set<String>> responseHeaders = threadLocal.get().responseHeaders;
|
||||
HashMap<String, List<String>> map = new HashMap<>(responseHeaders.size());
|
||||
|
||||
for (Map.Entry<String, List<String>> entry : responseHeaders.entrySet()) {
|
||||
map.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
|
||||
for (Map.Entry<String, Set<String>> entry : responseHeaders.entrySet()) {
|
||||
map.put(entry.getKey(), Collections.unmodifiableList(new ArrayList<>(entry.getValue())));
|
||||
}
|
||||
|
||||
return Collections.unmodifiableMap(map);
|
||||
|
@ -405,7 +409,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
private static final class ThreadContextStruct {
|
||||
private final Map<String, String> requestHeaders;
|
||||
private final Map<String, Object> transientHeaders;
|
||||
private final Map<String, List<String>> responseHeaders;
|
||||
private final Map<String, Set<String>> responseHeaders;
|
||||
private final boolean isSystemContext;
|
||||
private long warningHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header
|
||||
private ThreadContextStruct(StreamInput in) throws IOException {
|
||||
|
@ -416,7 +420,23 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
}
|
||||
|
||||
this.requestHeaders = requestHeaders;
|
||||
this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
|
||||
this.responseHeaders = in.readMap(StreamInput::readString, input -> {
|
||||
final int size = input.readVInt();
|
||||
if (size == 0) {
|
||||
return Collections.emptySet();
|
||||
} else if (size == 1) {
|
||||
return Collections.singleton(input.readString());
|
||||
} else {
|
||||
// use a linked hash set to preserve order
|
||||
final LinkedHashSet<String> values = new LinkedHashSet<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
final String value = input.readString();
|
||||
final boolean added = values.add(value);
|
||||
assert added : value;
|
||||
}
|
||||
return values;
|
||||
}
|
||||
});
|
||||
this.transientHeaders = Collections.emptyMap();
|
||||
isSystemContext = false; // we never serialize this it's a transient flag
|
||||
this.warningHeadersSize = 0L;
|
||||
|
@ -430,7 +450,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
}
|
||||
|
||||
private ThreadContextStruct(Map<String, String> requestHeaders,
|
||||
Map<String, List<String>> responseHeaders,
|
||||
Map<String, Set<String>> responseHeaders,
|
||||
Map<String, Object> transientHeaders, boolean isSystemContext) {
|
||||
this.requestHeaders = requestHeaders;
|
||||
this.responseHeaders = responseHeaders;
|
||||
|
@ -440,7 +460,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
}
|
||||
|
||||
private ThreadContextStruct(Map<String, String> requestHeaders,
|
||||
Map<String, List<String>> responseHeaders,
|
||||
Map<String, Set<String>> responseHeaders,
|
||||
Map<String, Object> transientHeaders, boolean isSystemContext,
|
||||
long warningHeadersSize) {
|
||||
this.requestHeaders = requestHeaders;
|
||||
|
@ -481,19 +501,19 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
}
|
||||
}
|
||||
|
||||
private ThreadContextStruct putResponseHeaders(Map<String, List<String>> headers) {
|
||||
private ThreadContextStruct putResponseHeaders(Map<String, Set<String>> headers) {
|
||||
assert headers != null;
|
||||
if (headers.isEmpty()) {
|
||||
return this;
|
||||
}
|
||||
final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
|
||||
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
|
||||
final Map<String, Set<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
|
||||
for (Map.Entry<String, Set<String>> entry : headers.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
final List<String> existingValues = newResponseHeaders.get(key);
|
||||
final Set<String> existingValues = newResponseHeaders.get(key);
|
||||
if (existingValues != null) {
|
||||
List<String> newValues = Stream.concat(entry.getValue().stream(),
|
||||
existingValues.stream()).distinct().collect(Collectors.toList());
|
||||
newResponseHeaders.put(key, Collections.unmodifiableList(newValues));
|
||||
final Set<String> newValues =
|
||||
Stream.concat(entry.getValue().stream(), existingValues.stream()).collect(LINKED_HASH_SET_COLLECTOR);
|
||||
newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
|
||||
} else {
|
||||
newResponseHeaders.put(key, entry.getValue());
|
||||
}
|
||||
|
@ -523,20 +543,19 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
}
|
||||
}
|
||||
|
||||
final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
|
||||
final List<String> existingValues = newResponseHeaders.get(key);
|
||||
final Map<String, Set<String>> newResponseHeaders;
|
||||
final Set<String> existingValues = responseHeaders.get(key);
|
||||
if (existingValues != null) {
|
||||
final Set<String> existingUniqueValues = existingValues.stream().map(uniqueValue).collect(Collectors.toSet());
|
||||
assert existingValues.size() == existingUniqueValues.size() :
|
||||
"existing values: [" + existingValues + "], existing unique values [" + existingUniqueValues + "]";
|
||||
if (existingUniqueValues.contains(uniqueValue.apply(value))) {
|
||||
if (existingValues.contains(uniqueValue.apply(value))) {
|
||||
return this;
|
||||
}
|
||||
final List<String> newValues = new ArrayList<>(existingValues);
|
||||
newValues.add(value);
|
||||
newResponseHeaders.put(key, Collections.unmodifiableList(newValues));
|
||||
// preserve insertion order
|
||||
final Set<String> newValues = Stream.concat(existingValues.stream(), Stream.of(value)).collect(LINKED_HASH_SET_COLLECTOR);
|
||||
newResponseHeaders = new HashMap<>(responseHeaders);
|
||||
newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
|
||||
} else {
|
||||
newResponseHeaders.put(key, Collections.singletonList(value));
|
||||
newResponseHeaders = new HashMap<>(responseHeaders);
|
||||
newResponseHeaders.put(key, Collections.singleton(value));
|
||||
}
|
||||
|
||||
//check if we can add another warning header - if max count within limits
|
||||
|
@ -588,7 +607,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
out.writeString(entry.getValue());
|
||||
}
|
||||
|
||||
out.writeMapOfLists(responseHeaders, StreamOutput::writeString, StreamOutput::writeString);
|
||||
out.writeMap(responseHeaders, StreamOutput::writeString, StreamOutput::writeStringCollection);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -751,4 +770,40 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
return in;
|
||||
}
|
||||
}
|
||||
|
||||
private static final Collector<String, Set<String>, Set<String>> LINKED_HASH_SET_COLLECTOR = new LinkedHashSetCollector<>();
|
||||
|
||||
private static class LinkedHashSetCollector<T> implements Collector<T, Set<T>, Set<T>> {
|
||||
@Override
|
||||
public Supplier<Set<T>> supplier() {
|
||||
return LinkedHashSet::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BiConsumer<Set<T>, T> accumulator() {
|
||||
return Set::add;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BinaryOperator<Set<T>> combiner() {
|
||||
return (left, right) -> {
|
||||
left.addAll(right);
|
||||
return left;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Function<Set<T>, Set<T>> finisher() {
|
||||
return Function.identity();
|
||||
}
|
||||
|
||||
private static final Set<Characteristics> CHARACTERISTICS =
|
||||
Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
|
||||
|
||||
@Override
|
||||
public Set<Characteristics> characteristics() {
|
||||
return CHARACTERISTICS;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue