More Efficient Deserialization of Empty Collections in StreamInput (#44665) (#44674)

* We only had the `size == 0` optimization in some but not all spots of deserializing collections in this class, fixed the remaining spots.
* Also fixed the a similar spot when deserializing `ThreadContextStruct` that could now be simplified (it was apparently doing it's own version of this optimization for the first map it deserialized before ... but not for the second map -> made it not instantiate anything if both maps are empty since it's always the same object here anyway)
This commit is contained in:
Armin Braun 2019-07-22 09:31:12 +02:00 committed by GitHub
parent 0ac137a9a1
commit 0e2e83f591
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 35 deletions

View File

@ -532,6 +532,9 @@ public abstract class StreamInput extends InputStream {
public <K, V> Map<K, V> readMap(Writeable.Reader<K> keyReader, Writeable.Reader<V> valueReader) throws IOException {
int size = readArraySize();
if (size == 0) {
return Collections.emptyMap();
}
Map<K, V> map = new HashMap<>(size);
for (int i = 0; i < size; i++) {
K key = keyReader.read(this);
@ -649,6 +652,9 @@ public abstract class StreamInput extends InputStream {
@SuppressWarnings("unchecked")
private List readArrayList() throws IOException {
int size = readArraySize();
if (size == 0) {
return Collections.emptyList();
}
List list = new ArrayList(size);
for (int i = 0; i < size; i++) {
list.add(readGenericValue());
@ -666,8 +672,13 @@ public abstract class StreamInput extends InputStream {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(readLong()), ZoneId.of(timeZoneId));
}
private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
private Object[] readArray() throws IOException {
int size8 = readArraySize();
if (size8 == 0) {
return EMPTY_OBJECT_ARRAY;
}
Object[] list8 = new Object[size8];
for (int i = 0; i < size8; i++) {
list8[i] = readGenericValue();
@ -677,6 +688,9 @@ public abstract class StreamInput extends InputStream {
private Map readLinkedHashMap() throws IOException {
int size9 = readArraySize();
if (size9 == 0) {
return Collections.emptyMap();
}
Map map9 = new LinkedHashMap(size9);
for (int i = 0; i < size9; i++) {
map9.put(readString(), readGenericValue());
@ -686,6 +700,9 @@ public abstract class StreamInput extends InputStream {
private Map readHashMap() throws IOException {
int size10 = readArraySize();
if (size10 == 0) {
return Collections.emptyMap();
}
Map map10 = new HashMap(size10);
for (int i = 0; i < size10; i++) {
map10.put(readString(), readGenericValue());
@ -738,8 +755,13 @@ public abstract class StreamInput extends InputStream {
return null;
}
private static final int[] EMPTY_INT_ARRAY = new int[0];
public int[] readIntArray() throws IOException {
int length = readArraySize();
if (length == 0) {
return EMPTY_INT_ARRAY;
}
int[] values = new int[length];
for (int i = 0; i < length; i++) {
values[i] = readInt();
@ -749,6 +771,9 @@ public abstract class StreamInput extends InputStream {
public int[] readVIntArray() throws IOException {
int length = readArraySize();
if (length == 0) {
return EMPTY_INT_ARRAY;
}
int[] values = new int[length];
for (int i = 0; i < length; i++) {
values[i] = readVInt();
@ -756,8 +781,13 @@ public abstract class StreamInput extends InputStream {
return values;
}
private static final long[] EMPTY_LONG_ARRAY = new long[0];
public long[] readLongArray() throws IOException {
int length = readArraySize();
if (length == 0) {
return EMPTY_LONG_ARRAY;
}
long[] values = new long[length];
for (int i = 0; i < length; i++) {
values[i] = readLong();
@ -767,6 +797,9 @@ public abstract class StreamInput extends InputStream {
public long[] readVLongArray() throws IOException {
int length = readArraySize();
if (length == 0) {
return EMPTY_LONG_ARRAY;
}
long[] values = new long[length];
for (int i = 0; i < length; i++) {
values[i] = readVLong();
@ -774,8 +807,13 @@ public abstract class StreamInput extends InputStream {
return values;
}
private static final float[] EMPTY_FLOAT_ARRAY = new float[0];
public float[] readFloatArray() throws IOException {
int length = readArraySize();
if (length == 0) {
return EMPTY_FLOAT_ARRAY;
}
float[] values = new float[length];
for (int i = 0; i < length; i++) {
values[i] = readFloat();
@ -783,8 +821,13 @@ public abstract class StreamInput extends InputStream {
return values;
}
private static final double[] EMPTY_DOUBLE_ARRAY = new double[0];
public double[] readDoubleArray() throws IOException {
int length = readArraySize();
if (length == 0) {
return EMPTY_DOUBLE_ARRAY;
}
double[] values = new double[length];
for (int i = 0; i < length; i++) {
values[i] = readDouble();
@ -792,8 +835,13 @@ public abstract class StreamInput extends InputStream {
return values;
}
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
public byte[] readByteArray() throws IOException {
final int length = readArraySize();
if (length == 0) {
return EMPTY_BYTE_ARRAY;
}
final byte[] bytes = new byte[length];
readBytes(bytes, 0, bytes.length);
return bytes;
@ -983,7 +1031,7 @@ public abstract class StreamInput extends InputStream {
* @throws IOException if an I/O exception occurs reading the list
*/
public <T> List<T> readList(final Writeable.Reader<T> reader) throws IOException {
return readCollection(reader, ArrayList::new);
return readCollection(reader, ArrayList::new, Collections.emptyList());
}
/**
@ -1000,15 +1048,19 @@ public abstract class StreamInput extends InputStream {
* Reads a set of objects
*/
public <T> Set<T> readSet(Writeable.Reader<T> reader) throws IOException {
return readCollection(reader, HashSet::new);
return readCollection(reader, HashSet::new, Collections.emptySet());
}
/**
* Reads a collection of objects
*/
private <T, C extends Collection<? super T>> C readCollection(Writeable.Reader<T> reader,
IntFunction<C> constructor) throws IOException {
IntFunction<C> constructor,
C empty) throws IOException {
int count = readArraySize();
if (count == 0) {
return empty;
}
C builder = constructor.apply(count);
for (int i=0; i<count; i++) {
builder.add(reader.read(this));
@ -1021,6 +1073,9 @@ public abstract class StreamInput extends InputStream {
*/
public <T extends NamedWriteable> List<T> readNamedWriteableList(Class<T> categoryClass) throws IOException {
int count = readArraySize();
if (count == 0) {
return Collections.emptyList();
}
List<T> builder = new ArrayList<>(count);
for (int i=0; i<count; i++) {
builder.add(readNamedWriteable(categoryClass));

View File

@ -259,7 +259,31 @@ public final class ThreadContext implements Closeable, Writeable {
* Reads the headers from the stream into the current context
*/
public void readHeaders(StreamInput in) throws IOException {
threadLocal.set(new ThreadContext.ThreadContextStruct(in));
final Map<String, String> requestHeaders = in.readMap(StreamInput::readString, StreamInput::readString);
final Map<String, Set<String>> responseHeaders = in.readMap(StreamInput::readString, input -> {
final int size = input.readVInt();
if (size == 0) {
return Collections.emptySet();
} else if (size == 1) {
return Collections.singleton(input.readString());
} else {
// use a linked hash set to preserve order
final LinkedHashSet<String> values = new LinkedHashSet<>(size);
for (int i = 0; i < size; i++) {
final String value = input.readString();
final boolean added = values.add(value);
assert added : value;
}
return values;
}
});
final ThreadContextStruct struct;
if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) {
struct = ThreadContextStruct.EMPTY;
} else {
struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), false);
}
threadLocal.set(struct);
}
/**
@ -433,40 +457,16 @@ public final class ThreadContext implements Closeable, Writeable {
}
private static final class ThreadContextStruct {
private static final ThreadContextStruct EMPTY =
new ThreadContextStruct(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), false);
private final Map<String, String> requestHeaders;
private final Map<String, Object> transientHeaders;
private final Map<String, Set<String>> responseHeaders;
private final boolean isSystemContext;
private long warningHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header
private ThreadContextStruct(StreamInput in) throws IOException {
final int numRequest = in.readVInt();
Map<String, String> requestHeaders = numRequest == 0 ? Collections.emptyMap() : new HashMap<>(numRequest);
for (int i = 0; i < numRequest; i++) {
requestHeaders.put(in.readString(), in.readString());
}
this.requestHeaders = requestHeaders;
this.responseHeaders = in.readMap(StreamInput::readString, input -> {
final int size = input.readVInt();
if (size == 0) {
return Collections.emptySet();
} else if (size == 1) {
return Collections.singleton(input.readString());
} else {
// use a linked hash set to preserve order
final LinkedHashSet<String> values = new LinkedHashSet<>(size);
for (int i = 0; i < size; i++) {
final String value = input.readString();
final boolean added = values.add(value);
assert added : value;
}
return values;
}
});
this.transientHeaders = Collections.emptyMap();
isSystemContext = false; // we never serialize this it's a transient flag
this.warningHeadersSize = 0L;
}
//saving current warning headers' size not to recalculate the size with every new warning header
private final long warningHeadersSize;
private ThreadContextStruct setSystemContext() {
if (isSystemContext) {

View File

@ -118,7 +118,7 @@ public class CollapseBuilderTests extends AbstractSerializingTestCase<CollapseBu
case 2:
default:
newBuilder = copyInstance(instance);
List<InnerHitBuilder> innerHits = newBuilder.getInnerHits();
List<InnerHitBuilder> innerHits = new ArrayList<>(newBuilder.getInnerHits());
for (int i = 0; i < between(1, 5); i++) {
innerHits.add(InnerHitBuilderTests.randomInnerHits());
}