Remove SoftReferences from StreamInput/StreamOutput
We try to reuse character arrays and UTF8 writers with softreferences. SoftReferences have negative impact on GC and should be avoided in general. Yet in this case it can simply replaced with a per-stream Bytes/CharsRef that is thread local and has the same lifetime as the stream.
This commit is contained in:
parent
11a3201a09
commit
bf22df7fd0
|
@ -19,9 +19,8 @@
|
|||
|
||||
package org.elasticsearch.common.io.stream;
|
||||
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
import org.apache.lucene.util.CharsRef;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -34,7 +33,6 @@ import org.joda.time.DateTime;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
|
@ -42,18 +40,6 @@ import java.util.*;
|
|||
*/
|
||||
public abstract class StreamInput extends InputStream {
|
||||
|
||||
private static final ThreadLocal<SoftReference<char[]>> charCache = new ThreadLocal<>();
|
||||
|
||||
private static char[] charCache(int size) {
|
||||
SoftReference<char[]> ref = charCache.get();
|
||||
char[] arr = (ref == null) ? null : ref.get();
|
||||
if (arr == null || arr.length < size) {
|
||||
arr = new char[ArrayUtil.oversize(size, RamUsageEstimator.NUM_BYTES_CHAR)];
|
||||
charCache.set(new SoftReference<>(arr));
|
||||
}
|
||||
return arr;
|
||||
}
|
||||
|
||||
private Version version = Version.CURRENT;
|
||||
|
||||
public Version getVersion() {
|
||||
|
@ -268,11 +254,15 @@ public abstract class StreamInput extends InputStream {
|
|||
return null;
|
||||
}
|
||||
|
||||
private final CharsRef spare = new CharsRef();
|
||||
|
||||
public String readString() throws IOException {
|
||||
int charCount = readVInt();
|
||||
char[] chars = charCache(charCount);
|
||||
int c, charIndex = 0;
|
||||
while (charIndex < charCount) {
|
||||
final int charCount = readVInt();
|
||||
spare.offset = 0;
|
||||
spare.length = 0;
|
||||
spare.grow(charCount);
|
||||
int c = 0;
|
||||
while (spare.length < charCount) {
|
||||
c = readByte() & 0xff;
|
||||
switch (c >> 4) {
|
||||
case 0:
|
||||
|
@ -283,18 +273,18 @@ public abstract class StreamInput extends InputStream {
|
|||
case 5:
|
||||
case 6:
|
||||
case 7:
|
||||
chars[charIndex++] = (char) c;
|
||||
spare.chars[spare.length++] = (char) c;
|
||||
break;
|
||||
case 12:
|
||||
case 13:
|
||||
chars[charIndex++] = (char) ((c & 0x1F) << 6 | readByte() & 0x3F);
|
||||
spare.chars[spare.length++] = (char) ((c & 0x1F) << 6 | readByte() & 0x3F);
|
||||
break;
|
||||
case 14:
|
||||
chars[charIndex++] = (char) ((c & 0x0F) << 12 | (readByte() & 0x3F) << 6 | (readByte() & 0x3F) << 0);
|
||||
spare.chars[spare.length++] = (char) ((c & 0x0F) << 12 | (readByte() & 0x3F) << 6 | (readByte() & 0x3F) << 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return new String(chars, 0, charCount);
|
||||
return spare.toString();
|
||||
}
|
||||
|
||||
public String readSharedString() throws IOException {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.common.io.stream;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.UnicodeUtil;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -40,19 +41,6 @@ import java.util.Map;
|
|||
*/
|
||||
public abstract class StreamOutput extends OutputStream {
|
||||
|
||||
private static ThreadLocal<SoftReference<UTF8StreamWriter>> utf8StreamWriter = new ThreadLocal<>();
|
||||
|
||||
public static UTF8StreamWriter utf8StreamWriter() {
|
||||
SoftReference<UTF8StreamWriter> ref = utf8StreamWriter.get();
|
||||
UTF8StreamWriter writer = (ref == null) ? null : ref.get();
|
||||
if (writer == null) {
|
||||
writer = new UTF8StreamWriter(1024 * 4);
|
||||
utf8StreamWriter.set(new SoftReference<>(writer));
|
||||
}
|
||||
writer.reset();
|
||||
return writer;
|
||||
}
|
||||
|
||||
private Version version = Version.CURRENT;
|
||||
|
||||
public Version getVersion() {
|
||||
|
@ -207,19 +195,14 @@ public abstract class StreamOutput extends OutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
private final BytesRef spare = new BytesRef();
|
||||
|
||||
public void writeText(Text text) throws IOException {
|
||||
if (!text.hasBytes() && seekPositionSupported()) {
|
||||
long pos1 = position();
|
||||
// make room for the size
|
||||
seek(pos1 + 4);
|
||||
UTF8StreamWriter utf8StreamWriter = utf8StreamWriter();
|
||||
utf8StreamWriter.setOutput(this);
|
||||
utf8StreamWriter.write(text.string());
|
||||
utf8StreamWriter.close();
|
||||
long pos2 = position();
|
||||
seek(pos1);
|
||||
writeInt((int) (pos2 - pos1 - 4));
|
||||
seek(pos2);
|
||||
if (!text.hasBytes()) {
|
||||
final String string = text.string();
|
||||
UnicodeUtil.UTF16toUTF8(string, 0, string.length(), spare);
|
||||
writeInt(spare.length);
|
||||
write(spare.bytes, spare.offset, spare.length);
|
||||
} else {
|
||||
BytesReference bytes = text.bytes();
|
||||
writeInt(bytes.length());
|
||||
|
|
Loading…
Reference in New Issue